summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/configure.ac8
-rwxr-xr-xcpp/rubygen/amqpgen.rb79
-rwxr-xr-xcpp/rubygen/cppgen.rb168
-rwxr-xr-xcpp/rubygen/framing.0-10/MethodBodyConstVisitor.rb27
-rwxr-xr-xcpp/rubygen/framing.0-10/MethodBodyDefaultVisitor.rb35
-rwxr-xr-xcpp/rubygen/framing.0-10/MethodHolder.rb100
-rwxr-xr-xcpp/rubygen/framing.0-10/Operations.rb96
-rwxr-xr-xcpp/rubygen/framing.0-10/OperationsInvoker.rb92
-rwxr-xr-xcpp/rubygen/framing.0-10/Proxy.rb84
-rw-r--r--cpp/rubygen/framing.0-10/Session.rb199
-rwxr-xr-xcpp/rubygen/framing.0-10/all_method_bodies.rb21
-rwxr-xr-xcpp/rubygen/framing.0-10/constants.rb99
-rw-r--r--cpp/rubygen/framing.0-10/frame_body_lists.rb31
-rw-r--r--cpp/rubygen/framing.0-10/structs.rb583
-rwxr-xr-xcpp/rubygen/generate10
-rw-r--r--cpp/src/Makefile.am30
-rw-r--r--cpp/src/qpid/Exception.h24
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp22
-rw-r--r--cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--cpp/src/qpid/broker/Broker.h3
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp353
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h208
-rw-r--r--cpp/src/qpid/broker/Connection.cpp4
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.cpp3
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp4
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.h8
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp171
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.h67
-rw-r--r--cpp/src/qpid/broker/Message.cpp6
-rw-r--r--cpp/src/qpid/broker/Message.h1
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.cpp24
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.h8
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.cpp71
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.h7
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp208
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.h110
-rw-r--r--cpp/src/qpid/broker/PreviewConnection.cpp325
-rw-r--r--cpp/src/qpid/broker/PreviewConnection.h113
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionCodec.cpp91
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionCodec.h55
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionHandler.cpp315
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionHandler.h107
-rw-r--r--cpp/src/qpid/broker/PreviewSessionHandler.cpp210
-rw-r--r--cpp/src/qpid/broker/PreviewSessionHandler.h111
-rw-r--r--cpp/src/qpid/broker/PreviewSessionManager.cpp113
-rw-r--r--cpp/src/qpid/broker/PreviewSessionManager.h101
-rw-r--r--cpp/src/qpid/broker/PreviewSessionState.cpp174
-rw-r--r--cpp/src/qpid/broker/PreviewSessionState.h125
-rw-r--r--cpp/src/qpid/broker/Queue.cpp5
-rw-r--r--cpp/src/qpid/broker/QueueBindings.cpp4
-rw-r--r--cpp/src/qpid/broker/SaslAuthenticator.cpp9
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp195
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h102
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp13
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp57
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h63
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp87
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h12
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp18
-rw-r--r--cpp/src/qpid/broker/SessionState.h1
-rw-r--r--cpp/src/qpid/client/Channel.cpp5
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp1
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.h6
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp11
-rw-r--r--cpp/src/qpid/client/Session.h4
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp26
-rw-r--r--cpp/src/qpid/client/SessionImpl.h25
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp12
-rw-r--r--cpp/src/qpid/cluster/Cluster.h4
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp8
-rw-r--r--cpp/src/qpid/framing/AMQHeaderBody.h7
-rw-r--r--cpp/src/qpid/framing/Array.cpp6
-rw-r--r--cpp/src/qpid/framing/BodyHandler.cpp2
-rw-r--r--cpp/src/qpid/framing/BodyHolder.cpp2
-rw-r--r--cpp/src/qpid/framing/Buffer.cpp1
-rw-r--r--cpp/src/qpid/framing/FieldTable.cpp2
-rw-r--r--cpp/src/qpid/framing/FieldValue.cpp2
-rw-r--r--cpp/src/qpid/framing/FramingContent.cpp73
-rw-r--r--cpp/src/qpid/framing/FramingContent.h63
-rw-r--r--cpp/src/qpid/framing/ModelMethod.h8
-rw-r--r--cpp/src/qpid/framing/SequenceNumber.cpp16
-rw-r--r--cpp/src/qpid/framing/SequenceNumber.h6
-rw-r--r--cpp/src/qpid/framing/SequenceSet.cpp2
-rw-r--r--cpp/src/qpid/framing/Uuid.cpp2
-rw-r--r--cpp/src/qpid/framing/amqp_types_full.h2
-rw-r--r--cpp/src/tests/ExchangeTest.cpp3
-rw-r--r--cpp/src/tests/exception_test.cpp2
-rw-r--r--cpp/src/tests/interop_runner.cpp2
-rwxr-xr-xcpp/src/tests/python_tests1
89 files changed, 1793 insertions, 3884 deletions
diff --git a/cpp/configure.ac b/cpp/configure.ac
index c062fc3f8b..bc30e14dca 100644
--- a/cpp/configure.ac
+++ b/cpp/configure.ac
@@ -122,9 +122,7 @@ test -n "$RUBY" && generate=yes
test -z "$RUBY" && AC_MSG_ERROR([Missing ruby installation (try "yum install ruby").])
specdir=`pwd`/$srcdir/../specs
-AMQP_PREVIEW_XML=$specdir/amqp.0-10-preview.xml
-AMQP_FINAL_XML=$specdir/amqp.0-10.xml
-AC_SUBST(AMQP_PREVIEW_XML)
+AMQP_FINAL_XML=$specdir/amqp.0-10-qpid-errata.xml
AC_SUBST(AMQP_FINAL_XML)
AM_CONDITIONAL([GENERATE], [ls $AMQP_FINAL_XML >/dev/null])
@@ -154,8 +152,8 @@ AC_ARG_WITH([cpg],
;;
*) AC_MSG_ERROR([Bad value ${withval} for --with-cpg option]) ;;
esac],
- [ # not specified - enable if libs/headers available.
- with_CPG=yes
+ [ # not specified - GS 24-APR-2008 temporarily disabled - (enable if libs/headers available).
+ with_CPG=no
AC_CHECK_HEADERS([openais/cpg.h],,[with_CPG=no])
AC_CHECK_LIB([cpg],[cpg_initialize],,[with_CPG=no])
]
diff --git a/cpp/rubygen/amqpgen.rb b/cpp/rubygen/amqpgen.rb
index 76685aa45b..bfa15bb391 100755
--- a/cpp/rubygen/amqpgen.rb
+++ b/cpp/rubygen/amqpgen.rb
@@ -26,7 +26,7 @@ class String
def bars() tr('- .','_'); end
# Convert to ALL_UPPERCASE_FORM
- def shout() bars.upcase!; end
+ def shout() bars.upcase; end
# Convert to lowerCaseCapitalizedForm
def lcaps() gsub( /\W(\w)/ ) { |m| $1.upcase } end
@@ -89,7 +89,6 @@ class Module
end
end
-
# An AmqpElement contains an XML element and provides a convenient
# API to access AMQP data.
#
@@ -109,6 +108,7 @@ class AmqpElement
@xml, @parent=xml, parent
@children=xml.elements.map { |e| wrap e }.compact
@cache_child={}
+ @cache_child_named={}
@cache_children={}
@cache_children[nil]=@children
end
@@ -142,14 +142,15 @@ class AmqpElement
@cache_child[[elname,name]] ||= children(elname).find { |c| c.name==name }
end
+ # Look up any child with name
+ def child_named(name)
+ @cache_child_named[name] ||= @children.find { |c| c.name==name }
+ end
+
# The root <amqp> element.
def root() @root ||=parent ? parent.root : self; end
- # Are we in preview or final 0-10
- # preview - used to make some classes behave differently for preview vs. final
- def final?() root().version == "0-10"; end
-
- def to_s() "#<#{self.class}(#{name})>"; end
+ def to_s() "#<#{self.class}(#{fqname})>"; end
def inspect() to_s; end
# Text of doc child if there is one.
@@ -207,14 +208,6 @@ class AmqpDomain < AmqpElement
amqp_single_child_reader :enum
def uses() type_=="array" ? ArrayTypes[name] : type_; end
-
- def unalias()
- d=self
- while (d.type_ != d.name and root.domain(d.type_))
- d=root.domain(d.type_)
- end
- return d
- end
end
class AmqpException < AmqpElement
@@ -227,8 +220,6 @@ class AmqpField < AmqpElement
super;
root.used_by[type_].push(parent.fqname) if type_ and type_.index('.')
end
-
- def domain() root.domain(xml.attributes["domain"]); end
amqp_single_child_reader :struct # preview
amqp_child_reader :exception
amqp_attr_reader :type, :default, :code, :required
@@ -278,9 +269,6 @@ class AmqpStruct < AmqpElement
amqp_attr_reader :size, :code, :pack
amqp_child_reader :field
- # preview - preview code needs default "short" for pack.
- alias :raw_pack :pack
- def pack() raw_pack or (not parent.final? and "short"); end
def result?() parent.xml.name == "result"; end
def domain?() parent.xml.name == "domain"; end
end
@@ -297,6 +285,24 @@ class AmqpMethod < AmqpElement
def on_server?() on_chassis? "server"; end
end
+# preview: Map command/control to preview method.
+class AmqpFakeMethod < AmqpMethod
+ def initialize(action)
+ super(action.xml, action.parent);
+ @action=action
+ end
+
+ def content() return "1" if @action.is_a? AmqpCommand and @action.segments end
+ def index() @action.code end
+ def code() @action.code end
+ def synchronous() end # FIXME aconway 2008-04-10: ???
+ def on_chassis?(chassis)
+ @action.received_by?(chassis)
+ end
+ def pack() "2" end # Encode pack=2, size=4 struct
+ def size() "4" end
+end
+
class AmqpImplement < AmqpElement
def initialize(xml,amqp) super; end
amqp_attr_reader :handle, :send
@@ -312,6 +318,11 @@ class AmqpAction < AmqpElement
def initialize(xml,amqp) super; end
amqp_child_reader :implement, :field, :response
amqp_attr_reader :code
+ def implement?(role) xml.elements["./implement[@role='#{role}']"] end
+ def received_by?(client_or_server)
+ return (implement?(client_or_server) or implement?("sender") or implement?("receiver"))
+ end
+ def pack() "2" end
def size() "4" end # Encoded as a size 4 Struct
end
@@ -329,26 +340,37 @@ class AmqpClass < AmqpElement
def initialize(xml,amqp) super; end
amqp_attr_reader :index # preview
- amqp_child_reader :method # preview
- amqp_child_reader :struct, :domain, :control, :command, :role
+ amqp_child_reader :struct, :domain, :control, :command, :role, :method
amqp_attr_reader :code
+ def actions() controls+commands; end
+
+ # preview - command/control as methods
+ def methods_()
+ return (controls + commands).map { |a| AmqpFakeMethod.new(a) }
+ end
+
+ def method(name)
+ a = (command(name) or control(name))
+ return AmqpFakeMethod.new(a)
+ end
+
# chassis should be "client" or "server"
def methods_on(chassis) # preview
@methods_on ||= { }
@methods_on[chassis] ||= methods_.select { |m| m.on_chassis? chassis }
end
+ # FIXME aconway 2008-04-11:
def l4?() # preview
!["connection", "session", "execution"].include?(name) && !control?
end
+ # FIXME aconway 2008-04-11:
def control?()
- ["connection010", "session010"].include?(name)
+ ["connection", "session"].include?(name)
end
-
- def actions() controls+commands; end
end
class AmqpType < AmqpElement
@@ -388,13 +410,6 @@ class AmqpRoot < AmqpElement
def version() major + "-" + minor; end
- # preview - only struct child reader remains for new mapping
- def domain_structs() domains.map{ |d| d.struct }.compact; end
- def result_structs()
- methods_.map { |m| m.result and m.result.struct }.compact
- end
- def structs() result_structs+domain_structs; end
-
def methods_() classes.map { |c| c.methods_ }.flatten; end
#preview
diff --git a/cpp/rubygen/cppgen.rb b/cpp/rubygen/cppgen.rb
index c1121e9bfe..c09ed66b29 100755
--- a/cpp/rubygen/cppgen.rb
+++ b/cpp/rubygen/cppgen.rb
@@ -71,9 +71,9 @@ class String
def varname() lcaps.cppsafe; end
end
-# Hold information about a C++ type.
+# preview: Hold information about a C++ type.
#
-# preview - new mapping does not use CppType,
+# new mapping does not use CppType,
# Each amqp type corresponds exactly by dotted name
# to a type, domain or struct, which in turns
# corresponds by name to a C++ type or typedef.
@@ -88,12 +88,8 @@ class CppType
def passcref() @param="const #{name}&"; self; end
def code(str) @code=str; self; end
def defval(str) @defval=str; self; end
- def fq(namespace)
- @param="const #{namespace}::#{name}&"
- @ret="const #{namespace}::#{name}&"
- @defval="#{namespace}::#{name}()"
- self
- end
+ def encoded() @code end
+ def ret_by_val() @name; end
def encode(value, buffer)
@code ? "#{buffer}.put#{@code}(#{value});" : "#{value}.encode(#{buffer});"
@@ -118,19 +114,86 @@ class CppType
def to_s() name; end;
end
+class AmqpRoot
+ # preview; map 0-10 types to preview code generator types
+ @@typemap = {
+ "bit"=> CppType.new("bool").code("Octet").defval("false"),
+ "uint8"=>CppType.new("uint8_t").code("Octet").defval("0"),
+ "uint16"=>CppType.new("uint16_t").code("Short").defval("0"),
+ "uint32"=>CppType.new("uint32_t").code("Long").defval("0"),
+ "uint64"=>CppType.new("uint64_t").code("LongLong").defval("0"),
+ "datetime"=>CppType.new("uint64_t").code("LongLong").defval("0"),
+ "str8"=>CppType.new("string").passcref.retcref.code("ShortString"),
+ "str16"=>CppType.new("string").passcref.retcref.code("MediumString"),
+ "str32"=>CppType.new("string").passcref.retcref.code("LongString"),
+ "vbin8"=>CppType.new("string").passcref.retcref.code("ShortString"),
+ "vbin16"=>CppType.new("string").passcref.retcref.code("MediumString"),
+ "vbin32"=>CppType.new("string").passcref.retcref.code("LongString"),
+ "map"=>CppType.new("FieldTable").passcref.retcref,
+ "array"=>CppType.new("Array").passcref.retcref,
+ "sequence-no"=>CppType.new("SequenceNumber").passcref,
+ "sequence-set"=>CppType.new("SequenceSet").passcref.retcref,
+ "struct32"=>CppType.new("string").passcref.retcref.code("LongString"),
+ "uuid"=>CppType.new("Uuid").passcref.retcref,
+ "byte-ranges"=>CppType.new("ByteRanges").passcref.retcref
+ }
+
+ # preview: map amqp types to preview cpp types.
+ def lookup_cpptype(t) t = @@typemap[t] and return t end
+end
+
+
class AmqpElement
# convert my amqp type_ attribute to a C++ type.
def amqp2cpp()
return "ArrayDomain<#{ArrayTypes[name].amqp2cpp}> " if type_=="array"
return type_.amqp2cpp
end
+
+ # Does this object have a type-like child named name?
+ def typechild(name)
+ child = domain(name) if respond_to? :domain
+ child = struct(name) if not child and respond_to? :struct
+ child = type_(name) if not child and respond_to? :type_
+ child
+ end
+
+ # dotted name to a type object
+ def dotted_typechild(name)
+ names=name.split('.')
+ context = self
+ while context and names.size > 1
+ context = context.child_named(names.shift)
+ end
+ return context.typechild(names[0]) if context
+ end
+
+ # preview mapping - type_ attribute to C++ type
+ def lookup_cpptype(name)
+ if t = root.lookup_cpptype(name) then return t
+ elsif c = containing_class.typechild(name) then return c.cpptype
+ elsif c= root.dotted_typechild(name) then return c.cpptype
+ else raise "Cannot resolve type-name #{name} from #{self}"
+ end
+ end
+
+ def containing_class()
+ return self if is_a? AmqpClass
+ return parent && parent.containing_class
+ end
end
+
class AmqpField
+ def struct?()
+ c=containing_class
+ c.struct(type_)
+ end
+ def cpptype() lookup_cpptype(type_) or raise "no cpptype #{self}" end
def cppname() name.lcaps.cppsafe; end
- def cpptype() domain.cpptype; end
- def bit?() domain.type_ == "bit"; end
+ def bit?() type_ == "bit"; end
def signature() cpptype.param+" "+cppname; end
+
def fqtypename()
unless type_.index(".")
c=containing_class
@@ -149,26 +212,11 @@ class AmqpMethod
def cppname() name.lcaps.cppsafe; end
def param_names() fields.map { |f| f.cppname }; end
def signature() fields.map { |f| f.signature }; end
- def classname()
- #TODO: remove name mangling after preview path is dropped
- if (parent.name.include?("010"))
- return parent.name.delete("010")
- elsif (parent.name == "cluster")
- return parent.name
- else
- return parent.name + "X"
- end
- end
+ def classname() parent.name; end
def body_name()
classname().caps+name.caps+"Body"
end
-
- def cpp_pack_type() # preview
- CppType.new("uint16_t").code("Short").defval("0");
- end
- def pack() # preview
- "short"
- end
+ def cpp_pack_type() root.lookup_cpptype("uint16") end
end
module AmqpHasFields
@@ -192,7 +240,8 @@ class AmqpAction
end
class AmqpType
- def typename() name.typename; end
+ def cpptype() root.lookup_cpptype(name) end # preview
+ def typename() name.typename; end # new mapping
def fixed?() fixed_width; end
end
@@ -210,67 +259,42 @@ class AmqpClass
end
class AmqpDomain
- @@typemap = {
- "bit"=> CppType.new("bool").code("Octet").defval("false"),
- "octet"=>CppType.new("uint8_t").code("Octet").defval("0"),
- "short"=>CppType.new("uint16_t").code("Short").defval("0"),
- "long"=>CppType.new("uint32_t").code("Long").defval("0"),
- "longlong"=>CppType.new("uint64_t").code("LongLong").defval("0"),
- "timestamp"=>CppType.new("uint64_t").code("LongLong").defval("0"),
- "longstr"=>CppType.new("string").passcref.retcref.code("LongString"),
- "shortstr"=>CppType.new("string").passcref.retcref.code("ShortString"),
- "mediumstr"=>CppType.new("string").passcref.retcref.code("MediumString"),
- "table"=>CppType.new("FieldTable").passcref.retcref,
- "array"=>CppType.new("Array").passcref.retcref,
- "content"=>CppType.new("Content").passcref.retcref,
- "rfc1982-long-set"=>CppType.new("SequenceNumberSet").passcref.retcref,
- "sequence-set"=>CppType.new("SequenceSet").passcref.retcref,
- "long-struct"=>CppType.new("string").passcref.retcref.code("LongString"),
- "uuid"=>CppType.new("Uuid").passcref.retcref
- }
-
- def cppname()
- #TODO: remove name mangling after preview path is dropped
- if (name.include?("010"))
- return name.caps.delete("010")
- elsif (name.include?("properties"))
- return "Preview" + name.caps
- else
- return name.caps
- end
- end
+ # preview
+ def cpptype() lookup_cpptype(type_) end
+ def cppname() name.caps; end
+ # new mapping
def fqtypename()
return containing_class.nsname+"::"+name.typename if containing_class
name.typename
end
-
- def cpptype()
- d=unalias
- @cpptype ||= @@typemap[d.type_] or
- CppType.new(d.cppname).fq("qpid::framing") or
- raise "Invalid type #{self}"
- end
-
- def AmqpDomain.lookup_type(t)
- @@typemap[t]
- end
end
class AmqpResult
+ # preview
def cpptype()
- @cpptype=CppType.new(parent.classname.caps+parent.name.caps+"Result").passcref
+ if type_ then lookup_cpptype(type_)
+ else CppType.new(parent.parent.name.caps+parent.name.caps+"Result").passcref
+ end
end
end
class AmqpStruct
include AmqpHasFields
+ @@pack_types={ "1"=>"uint8", "2"=>"uint16", "4"=>"uint32"}
def cpp_pack_type() # preview
- AmqpDomain.lookup_type(pack()) or CppType.new("uint16_t");
+ root.lookup_cpptype(@@pack_types[pack])
end
- def cpptype() parent.cpptype; end # preview
- def cppname() cpptype.name; end # preview
+ def cpptype() CppType.new(cppname).passcref.retcref end
+ #def cppname() containing_class.cppname+name.caps; end
+ def cppname()
+ if parent.kind_of? AmqpResult
+ parent.parent.parent.name.caps+parent.parent.name.caps+"Result"
+ else
+ name.caps
+ end
+ end
def fqclassname() containing_class.nsname+"::"+name.typename; end
def classname() name.typename; end
def full_code() (containing_class.code.hex << 8)+code.hex; end
diff --git a/cpp/rubygen/framing.0-10/MethodBodyConstVisitor.rb b/cpp/rubygen/framing.0-10/MethodBodyConstVisitor.rb
new file mode 100755
index 0000000000..f9ef95f5a0
--- /dev/null
+++ b/cpp/rubygen/framing.0-10/MethodBodyConstVisitor.rb
@@ -0,0 +1,27 @@
+#!/usr/bin/env ruby
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class MethodBodyConstVisitorGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @namespace="qpid::framing"
+ @classname="MethodBodyConstVisitor"
+ @filename="qpid/framing/MethodBodyConstVisitor"
+ end
+
+ def generate()
+ h_file("#{@filename}") {
+ namespace(@namespace) {
+ @amqp.methods_.each { |m| genl "class #{m.body_name};" }
+ cpp_class("MethodBodyConstVisitor") {
+ genl "public:"
+ genl "virtual ~MethodBodyConstVisitor() {}"
+ @amqp.methods_.each { |m| genl "virtual void visit(const #{m.body_name}&) = 0;" }
+ }}}
+ end
+end
+
+MethodBodyConstVisitorGen.new($outdir, $amqp).generate();
+
diff --git a/cpp/rubygen/framing.0-10/MethodBodyDefaultVisitor.rb b/cpp/rubygen/framing.0-10/MethodBodyDefaultVisitor.rb
new file mode 100755
index 0000000000..a74b0c06d6
--- /dev/null
+++ b/cpp/rubygen/framing.0-10/MethodBodyDefaultVisitor.rb
@@ -0,0 +1,35 @@
+#!/usr/bin/env ruby
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class MethodBodyDefaultVisitorGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @namespace, @classname, @filename = parse_classname("qpid::framing::MethodBodyDefaultVisitor")
+ end
+
+ def generate()
+ h_file(@filename) {
+ include "qpid/framing/MethodBodyConstVisitor"
+ namespace(@namespace) {
+ genl "class AMQMethodBody;"
+ cpp_class(@classname, "public MethodBodyConstVisitor") {
+ genl "public:"
+ genl "virtual void defaultVisit(const AMQMethodBody&) = 0;"
+ @amqp.methods_.each { |m|
+ genl "virtual void visit(const #{m.body_name}&);" }
+ }}}
+
+ cpp_file(@filename) {
+ include(@filename)
+ include("all_method_bodies.h")
+ namespace(@namespace) {
+ @amqp.methods_.each { |m|
+ genl "void #{@classname}::visit(const #{m.body_name}& b) { defaultVisit(b); }"
+ }}}
+ end
+end
+
+MethodBodyDefaultVisitorGen.new($outdir, $amqp).generate();
+
diff --git a/cpp/rubygen/framing.0-10/MethodHolder.rb b/cpp/rubygen/framing.0-10/MethodHolder.rb
new file mode 100755
index 0000000000..90a9333916
--- /dev/null
+++ b/cpp/rubygen/framing.0-10/MethodHolder.rb
@@ -0,0 +1,100 @@
+#!/usr/bin/env ruby
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class MethodHolderGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @namespace="qpid::framing"
+ @classname="BodyHolder"
+ @filename="qpid/framing/BodyHolder"
+ end
+
+ def gen_max_size()
+ # Generate program to generate MaxSize.h
+ cpp_file("generate_MaxMethodBodySize_h") {
+ include "qpid/framing/AMQHeaderBody"
+ include "qpid/framing/AMQContentBody"
+ include "qpid/framing/AMQHeartbeatBody"
+ @amqp.methods_.each { |m| include "qpid/framing/#{m.body_name}" }
+ genl
+ include "<algorithm>"
+ include "<fstream>"
+ genl
+ genl "using namespace std;"
+ genl "using namespace qpid::framing;"
+ genl
+ scope("int main(int, char** argv) {") {
+ genl "size_t maxSize=0;"
+ genl "maxSize=max(maxSize, sizeof(AMQHeaderBody));"
+ genl "maxSize=max(maxSize, sizeof(AMQContentBody));"
+ genl "maxSize=max(maxSize, sizeof(AMQHeartbeatBody));"
+ @amqp.methods_.each { |m|
+ genl "maxSize=max(maxSize, sizeof(#{m.body_name}));" }
+ gen <<EOS
+ofstream out("qpid/framing/MaxMethodBodySize.h");
+out << "// GENERATED CODE: generated by " << argv[0] << endl;
+out << "namespace qpid{ namespace framing { " << endl;
+out << "const size_t MAX_METHOD_BODY_SIZE=" << maxSize << ";" << endl;
+out << "}}" << endl;
+EOS
+ }
+ }
+ end
+
+ def gen_construct
+ cpp_file(@filename+"_gen") {
+ include @filename
+ include "qpid/framing/AMQHeaderBody"
+ include "qpid/framing/AMQContentBody"
+ include "qpid/framing/AMQHeartbeatBody"
+ @amqp.methods_.each { |m| include "qpid/framing/#{m.body_name}" }
+ include "qpid/framing/FrameDefaultVisitor.h"
+ include "qpid/Exception.h"
+ genl
+ namespace(@namespace) {
+ scope("void #{@classname}::setMethod(ClassId c, MethodId m) {") {
+ scope("switch (c) {") {
+ @amqp.classes.each { |c|
+ scope("case #{c.code}: switch(m) {") {
+ c.methods_.each { |m|
+ genl "case #{m.code}: blob = in_place<#{m.body_name}>(); break;"
+ }
+ genl "default: throw Exception(QPID_MSG(\"Invalid method id \" << int(m) << \" for class #{c.name} \"));"
+ }
+ genl "break;"
+ }
+ genl "default: throw Exception(QPID_MSG(\"Invalid class id \" << int(c)));"
+ }
+ }
+
+ struct("CopyVisitor", "public FrameDefaultVisitor") {
+ genl "using FrameDefaultVisitor::visit;"
+ genl "using FrameDefaultVisitor::defaultVisit;"
+ genl "BodyHolder& holder;"
+ genl "CopyVisitor(BodyHolder& h) : holder(h) {}"
+ ["Header", "Content", "Heartbeat"].each { |type|
+ genl "void visit(const AMQ#{type}Body& x) { holder=x; }"
+ }
+ @amqp.methods_.each { |m|
+ genl "void visit(const #{m.body_name}& x) { holder=x; }"
+ }
+ genl "void defaultVisit(const AMQBody&) { assert(0); }"
+ }
+ genl
+
+ scope("void BodyHolder::setBody(const AMQBody& b) {") {
+ genl "CopyVisitor cv(*this); b.accept(cv);"
+ }
+ }}
+ end
+
+ def generate
+ gen_max_size
+ gen_construct
+ end
+end
+
+MethodHolderGen.new($outdir, $amqp).generate();
+
diff --git a/cpp/rubygen/framing.0-10/Operations.rb b/cpp/rubygen/framing.0-10/Operations.rb
new file mode 100755
index 0000000000..a22a591f14
--- /dev/null
+++ b/cpp/rubygen/framing.0-10/Operations.rb
@@ -0,0 +1,96 @@
+#!/usr/bin/env ruby
+# Usage: output_directory xml_spec_file [xml_spec_file...]
+#
+$: << '..'
+require 'cppgen'
+require 'fileutils'
+require 'etc'
+require 'pathname'
+
+class OperationsGen < CppGen
+
+ def initialize(chassis, outdir, amqp)
+ super(outdir, amqp)
+ @chassis=chassis
+ @classname="AMQP_#{@chassis.caps}Operations"
+ end
+
+ def handler_method (m)
+ return_type = m.result ? m.result.cpptype.ret_by_val : "void"
+ gen "\nvirtual #{return_type} #{m.cppname}("
+ gen m.signature.join(",\n")
+ gen ") = 0;\n"
+ end
+
+ def handler_classname(c) c.name.caps+"Handler"; end
+
+ def handler_class(c)
+ if (!c.methods_on(@chassis).empty?)
+ handlerclass=handler_classname c
+ gen <<EOS
+// ==================== class #{handlerclass} ====================
+class #{handlerclass} {
+ // Constructors and destructors
+ public:
+ class Invoker; // Declared in #{@chassis.caps}Invoker
+
+ #{handlerclass}(){};
+ virtual ~#{handlerclass}() {}
+ // Protocol methods
+EOS
+ c.methods_on(@chassis).each { |m| handler_method(m) if !m.content() }
+ gen <<EOS
+}; // class #{handlerclass}
+
+
+EOS
+ end
+ end
+
+ def handler_get(c)
+ if (!c.methods_on(@chassis).empty?)
+ handlerclass=handler_classname c
+ gen "virtual #{handlerclass}* get#{handlerclass}() = 0;\n"
+ end
+ end
+
+ def generate()
+ h_file("qpid/framing/#{@classname}.h") {
+ gen <<EOS
+#include <sstream>
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/amqp_structs.h"
+
+namespace qpid {
+namespace framing {
+
+class AMQMethodBody;
+
+class #{@classname} {
+ public:
+ class Invoker; // Declared in #{@chassis.caps}Invoker
+
+ virtual ~#{@classname}() {}
+
+ virtual ProtocolVersion getVersion() const = 0;
+
+ // Inner classes
+EOS
+ indent { @amqp.classes.each { |c| handler_class(c) } }
+ gen <<EOS
+
+ // Method handler get methods
+
+EOS
+ indent { @amqp.classes.each { |c| handler_get(c) } }
+ gen <<EOS
+}; /* class #{@classname} */
+}}
+EOS
+}
+ end
+end
+
+OperationsGen.new("client",ARGV[0], $amqp).generate()
+OperationsGen.new("server",ARGV[0], $amqp).generate()
+
diff --git a/cpp/rubygen/framing.0-10/OperationsInvoker.rb b/cpp/rubygen/framing.0-10/OperationsInvoker.rb
new file mode 100755
index 0000000000..642f98ce8e
--- /dev/null
+++ b/cpp/rubygen/framing.0-10/OperationsInvoker.rb
@@ -0,0 +1,92 @@
+#!/usr/bin/env ruby
+# Usage: output_directory xml_spec_file [xml_spec_file...]
+#
+$: << '..'
+require 'cppgen'
+
+class OperationsInvokerGen < CppGen
+ def initialize(chassis, outdir, amqp)
+ super(outdir, amqp)
+ @chassis=chassis
+ @ops="AMQP_#{@chassis.caps}Operations"
+ @classname="#{@ops}::Invoker"
+ @filename="qpid/framing/#{@chassis.caps}Invoker"
+ end
+
+ def handler(c) "#{@ops}::#{c.cppname}Handler"; end
+ def getter(c) "get#{c.cppname}Handler"; end
+ def invoker(c) "#{handler(c)}::Invoker"; end
+ def visit_methods(c) c.methods_on(@chassis).select { |m| !m.content } end
+
+ def handler_visits_cpp(c)
+ visit_methods(c).each { |m|
+ scope("void #{invoker(c)}::visit(const #{m.body_name}& body) {") {
+ if (m.result)
+ genl "this->encode(body.invoke(target), result.result);"
+ else
+ genl "body.invoke(target);"
+ end
+ genl "result.handled=true;"
+ }
+ }
+ end
+
+ def ops_visits_cpp()
+ @amqp.classes.each { |c|
+ visit_methods(c).each { |m|
+ scope("void #{@classname}::visit(const #{m.body_name}& body) {") {
+ genl "#{handler(c)}::Invoker invoker(*target.#{getter(c)}());"
+ genl "body.accept(invoker);"
+ genl "result=invoker.getResult();"
+ }
+ }
+ }
+ end
+
+ def invoker_h(invoker, target, methods)
+ return if methods.empty?
+ genl
+ cpp_class(invoker, "public qpid::framing::Invoker") {
+ genl "#{target}& target;"
+ public
+ genl("Invoker(#{target}& target_) : target(target_) {}")
+ genl "using MethodBodyDefaultVisitor::visit;"
+ methods.each { |m| genl "void visit(const #{m.body_name}& body);" }
+ }
+ end
+
+ def generate()
+ h_file(@filename) {
+ include "qpid/framing/#{@ops}"
+ include "qpid/framing/Invoker.h"
+ namespace("qpid::framing") {
+ # AMQP_*Operations invoker.
+ methods=@amqp.classes.map { |c| visit_methods(c).to_a }.flatten
+ invoker_h(@classname, @ops, methods)
+
+ # AMQP_*Operations::*Handler invokers.
+ @amqp.classes.each { |c|
+ invoker_h(invoker(c), handler(c), visit_methods(c))
+ }
+ }
+ }
+
+ cpp_file(@filename) {
+ include @filename
+ @amqp.classes.each { |c|
+ visit_methods(c).each { |m|
+ include "qpid/framing/#{m.body_name}"
+ }}
+ namespace("qpid::framing") {
+ ops_visits_cpp
+ @amqp.classes.each { |c|
+ next if visit_methods(c).empty?
+ handler_visits_cpp(c)
+ }
+ }
+ }
+ end
+end
+
+OperationsInvokerGen.new("client",ARGV[0], $amqp).generate()
+OperationsInvokerGen.new("server",ARGV[0], $amqp).generate()
diff --git a/cpp/rubygen/framing.0-10/Proxy.rb b/cpp/rubygen/framing.0-10/Proxy.rb
new file mode 100755
index 0000000000..87d809d4ad
--- /dev/null
+++ b/cpp/rubygen/framing.0-10/Proxy.rb
@@ -0,0 +1,84 @@
+#!/usr/bin/env ruby
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class ProxyGen < CppGen
+
+ def initialize(chassis, outdir, amqp)
+ super(outdir, amqp)
+ @chassis=chassis
+ @classname="AMQP_#{@chassis.caps}Proxy"
+ @filename="qpid/framing/#{@classname}"
+ end
+
+ def proxy_member(c) c.name.lcaps+"Proxy"; end
+
+ def inner_class_decl(c)
+ cname=c.name.caps
+ cpp_class(cname, "Proxy") {
+ gen <<EOS
+public:
+#{cname}(FrameHandler& f) : Proxy(f) {}
+static #{cname}& get(#{@classname}& proxy) { return proxy.get#{cname}(); }
+EOS
+ c.methods_on(@chassis).each { |m|
+ genl "virtual void #{m.cppname}(#{m.signature.join(",\n ")});"
+ genl
+ }}
+ end
+
+ def inner_class_defn(c)
+ cname=c.cppname
+ c.methods_on(@chassis).each { |m|
+ genl "void #{@classname}::#{cname}::#{m.cppname}(#{m.signature.join(", ")})"
+ scope {
+ params=(["getVersion()"]+m.param_names).join(", ")
+ genl "send(#{m.body_name}(#{params}));"
+ }}
+ end
+
+ def generate
+ # .h file
+ h_file(@filename) {
+ include "qpid/framing/Proxy.h"
+ include "qpid/framing/Array.h"
+ include "qpid/framing/amqp_types.h"
+ include "qpid/framing/amqp_structs.h"
+ namespace("qpid::framing") {
+ cpp_class(@classname, "public Proxy") {
+ public
+ genl "#{@classname}(FrameHandler& out);"
+ genl
+ @amqp.classes.each { |c|
+ inner_class_decl(c)
+ genl
+ genl "#{c.cppname}& get#{c.cppname}() { return #{proxy_member(c)}; }"
+ genl
+ }
+ private
+ @amqp.classes.each{ |c| gen c.cppname+" "+proxy_member(c)+";\n" }
+ }}}
+
+ # .cpp file
+ cpp_file(@filename) {
+ include "<sstream>"
+ include "#{@classname}.h"
+ include "qpid/framing/amqp_types_full.h"
+ @amqp.methods_on(@chassis).each {
+ |m| include "qpid/framing/"+m.body_name
+ }
+ genl
+ namespace("qpid::framing") {
+ genl "#{@classname}::#{@classname}(FrameHandler& f) :"
+ gen " Proxy(f)"
+ @amqp.classes.each { |c| gen ",\n "+proxy_member(c)+"(f)" }
+ genl "{}\n"
+ @amqp.classes.each { |c| inner_class_defn(c) }
+ }}
+ end
+end
+
+
+ProxyGen.new("client", $outdir, $amqp).generate;
+ProxyGen.new("server", $outdir, $amqp).generate;
+
diff --git a/cpp/rubygen/framing.0-10/Session.rb b/cpp/rubygen/framing.0-10/Session.rb
new file mode 100644
index 0000000000..a2f06bfcd7
--- /dev/null
+++ b/cpp/rubygen/framing.0-10/Session.rb
@@ -0,0 +1,199 @@
+#!/usr/bin/env ruby
+# Usage: output_directory xml_spec_file [xml_spec_file...]
+#
+$: << '..'
+require 'cppgen'
+
+class CppGen
+ def session_methods
+ excludes = ["connection", "session", "file", "stream"]
+ gen_methods=@amqp.methods_on(@chassis).reject { |m|
+ excludes.include? m.parent.name or m.body_name.include?("010")
+ }
+ end
+
+ def doxygen(m)
+ doxygen_comment {
+ genl m.doc
+ genl
+ m.fields_c.each { |f|
+ genl "@param #{f.cppname}"
+ genl f.doc if f.doc
+ genl
+ }
+ }
+ end
+end
+
+class ContentField # For extra content parameters
+ def cppname() "content" end
+ def signature() "const MethodContent& content" end
+ def sig_default() signature+"="+"DefaultContent(std::string())" end
+ def unpack() "p[arg::content|DefaultContent(std::string())]"; end
+ def doc() "Message content"; end
+end
+
+class AmqpField
+ def unpack() "p[arg::#{cppname}|#{cpptype.default_value}]"; end
+ def sig_default() signature+"="+cpptype.default_value; end
+end
+
+class AmqpMethod
+ def fields_c() content ? fields+[ContentField.new] : fields end
+ def param_names_c() fields_c.map { |f| f.cppname} end
+ def signature_c() fields_c.map { |f| f.signature }; end
+ def sig_c_default() fields_c.map { |f| f.sig_default }; end
+ def argpack_name() "#{parent.cppname}#{name.caps}Parameters"; end
+ def argpack_type()
+ "boost::parameter::parameters<" +
+ fields_c.map { |f| "arg::keyword_tags::"+f.cppname }.join(',') +
+ ">"
+ end
+ def return_type()
+ return "TypedResult<qpid::framing::#{result.cpptype.ret_by_val}>" if (result)
+ return "Response" if (not responses().empty?)
+ return "Completion"
+ end
+ def session_function() "#{parent.name.lcaps}#{name.caps}"; end
+end
+
+class SessionNoKeywordGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @chassis="server"
+ @namespace,@classname,@file=
+ parse_classname "qpid::client::no_keyword::Session_#{@amqp.version.bars}"
+ end
+
+ def generate()
+ h_file(@file) {
+ include "qpid/client/SessionBase.h"
+
+ namespace("qpid::client") {
+ genl "using std::string;"
+ genl "using framing::Content;"
+ genl "using framing::FieldTable;"
+ genl "using framing::MethodContent;"
+ genl "using framing::SequenceNumber;"
+ genl "using framing::SequenceSet;"
+ genl "using framing::Uuid;"
+ #the following are nasty... would be better to dynamically
+ #include such statements based on params required
+ genl "using framing::Xid;"
+ genl
+ namespace("no_keyword") {
+ doxygen_comment {
+ genl "AMQP #{@amqp.version} session API."
+ genl @amqp.class_("session").doc
+ }
+ cpp_class(@classname, "public SessionBase") {
+ public
+ genl "Session_#{@amqp.version.bars}() {}"
+ genl "Session_#{@amqp.version.bars}(shared_ptr<SessionImpl> core) : SessionBase(core) {}"
+ session_methods.each { |m|
+ genl
+ doxygen(m)
+ args=m.sig_c_default.join(", ")
+ genl "#{m.return_type} #{m.session_function}(#{args});"
+ }
+ }}}}
+
+ cpp_file(@file) {
+ include @classname
+ include "qpid/framing/all_method_bodies.h"
+ namespace(@namespace) {
+ genl "using namespace framing;"
+ session_methods.each { |m|
+ genl
+ sig=m.signature_c.join(", ")
+ func="#{@classname}::#{m.session_function}"
+ scope("#{m.return_type} #{func}(#{sig}) {") {
+ args=(["ProtocolVersion()"]+m.param_names).join(", ")
+ body="#{m.body_name}(#{args})"
+ sendargs=body
+ sendargs << ", content" if m.content
+ genl "return #{m.return_type}(impl->send(#{sendargs}), impl);"
+ }}}}
+ end
+end
+
+class SessionGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @chassis="server"
+ session="Session_#{@amqp.version.bars}"
+ @base="no_keyword::#{session}"
+ @fqclass=FqClass.new "qpid::client::#{session}"
+ @classname=@fqclass.name
+ @fqbase=FqClass.new("qpid::client::#{@base}")
+ end
+
+ def gen_keyword_decl(m, prefix)
+ return if m.fields_c.empty? # Inherited function will do.
+ scope("BOOST_PARAMETER_MEMFUN(#{m.return_type}, #{m.session_function}, 0, #{m.fields_c.size}, #{m.argpack_name}) {") {
+ scope("return #{prefix}#{m.session_function}(",");") {
+ gen m.fields_c.map { |f| f.unpack() }.join(",\n")
+ }
+ }
+ genl
+ end
+
+ def generate()
+ keyword_methods=session_methods.reject { |m| m.fields_c.empty? }
+ max_arity = keyword_methods.map{ |m| m.fields_c.size }.max
+
+ h_file(@fqclass.file) {
+ include @fqbase.file
+ genl
+ genl "#define BOOST_PARAMETER_MAX_ARITY #{max_arity}"
+ include "<boost/parameter.hpp>"
+ genl
+ namespace("qpid::client") {
+ # Generate keyword tag declarations.
+ namespace("arg") {
+ keyword_methods.map{ |m| m.param_names_c }.flatten.uniq.each { |k|
+ genl "BOOST_PARAMETER_KEYWORD(keyword_tags, #{k})"
+ }}
+ genl
+ # Doxygen comment.
+ doxygen_comment {
+ genl "AMQP #{@amqp.version} session API with keyword arguments."
+ genl <<EOS
+This class provides the same set of functions as #{@base}, but also
+allows parameters be passed using keywords. The keyword is the
+parameter name in the namespace "arg".
+
+For example given the normal function "foo(int x=0, int y=0, int z=0)"
+you could call it in either of the following ways:
+
+@code
+session.foo(1,2,3); // Normal no keywords
+session.foo(arg::z=3, arg::x=1); // Keywords and a default
+@endcode
+
+The keyword functions are easy to use but their declarations are hard
+to read. You may find it easier to read the documentation for #{@base}
+which provides the same set of functions using normal non-keyword
+declarations.
+
+\\ingroup clientapi
+EOS
+ }
+ # Session class.
+ cpp_class(@classname,"public #{@base}") {
+ private
+ genl "#{@classname}(shared_ptr<SessionImpl> core) : #{ @base}(core) {}"
+ keyword_methods.each { |m| typedef m.argpack_type, m.argpack_name }
+ genl "friend class Connection;"
+ public
+ genl "#{@classname}() {}"
+ keyword_methods.each { |m| gen_keyword_decl(m,@base+"::") }
+ }}}
+ end
+end
+
+SessionNoKeywordGen.new(ARGV[0], $amqp).generate()
+SessionGen.new(ARGV[0], $amqp).generate()
+
diff --git a/cpp/rubygen/framing.0-10/all_method_bodies.rb b/cpp/rubygen/framing.0-10/all_method_bodies.rb
new file mode 100755
index 0000000000..5971d49189
--- /dev/null
+++ b/cpp/rubygen/framing.0-10/all_method_bodies.rb
@@ -0,0 +1,21 @@
+#!/usr/bin/env ruby
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class AllMethodBodiesGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @namespace="qpid::framing"
+ @filename="qpid/framing/all_method_bodies"
+ end
+
+ def generate()
+ h_file(@filename) {
+ @amqp.methods_.each { |m| include "qpid/framing/"+m.body_name }
+ }
+ end
+end
+
+AllMethodBodiesGen.new($outdir, $amqp).generate();
+
diff --git a/cpp/rubygen/framing.0-10/constants.rb b/cpp/rubygen/framing.0-10/constants.rb
new file mode 100755
index 0000000000..35067a733c
--- /dev/null
+++ b/cpp/rubygen/framing.0-10/constants.rb
@@ -0,0 +1,99 @@
+#!/usr/bin/env ruby
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class ConstantsGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @namespace="qpid::framing"
+ @dir="qpid/framing"
+ end
+
+ def constants_h()
+ h_file("#{@dir}/constants") {
+ namespace(@namespace) {
+ scope("enum AmqpConstant {","};") {
+ l=[]
+ l.concat @amqp.constants.map { |c| "#{c.name.shout}=#{c.value}" }
+ @amqp.classes.each { |c|
+ l << "#{c.name.shout}_CLASS_ID=#{c.code}"
+ l.concat c.methods_.map { |m|
+ "#{c.name.shout}_#{m.name.shout}_METHOD_ID=#{m.code}" }
+ }
+ genl l.join(",\n")
+ }
+ namespace("execution") {
+ define_constants_for(@amqp.class_("execution").domain("error-code").enum)
+ }
+ namespace("connection") {
+ define_constants_for(@amqp.class_("connection").domain("close-code").enum)
+ }
+ namespace("session") {
+ define_constants_for(@amqp.class_("session").domain("detach-code").enum)
+ }
+ define_constants_for(@amqp.class_("dtx").domain("xa-status").enum)
+ }
+ }
+ end
+
+ def define_constants_for(enum)
+ scope("enum #{enum.parent.name.caps} {","};") {
+ genl enum.choices.collect { |c| "#{c.name.shout}=#{c.value}" }.join(",\n")
+ }
+ end
+
+ def define_exception(c, base, package)
+ name=c.name.caps+"Exception"
+ genl
+ doxygen_comment { genl c.doc }
+ struct(c.name.caps+"Exception", base) {
+ genl "#{c.name.caps}Exception(const std::string& msg=std::string()) : #{base}(#{c.value}, \"#{c.name}: \"+msg) {}"
+ }
+ end
+
+ def define_exceptions_for(class_name, domain_name, base)
+ enum = @amqp.class_(class_name).domain(domain_name).enum
+ enum.choices.each { |c| define_exception(c, base, class_name) unless c.name == "normal" }
+ end
+
+ def reply_exceptions_h()
+ h_file("#{@dir}/reply_exceptions") {
+ include "qpid/Exception"
+ namespace(@namespace) {
+ define_exceptions_for("execution", "error-code", "SessionException")
+ define_exceptions_for("connection", "close-code", "ConnectionException")
+ define_exceptions_for("session", "detach-code", "ChannelException")
+ genl
+ genl "void throwExecutionException(int code, const std::string& text);"
+ }
+ }
+ end
+
+ def reply_exceptions_cpp()
+ cpp_file("#{@dir}/reply_exceptions") {
+ include "#{@dir}/reply_exceptions"
+ include "<sstream>"
+ namespace("qpid::framing") {
+ scope("void throwExecutionException(int code, const std::string& text) {"){
+ scope("switch (code) {") {
+ enum = @amqp.class_("execution").domain("error-code").enum
+ enum.choices.each { |c|
+ genl "case #{c.value}: throw #{c.name.caps}Exception(text);"
+ }
+ genl "default: break;"
+ }
+ }
+ }
+ }
+ end
+
+ def generate()
+ constants_h
+ reply_exceptions_h
+ reply_exceptions_cpp
+ end
+end
+
+ConstantsGen.new($outdir, $amqp).generate();
+
diff --git a/cpp/rubygen/framing.0-10/frame_body_lists.rb b/cpp/rubygen/framing.0-10/frame_body_lists.rb
new file mode 100644
index 0000000000..b20e4550f3
--- /dev/null
+++ b/cpp/rubygen/framing.0-10/frame_body_lists.rb
@@ -0,0 +1,31 @@
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class FrameBodyListsGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp);
+ end
+
+ def generate
+ h_file("qpid/framing/frame_body_lists.h") {
+ gen <<EOS
+/**@file
+ * Macro lists of frame body classes, used to generate Visitors
+ */
+EOS
+ gen "#define METHOD_BODIES() "
+ @amqp.methods_.each { |m| gen "\\\n (#{m.body_name}) " }
+ gen <<EOS
+
+
+#define OTHER_BODIES() (AMQContentBody)(AMQHeaderBody)(AMQHeartbeatBody))
+
+EOS
+ }
+ end
+end
+
+FrameBodyListsGen.new(ARGV[0], $amqp).generate;
+
+
diff --git a/cpp/rubygen/framing.0-10/structs.rb b/cpp/rubygen/framing.0-10/structs.rb
new file mode 100644
index 0000000000..35e7717122
--- /dev/null
+++ b/cpp/rubygen/framing.0-10/structs.rb
@@ -0,0 +1,583 @@
+#!/usr/bin/env ruby
+# Usage: output_directory xml_spec_file [xml_spec_file...]
+#
+$: << '..'
+require 'cppgen'
+
+class StructGen < CppGen
+
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ end
+
+ SizeMap={
+ "Octet"=>1,
+ "Short"=>2,
+ "Long"=>4,
+ "LongLong"=>8,
+ "int8"=>1,
+ "int16"=>2,
+ "int32"=>4,
+ "int64"=>8,
+ "uint8"=>1,
+ "uint16"=>2,
+ "uint32"=>4,
+ "uint64"=>8,
+ "timestamp"=>8
+ }
+
+ SizeType={
+ 1=>"Octet",
+ 2=>"Short",
+ 4=>"Long",
+ 8=>"LongLong"
+ }
+
+ ValueTypes=["uint8_t", "uint16_t", "uint32_t", "uint64_t"]
+
+ def is_packed(s) s.pack and s.pack != "0" end
+
+ def execution_header?(s)
+ s.is_a? AmqpMethod and not s.parent.control?
+ # s.kind_of? AmqpMethod and s.parent.name.include?("010") and not s.parent.control?
+ end
+
+ def has_bitfields_only(s)
+ s.fields.select {|f| f.type_ != "bit"}.empty?
+ end
+
+ def default_initialisation(s)
+ params = s.fields.select {|f| ValueTypes.include?(f.cpptype.name) || (!is_packed(s) && f.type_ == "bit")}
+ strings = params.collect {|f| "#{f.cppname}(0)"}
+ strings << "flags(0)" if (is_packed(s))
+ if strings.empty?
+ return ""
+ else
+ return " : " + strings.join(", ")
+ end
+ end
+
+ def printable_form(f)
+ if (f.cpptype.name == "uint8_t")
+ return "(int) " + f.cppname
+ elsif (f.type_ == "bit")
+ return "get#{f.name.caps}()"
+ else
+ return f.cppname
+ end
+ end
+
+ def flag_mask(s, i)
+ pos = s.pack.to_i*8 - 8 - (i/8)*8 + (i % 8)
+ return "(1 << #{pos})"
+ end
+
+ def encode_packed_struct(s)
+ genl s.cpp_pack_type.encode('flags', 'buffer')
+ process_packed_fields(s) { |f, i| encode_packed_field(s, f, i) unless f.type_ == "bit" }
+ end
+
+ def decode_packed_struct(s)
+ genl "#{s.cpp_pack_type.decode('flags', 'buffer')}"
+ process_packed_fields(s) { |f, i| decode_packed_field(s, f, i) unless f.type_ == "bit" }
+ end
+
+ def size_packed_struct(s)
+ genl "total += #{s.pack};"
+ process_packed_fields(s) { |f, i| size_packed_field(s, f, i) unless f.type_ == "bit" }
+ end
+
+ def print_packed_struct(s)
+ process_packed_fields(s) { |f, i| print_packed_field(s, f, i) }
+ end
+
+ def encode_packed_field(s, f, i)
+ genl "if (flags & #{flag_mask(s, i)})"
+ indent { genl f.cpptype.encode(f.cppname,"buffer") }
+ end
+
+ def decode_packed_field(s, f, i)
+ genl "if (flags & #{flag_mask(s, i)})"
+ indent { genl f.cpptype.decode(f.cppname,"buffer") }
+ end
+
+ def size_packed_field(s, f, i)
+ genl "if (flags & #{flag_mask(s, i)})"
+ indent { generate_size(f, []) }
+ end
+
+ def print_packed_field(s, f, i)
+ genl "if (flags & #{flag_mask(s, i)})"
+ indent {
+ genl "out << \"#{f.name}=\" << #{printable_form(f)} << \"; \";"
+ }
+ end
+
+ def generate_encode(f, combined)
+ if (f.type_ == "bit")
+ genl "uint8_t #{f.cppname}_bits = #{f.cppname};"
+ count = 0
+ combined.each { |c| genl "#{f.cppname}_bits |= #{c.cppname} << #{count += 1};" }
+ genl "buffer.putOctet(#{f.cppname}_bits);"
+ else
+ genl f.cpptype.encode(f.cppname,"buffer")
+ end
+ end
+
+ def generate_decode(f, combined)
+ if (f.type_ == "bit")
+ genl "uint8_t #{f.cppname}_bits = buffer.getOctet();"
+ genl "#{f.cppname} = 1 & #{f.cppname}_bits;"
+ count = 0
+ combined.each { |c| genl "#{c.cppname} = (1 << #{count += 1}) & #{f.cppname}_bits;" }
+ else
+ genl f.cpptype.decode(f.cppname,"buffer")
+ end
+ end
+
+ def generate_size(f, combined)
+ if (f.type_ == "bit")
+ names = ([f] + combined).collect {|g| g.cppname}
+ genl "total += 1;//#{names.join(", ")}"
+ else
+ size = SizeMap[f.cpptype.encoded]
+ if (size)
+ genl "total += #{size};//#{f.cppname}"
+ elsif (f.cpptype.name == "SequenceNumberSet")
+ genl "total += #{f.cppname}.encodedSize();"
+ else
+ encoded = f.cpptype.encoded
+ gen "total += ("
+ gen "4 + " if encoded == "LongString"
+ gen "2 + " if encoded == "MediumString"
+ gen "1 + " if encoded == "ShortString"
+ genl "#{f.cppname}.size());"
+ end
+ end
+ end
+
+ def process_packed_fields(s)
+ s.fields.each { |f| yield f, s.fields.index(f) }
+ end
+
+ def process_fields(s)
+ last = nil
+ count = 0
+ bits = []
+ s.fields.each {
+ |f| if (last and last.bit? and f.bit? and count < 7)
+ count += 1
+ bits << f
+ else
+ if (last and last.bit?)
+ yield last, bits
+ count = 0
+ bits = []
+ end
+ if (not f.bit?)
+ yield f
+ end
+ last = f
+ end
+ }
+ if (last and last.bit?)
+ yield last, bits
+ end
+ end
+
+ def all_fields_via_accessors(s)
+ s.fields.collect { |f| "get#{f.name.caps}()" }.join(", ")
+ end
+
+ def methodbody_extra_defs(s)
+ if (s.parent.control?)
+ genl "virtual uint8_t type() const { return 0;/*control segment*/ }"
+ end
+
+
+ gen <<EOS
+ typedef #{s.result ? s.result.cpptype.name : 'void'} ResultType;
+
+ template <class T> ResultType invoke(T& invocable) const {
+ return invocable.#{s.cppname}(#{all_fields_via_accessors(s)});
+ }
+
+ using AMQMethodBody::accept;
+ void accept(MethodBodyConstVisitor& v) const { v.visit(*this); }
+
+ ClassId amqpClassId() const { return CLASS_ID; }
+ MethodId amqpMethodId() const { return METHOD_ID; }
+ bool isContentBearing() const { return #{s.content ? "true" : "false" }; }
+ bool resultExpected() const { return #{s.result ? "true" : "false"}; }
+ bool responseExpected() const { return #{s.responses().empty? ? "false" : "true"}; }
+EOS
+ end
+
+ def define_constructor(name, s)
+ if (s.fields.size > 0)
+ genl "#{name}("
+ if (s.kind_of? AmqpMethod)
+ indent {gen "ProtocolVersion, "}
+ end
+ indent { gen s.fields.collect { |f| "#{f.cpptype.param} _#{f.cppname}" }.join(",\n") }
+ genl ") : "
+ if (is_packed(s))
+ initialisers = s.fields.select { |f| f.type_ != "bit"}.collect { |f| "#{f.cppname}(_#{f.cppname})"}
+
+ initialisers << "flags(0)"
+ indent { gen initialisers.join(",\n") }
+ genl "{"
+ indent {
+ process_packed_fields(s) { |f, i| genl "set#{f.name.caps}(_#{f.cppname});" if f.type_ == "bit"}
+ process_packed_fields(s) { |f, i| genl "flags |= #{flag_mask(s, i)};" unless f.type_ == "bit"}
+ }
+ genl "}"
+ else
+ indent { gen s.fields.collect { |f| " #{f.cppname}(_#{f.cppname})" }.join(",\n") }
+ genl "{}"
+ end
+ end
+ #default constructors:
+ if (s.kind_of? AmqpMethod)
+ genl "#{name}(ProtocolVersion=ProtocolVersion()) #{default_initialisation(s)} {}"
+ end
+ if (s.kind_of? AmqpStruct)
+ genl "#{name}() #{default_initialisation(s)} {}"
+ end
+ end
+
+ def define_packed_field_accessors(s, f, i)
+ if (s.kind_of? AmqpMethod)
+ define_packed_field_accessors_for_method(s, f, i)
+ else
+ define_packed_field_accessors_for_struct(s, f, i)
+ end
+ end
+
+ def define_packed_field_accessors_for_struct(s, f, i)
+ if (f.type_ == "bit")
+ genl "void #{s.cppname}::set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname}) {"
+ indent {
+ genl "if (_#{f.cppname}) flags |= #{flag_mask(s, i)};"
+ genl "else flags &= ~#{flag_mask(s, i)};"
+ }
+ genl "}"
+ genl "#{f.cpptype.ret} #{s.cppname}::get#{f.name.caps}() const { return flags & #{flag_mask(s, i)}; }"
+ else
+ genl "void #{s.cppname}::set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname}) {"
+ indent {
+ genl "#{f.cppname} = _#{f.cppname};"
+ genl "flags |= #{flag_mask(s, i)};"
+ }
+ genl "}"
+ genl "#{f.cpptype.ret} #{s.cppname}::get#{f.name.caps}() const { return #{f.cppname}; }"
+ if (f.cpptype.name == "FieldTable")
+ genl "#{f.cpptype.name}& #{s.cppname}::get#{f.name.caps}() {"
+ indent {
+ genl "flags |= #{flag_mask(s, i)};"#treat the field table as having been 'set'
+ genl "return #{f.cppname};"
+ }
+ genl "}"
+ end
+ genl "bool #{s.cppname}::has#{f.name.caps}() const { return flags & #{flag_mask(s, i)}; }"
+ genl "void #{s.cppname}::clear#{f.name.caps}Flag() { flags &= ~#{flag_mask(s, i)}; }"
+ end
+ genl ""
+ end
+
+ def define_packed_field_accessors_for_method(s, f, i)
+ if (f.type_ == "bit")
+ genl "void #{s.body_name}::set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname}) {"
+ indent {
+ genl "if (_#{f.cppname}) flags |= #{flag_mask(s, i)};"
+ genl "else flags &= ~#{flag_mask(s, i)};"
+ }
+ genl "}"
+ genl "#{f.cpptype.ret} #{s.body_name}::get#{f.name.caps}() const { return flags & #{flag_mask(s, i)}; }"
+ else
+ genl "void #{s.body_name}::set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname}) {"
+ indent {
+ genl "#{f.cppname} = _#{f.cppname};"
+ genl "flags |= #{flag_mask(s, i)};"
+ }
+ genl "}"
+ genl "#{f.cpptype.ret} #{s.body_name}::get#{f.name.caps}() const { return #{f.cppname}; }"
+ if (f.cpptype.name == "FieldTable")
+ genl "#{f.cpptype.name}& #{s.body_name}::get#{f.name.caps}() {"
+ indent {
+ genl "flags |= #{flag_mask(s, i)};"#treat the field table as having been 'set'
+ genl "return #{f.cppname};"
+ }
+ genl "}"
+ end
+ genl "bool #{s.body_name}::has#{f.name.caps}() const { return flags & #{flag_mask(s, i)}; }"
+ genl "void #{s.body_name}::clear#{f.name.caps}Flag() { flags &= ~#{flag_mask(s, i)}; }"
+ end
+ genl ""
+ end
+
+ def define_packed_accessors(s)
+ process_packed_fields(s) { |f, i| define_packed_field_accessors(s, f, i) }
+ end
+
+ def declare_packed_accessors(f)
+ genl "void set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname});";
+ genl "#{f.cpptype.ret} get#{f.name.caps}() const;"
+ if (f.cpptype.name == "FieldTable")
+ genl "#{f.cpptype.name}& get#{f.name.caps}();"
+ end
+ if (f.type_ != "bit")
+ #extra 'accessors' for packed fields:
+ genl "bool has#{f.name.caps}() const;"
+ genl "void clear#{f.name.caps}Flag();"
+ end
+ end
+
+ def define_accessors(f)
+ genl "void set#{f.name.caps}(#{f.cpptype.param} _#{f.cppname}) { #{f.cppname} = _#{f.cppname}; }"
+ genl "#{f.cpptype.ret} get#{f.name.caps}() const { return #{f.cppname}; }"
+ if (f.cpptype.name == "FieldTable")
+ genl "#{f.cpptype.name}& get#{f.name.caps}() { return #{f.cppname}; }"
+ end
+ end
+
+ def define_struct(s)
+ classname = s.cppname
+ inheritance = ""
+ if (s.kind_of? AmqpMethod)
+ classname = s.body_name
+ if (execution_header?(s))
+ inheritance = ": public ModelMethod"
+ else
+ inheritance = ": public AMQMethodBody"
+ end
+ end
+
+ h_file("qpid/framing/#{classname}.h") {
+ if (s.kind_of? AmqpMethod)
+ gen <<EOS
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/MethodBodyConstVisitor.h"
+EOS
+ end
+ include "qpid/framing/ModelMethod.h" if (execution_header?(s))
+
+ s.fields.each { |f| include "qpid/framing/#{f.cpptype.name}" if f.struct?}
+
+ gen <<EOS
+
+#include <ostream>
+#include "qpid/framing/amqp_types_full.h"
+
+namespace qpid {
+namespace framing {
+
+class #{classname} #{inheritance} {
+EOS
+ if (is_packed(s))
+ indent { s.fields.each { |f| genl "#{f.cpptype.name} #{f.cppname};" unless f.type_ == "bit"} }
+ indent {
+ genl "#{s.cpp_pack_type.name} flags;"
+ }
+ else
+ indent { s.fields.each { |f| genl "#{f.cpptype.name} #{f.cppname};" } }
+ end
+ genl "public:"
+ if (s.kind_of? AmqpMethod)
+ indent { genl "static const ClassId CLASS_ID = #{s.parent.code};" }
+ indent { genl "static const MethodId METHOD_ID = #{s.code};" }
+ end
+
+ if (s.kind_of? AmqpStruct)
+ if (s.code)
+ indent { genl "static const uint16_t TYPE = #{s.full_code};" }
+ end
+ end
+
+ indent {
+ define_constructor(classname, s)
+ genl ""
+ if (is_packed(s))
+ s.fields.each { |f| declare_packed_accessors(f) }
+ else
+ s.fields.each { |f| define_accessors(f) }
+ end
+ }
+ if (s.kind_of? AmqpMethod)
+ methodbody_extra_defs(s)
+ end
+ if (s.kind_of? AmqpStruct)
+ indent {genl "friend std::ostream& operator<<(std::ostream&, const #{classname}&);" }
+ end
+
+ gen <<EOS
+ void encode(Buffer&) const;
+ void decode(Buffer&, uint32_t=0);
+ void encodeStructBody(Buffer&) const;
+ void decodeStructBody(Buffer&, uint32_t=0);
+ uint32_t size() const;
+ uint32_t bodySize() const;
+ void print(std::ostream& out) const;
+}; /* class #{classname} */
+
+}}
+EOS
+ }
+ cpp_file("qpid/framing/#{classname}.cpp") {
+ if (is_packed(s) || s.fields.size > 0 || execution_header?(s))
+ buffer = "buffer"
+ else
+ buffer = "/*buffer*/"
+ end
+ gen <<EOS
+#include "#{classname}.h"
+
+#include "reply_exceptions.h"
+
+using namespace qpid::framing;
+
+EOS
+
+ if (is_packed(s))
+ define_packed_accessors(s)
+ end
+ gen <<EOS
+void #{classname}::encodeStructBody(Buffer& #{buffer}) const
+{
+EOS
+ if (execution_header?(s))
+ genl "encodeHeader(buffer);"
+ end
+
+ if (is_packed(s))
+ indent {encode_packed_struct(s)}
+ else
+ indent { process_fields(s) { |f, combined| generate_encode(f, combined) } }
+ end
+ gen <<EOS
+}
+
+void #{classname}::encode(Buffer& buffer) const
+{
+EOS
+ indent {
+ if (s.kind_of? AmqpStruct)
+ if (s.code)
+ genl "buffer.put#{SizeType[s.size.to_i]}(bodySize() + 2/*typecode*/);" if s.size and s.size.to_i != 0
+ genl "buffer.putShort(TYPE);"
+ else
+ genl "buffer.put#{SizeType[s.size.to_i]}(bodySize());" if s.size and s.size.to_i != 0
+ end
+ end
+ genl "encodeStructBody(buffer);"
+ }
+ gen <<EOS
+}
+
+void #{classname}::decodeStructBody(Buffer& #{buffer}, uint32_t /*size*/)
+{
+EOS
+ if (execution_header?(s))
+ genl "decodeHeader(buffer);"
+ end
+
+ if (is_packed(s))
+ indent {decode_packed_struct(s)}
+ else
+ indent { process_fields(s) { |f, combined| generate_decode(f, combined) } }
+ end
+ gen <<EOS
+}
+
+void #{classname}::decode(Buffer& buffer, uint32_t /*size*/)
+{
+EOS
+ indent {
+ if (s.kind_of? AmqpStruct)
+ genl "buffer.get#{SizeType[s.size.to_i]}();" if s.size and s.size.to_i != 0
+ genl "if (TYPE != buffer.getShort()) throw FramingErrorException(\"Bad type code for struct\");" if s.code
+ end
+ genl "decodeStructBody(buffer);"
+ }
+ gen <<EOS
+}
+
+uint32_t #{classname}::bodySize() const
+{
+ uint32_t total = 0;
+EOS
+ if (execution_header?(s))
+ genl "total += headerSize();"
+ end
+
+ if (is_packed(s))
+ indent {size_packed_struct(s)}
+ else
+ indent { process_fields(s) { |f, combined| generate_size(f, combined) } }
+ end
+ gen <<EOS
+ return total;
+}
+
+uint32_t #{classname}::size() const
+{
+ uint32_t total = bodySize();
+EOS
+ if (s.kind_of? AmqpStruct)
+ genl "total += #{s.size}/*size field*/;" if s.size
+ genl "total += 2/*typecode*/;" if s.code
+ end
+ gen <<EOS
+ return total;
+}
+
+void #{classname}::print(std::ostream& out) const
+{
+ out << "{#{classname}: ";
+EOS
+ if (is_packed(s))
+ indent {print_packed_struct(s)}
+ else
+ copy = Array.new(s.fields)
+ f = copy.shift
+
+ indent {
+ genl "out << \"#{f.name}=\" << #{printable_form(f)};" if f
+ copy.each { |f| genl "out << \"; #{f.name}=\" << #{printable_form(f)};" }
+ }
+ end
+ gen <<EOS
+ out << "}";
+}
+EOS
+
+ if (s.kind_of? AmqpStruct)
+ gen <<EOS
+namespace qpid{
+namespace framing{
+
+ std::ostream& operator<<(std::ostream& out, const #{classname}& s)
+ {
+ s.print(out);
+ return out;
+ }
+
+}
+}
+EOS
+ end
+}
+ end
+
+ def generate()
+ structs = @amqp.collect_all(AmqpStruct).select { |s| not ["command-fragment"].include?(s.name) }
+ structs.each { |s| define_struct(s) }
+ @amqp.methods_.each { |m| define_struct(m) }
+ #generate a single include file containing the list of structs for convenience
+ h_file("qpid/framing/amqp_structs.h") { structs.each { |s| genl "#include \"#{s.cppname}.h\"" } }
+ end
+end
+
+StructGen.new(ARGV[0], $amqp).generate()
+
diff --git a/cpp/rubygen/generate b/cpp/rubygen/generate
index 85fbefdea1..c025c946c7 100755
--- a/cpp/rubygen/generate
+++ b/cpp/rubygen/generate
@@ -33,17 +33,23 @@ def parse_specs(files)
return specs
end
+gendir=File.dirname(__FILE__)
+
# Run selected templates
if ARGV.any? { |arg| arg=="all" }
- templates=Dir["#{File.dirname __FILE__}/*/*.rb"]
+ templates=Dir["#{gendir}/*/*.rb"]
else
templates=ARGV.grep(/\.rb$/)
+ ARGV.each { |arg|
+ d=File.join gendir,arg
+ templates += Dir["#{d}/*.rb"] if File.directory? d
+ }
end
$outdir=ARGV[0]
$models=parse_specs(ARGV.grep(/\.xml$/))
templates.each { |t|
- ver=Pathname.new(t).dirname.basename.to_s
+ ver=Pathname.new(t).dirname.basename.to_s.split('.')[-1]
$amqp=$models[ver]
if $amqp
load t
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 597e566049..ce36a33933 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -13,10 +13,9 @@ force:
if GENERATE
-# AMQP_PREVIEW_XML and AMQP_FINAL_XML are defined in ../configure.ac
-amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/extra.xml $(top_srcdir)/xml/cluster.xml
+# AMQP_FINAL_XML is defined in ../configure.ac
amqp_0_10_xml=@AMQP_FINAL_XML@
-specs=$(amqp_99_0_xml) $(amqp_0_10_xml)
+specs=$(amqp_0_10_xml)
# Ruby generator.
rgen_dir=$(top_srcdir)/rubygen
@@ -152,7 +151,6 @@ libqpidcommon_la_SOURCES = \
qpid/framing/Buffer.cpp \
qpid/framing/FieldTable.cpp \
qpid/framing/FieldValue.cpp \
- qpid/framing/FramingContent.cpp \
qpid/framing/FrameSet.cpp \
qpid/framing/ProtocolInitiation.cpp \
qpid/framing/ProtocolVersion.cpp \
@@ -193,19 +191,11 @@ libqpidbroker_la_SOURCES = \
qpid/amqp_0_10/Connection.h \
qpid/amqp_0_10/Connection.cpp \
qpid/broker/Broker.cpp \
- qpid/broker/BrokerAdapter.cpp \
- qpid/broker/SessionAdapter.cpp \
qpid/broker/BrokerSingleton.cpp \
qpid/broker/Exchange.cpp \
qpid/broker/Queue.cpp \
qpid/broker/PersistableMessage.cpp \
qpid/broker/Bridge.cpp \
- qpid/broker/PreviewConnection.cpp \
- qpid/broker/PreviewConnectionCodec.cpp \
- qpid/broker/PreviewConnectionHandler.cpp \
- qpid/broker/PreviewSessionHandler.cpp \
- qpid/broker/PreviewSessionManager.cpp \
- qpid/broker/PreviewSessionState.cpp \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionHandler.cpp \
qpid/broker/ConnectionFactory.cpp \
@@ -215,7 +205,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/DirectExchange.cpp \
qpid/broker/DtxAck.cpp \
qpid/broker/DtxBuffer.cpp \
- qpid/broker/DtxHandlerImpl.cpp \
qpid/broker/DtxManager.cpp \
qpid/broker/DtxTimeout.cpp \
qpid/broker/DtxWorkRecord.cpp \
@@ -228,7 +217,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/MessageAdapter.cpp \
qpid/broker/MessageBuilder.cpp \
qpid/broker/MessageDelivery.cpp \
- qpid/broker/MessageHandlerImpl.cpp \
qpid/broker/MessageStoreModule.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NullMessageStore.cpp \
@@ -241,6 +229,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/SaslAuthenticator.cpp \
qpid/broker/SemanticState.h \
qpid/broker/SemanticState.cpp \
+ qpid/broker/SessionAdapter.cpp \
qpid/broker/SessionState.h \
qpid/broker/SessionState.cpp \
qpid/broker/SessionManager.h \
@@ -248,7 +237,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/SessionHandler.h \
qpid/broker/SessionContext.h \
qpid/broker/SessionHandler.cpp \
- qpid/broker/SemanticHandler.cpp \
qpid/broker/System.cpp \
qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
@@ -308,18 +296,11 @@ nobase_include_HEADERS = \
qpid/memory.h \
qpid/shared_ptr.h \
qpid/broker/Broker.h \
- qpid/broker/BrokerAdapter.h \
qpid/broker/SessionAdapter.h \
qpid/broker/Exchange.h \
qpid/broker/Queue.h \
qpid/broker/BrokerSingleton.h \
qpid/broker/Bridge.h \
- qpid/broker/PreviewConnection.h \
- qpid/broker/PreviewConnectionCodec.h \
- qpid/broker/PreviewConnectionHandler.h \
- qpid/broker/PreviewSessionHandler.h \
- qpid/broker/PreviewSessionManager.h \
- qpid/broker/PreviewSessionState.h \
qpid/broker/Connection.h \
qpid/broker/ConnectionState.h \
qpid/broker/ConnectionFactory.h \
@@ -337,7 +318,6 @@ nobase_include_HEADERS = \
qpid/broker/DirectExchange.h \
qpid/broker/DtxAck.h \
qpid/broker/DtxBuffer.h \
- qpid/broker/DtxHandlerImpl.h \
qpid/broker/DtxManager.h \
qpid/broker/DtxTimeout.h \
qpid/broker/DtxWorkRecord.h \
@@ -351,7 +331,6 @@ nobase_include_HEADERS = \
qpid/broker/MessageAdapter.h \
qpid/broker/MessageBuilder.h \
qpid/broker/MessageDelivery.h \
- qpid/broker/MessageHandlerImpl.h \
qpid/broker/MessageStore.h \
qpid/broker/MessageStoreModule.h \
qpid/broker/NameGenerator.h \
@@ -372,8 +351,8 @@ nobase_include_HEADERS = \
qpid/broker/RecoveredEnqueue.h \
qpid/broker/RecoveryManager.h \
qpid/broker/RecoveryManagerImpl.h \
- qpid/broker/SemanticHandler.h \
qpid/broker/SaslAuthenticator.h \
+ qpid/broker/SessionAdapter.h \
qpid/broker/SessionManager.h \
qpid/broker/System.h \
qpid/broker/Timer.h \
@@ -434,7 +413,6 @@ nobase_include_HEADERS = \
qpid/framing/FrameHandler.h \
qpid/framing/FrameHandler.h \
qpid/framing/FrameSet.h \
- qpid/framing/FramingContent.h \
qpid/framing/Handler.h \
qpid/framing/HeaderProperties.h \
qpid/framing/Invoker.h \
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h
index 2f934166a7..e74fa79ed9 100644
--- a/cpp/src/qpid/Exception.h
+++ b/cpp/src/qpid/Exception.h
@@ -23,6 +23,7 @@
*/
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/constants.h"
#include "qpid/Msg.h"
#include <memory>
@@ -51,29 +52,22 @@ class Exception : public std::exception
mutable std::string whatStr;
};
-/**
- * I have made SessionException a common base for Channel- and
- * Connection- Exceptions. This is not strictly correct but allows all
- * model layer exceptions to be handled as SessionExceptions which is
- * how they are classified in the final 0-10 specification. I.e. this
- * is a temporary hack to allow the preview and final codepaths to
- * co-exist with minimal changes.
- */
-
struct SessionException : public Exception {
const framing::ReplyCode code;
SessionException(framing::ReplyCode code_, const std::string& message)
: Exception(message), code(code_) {}
};
-struct ChannelException : public SessionException {
- ChannelException(framing::ReplyCode code, const std::string& message)
- : SessionException(code, message) {}
+struct ChannelException : public Exception {
+ const framing::ReplyCode code;
+ ChannelException(framing::ReplyCode _code, const std::string& message)
+ : Exception(message), code(_code) {}
};
-struct ConnectionException : public SessionException {
- ConnectionException(framing::ReplyCode code, const std::string& message)
- : SessionException(code, message) {}
+struct ConnectionException : public Exception {
+ const framing::ReplyCode code;
+ ConnectionException(framing::ReplyCode _code, const std::string& message)
+ : Exception(message), code(_code) {}
};
struct ClosedException : public Exception {
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 598f428ad2..ea9a41ac9d 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -46,24 +46,24 @@ Bridge::~Bridge()
void Bridge::create()
{
- framing::AMQP_ServerProxy::Session010 session(channel);
+ framing::AMQP_ServerProxy::Session session(channel);
session.attach(name, false);
if (args.i_src_is_local) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
} else {
if (args.i_src_is_queue) {
- peer.getMessage010().subscribe(args.i_src, args.i_dest, 0, 0, false, "", 0, FieldTable());
- peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer.getMessage().subscribe(args.i_src, args.i_dest, 0, 0, false, "", 0, FieldTable());
+ peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
string queue = "bridge_queue_";
queue += Uuid(true).str();
- peer.getQueue010().declare(queue, "", false, false, true, true, FieldTable());
- peer.getExchange010().bind(queue, args.i_src, args.i_key, FieldTable());
- peer.getMessage010().subscribe(queue, args.i_dest, 0, 0, false, "", 0, FieldTable());
- peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer.getQueue().declare(queue, "", false, false, true, true, FieldTable());
+ peer.getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
+ peer.getMessage().subscribe(queue, args.i_dest, 0, 0, false, "", 0, FieldTable());
+ peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
}
}
@@ -71,8 +71,8 @@ void Bridge::create()
void Bridge::cancel()
{
- peer.getMessage010().cancel(args.i_dest);
- peer.getSession010().detach(name);
+ peer.getMessage().cancel(args.i_dest);
+ peer.getSession().detach(name);
}
management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 6cbd9bf343..d9cf93f766 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -131,8 +131,7 @@ Broker::Broker(const Broker::Options& conf) :
store(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
factory(*this),
- sessionManager(conf.ack),
- previewSessionManager(conf.ack)
+ sessionManager(conf.ack)
{
if(conf.enableMgmt){
QPID_LOG(info, "Management enabled");
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index fa66061fd0..7297241763 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -30,7 +30,6 @@
#include "MessageStore.h"
#include "QueueRegistry.h"
#include "SessionManager.h"
-#include "PreviewSessionManager.h"
#include "Vhost.h"
#include "System.h"
#include "qpid/management/Manageable.h"
@@ -117,7 +116,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
Options& getOptions() { return config; }
SessionManager& getSessionManager() { return sessionManager; }
- PreviewSessionManager& getPreviewSessionManager() { return previewSessionManager; }
management::ManagementObject::shared_ptr GetManagementObject (void) const;
management::Manageable* GetVhostObject (void) const;
@@ -148,7 +146,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
ConnectionFactory factory;
DtxManager dtxManager;
SessionManager sessionManager;
- PreviewSessionManager previewSessionManager;
management::ManagementAgent::shared_ptr managementAgent;
management::Broker::shared_ptr mgmtObject;
Vhost::shared_ptr vhostObject;
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
deleted file mode 100644
index b83a275959..0000000000
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ /dev/null
@@ -1,353 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "BrokerAdapter.h"
-#include "Connection.h"
-#include "DeliveryToken.h"
-#include "MessageDelivery.h"
-#include "qpid/framing/AMQMethodBody.h"
-#include "qpid/Exception.h"
-#include "qpid/framing/reply_exceptions.h"
-
-namespace qpid {
-namespace broker {
-
-using namespace qpid;
-using namespace qpid::framing;
-
-typedef std::vector<Queue::shared_ptr> QueueVector;
-
-// TODO aconway 2007-08-31: now that functionality is distributed
-// between different handlers, BrokerAdapter should be dropped.
-// Instead the individual class Handler interfaces can be implemented
-// by the handlers responsible for those classes.
-//
-
-BrokerAdapter::BrokerAdapter(SemanticState& s) :
- HandlerImpl(s),
- basicHandler(s),
- exchangeHandler(s),
- bindingHandler(s),
- messageHandler(s),
- queueHandler(s),
- txHandler(s),
- dtxHandler(s)
-{}
-
-
-void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type,
- const string& alternateExchange,
- bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
- Exchange::shared_ptr alternate;
- if (!alternateExchange.empty()) {
- alternate = getBroker().getExchanges().get(alternateExchange);
- }
- if(passive){
- Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
- checkType(actual, type);
- checkAlternate(actual, alternate);
- }else{
- try{
- std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
- if (response.second) {
- if (durable) {
- getBroker().getStore().create(*response.first, args);
- }
- if (alternate) {
- response.first->setAlternate(alternate);
- alternate->incAlternateUsers();
- }
- } else {
- checkType(response.first, type);
- checkAlternate(response.first, alternate);
- }
- }catch(UnknownExchangeTypeException& e){
- throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
- }
- }
-}
-
-void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type)
-{
- if (!type.empty() && exchange->getType() != type) {
- throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type));
- }
-}
-
-void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
-{
- if (alternate && alternate != exchange->getAlternate())
- throw NotAllowedException(
- QPID_MSG("Exchange declared with alternate-exchange "
- << exchange->getAlternate()->getName() << ", requested "
- << alternate->getName()));
-}
-
-void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){
- //TODO: implement unused
- Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
- if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
- if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
- getBroker().getExchanges().destroy(name);
-}
-
-ExchangeXQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
-{
- try {
- Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- return ExchangeXQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
- } catch (const ChannelException& e) {
- return ExchangeXQueryResult("", false, true, FieldTable());
- }
-}
-
-BindingXQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
- const std::string& exchangeName,
- const std::string& queueName,
- const std::string& key,
- const framing::FieldTable& args)
-{
- Exchange::shared_ptr exchange;
- try {
- exchange = getBroker().getExchanges().get(exchangeName);
- } catch (const ChannelException&) {}
-
- Queue::shared_ptr queue;
- if (!queueName.empty()) {
- queue = getBroker().getQueues().find(queueName);
- }
-
- if (!exchange) {
- return BindingXQueryResult(true, false, false, false, false);
- } else if (!queueName.empty() && !queue) {
- return BindingXQueryResult(false, true, false, false, false);
- } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- return BindingXQueryResult(false, false, false, false, false);
- } else {
- //need to test each specified option individually
- bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
- bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
- bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
-
- return BindingXQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched);
- }
-}
-
-QueueXQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
-{
- Queue::shared_ptr queue = state.getQueue(name);
- Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
-
- return QueueXQueryResult(queue->getName(),
- alternateExchange ? alternateExchange->getName() : "",
- queue->isDurable(),
- queue->hasExclusiveOwner(),
- queue->isAutoDelete(),
- queue->getSettings(),
- queue->getMessageCount(),
- queue->getConsumerCount());
-}
-
-void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, const qpid::framing::FieldTable& arguments){
-
- Exchange::shared_ptr alternate;
- if (!alternateExchange.empty()) {
- alternate = getBroker().getExchanges().get(alternateExchange);
- }
- Queue::shared_ptr queue;
- if (passive && !name.empty()) {
- queue = state.getQueue(name);
- //TODO: check alternate-exchange is as expected
- } else {
- std::pair<Queue::shared_ptr, bool> queue_created =
- getBroker().getQueues().declare(
- name, durable,
- autoDelete,
- exclusive ? &getConnection() : 0);
- queue = queue_created.first;
- assert(queue);
- if (queue_created.second) { // This is a new queue
- if (alternate) {
- queue->setAlternateExchange(alternate);
- alternate->incAlternateUsers();
- }
-
- //apply settings & create persistent record if required
- queue_created.first->create(arguments);
-
- //add default binding:
- getBroker().getExchanges().getDefault()->bind(queue, name, 0);
- queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
-
- //handle automatic cleanup:
- if (exclusive) {
- getConnection().exclusiveQueues.push_back(queue);
- }
- } else {
- if (exclusive && queue->setExclusiveOwner(&getConnection())) {
- getConnection().exclusiveQueues.push_back(queue);
- }
- }
- }
- if (exclusive && !queue->isExclusiveOwner(&getConnection()))
- throw ResourceLockedException(
- QPID_MSG("Cannot grant exclusive access to queue "
- << queue->getName()));
-}
-
-void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey,
- const FieldTable& arguments){
-
- Queue::shared_ptr queue = state.getQueue(queueName);
- Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
- if(exchange){
- string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
- if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
- queue->bound(exchangeName, routingKey, arguments);
- if (exchange->isDurable() && queue->isDurable()) {
- getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
- }
- }
- }else{
- throw NotFoundException(
- "Bind failed. No such exchange: " + exchangeName);
- }
-}
-
-void
-BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
- const string& queueName,
- const string& exchangeName,
- const string& routingKey,
- const qpid::framing::FieldTable& arguments )
-{
- Queue::shared_ptr queue = state.getQueue(queueName);
- if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
- Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
- if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
- if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
- getBroker().getStore().unbind(*exchange, *queue, routingKey, arguments);
- }
-
-}
-
-void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
- state.getQueue(queue)->purge();
-}
-
-void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){
- ChannelException error(0, "");
- Queue::shared_ptr q = state.getQueue(queue);
- if(ifEmpty && q->getMessageCount() > 0){
- throw PreconditionFailedException("Queue not empty.");
- }else if(ifUnused && q->getConsumerCount() > 0){
- throw PreconditionFailedException("Queue in use.");
- }else{
- //remove the queue from the list of exclusive queues if necessary
- if(q->isExclusiveOwner(&getConnection())){
- QueueVector::iterator i = find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
- if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
- }
- q->destroy();
- getBroker().getQueues().destroy(queue);
- q->unbind(getBroker().getExchanges(), q);
- }
-}
-
-
-
-
-void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
- //TODO: handle global
- state.setPrefetchSize(prefetchSize);
- state.setPrefetchCount(prefetchCount);
-}
-
-void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
- const string& queueName, const string& consumerTag,
- bool noLocal, bool noAck, bool exclusive,
- bool nowait, const FieldTable& fields)
-{
-
- Queue::shared_ptr queue = state.getQueue(queueName);
- if(!consumerTag.empty() && state.exists(consumerTag)){
- throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
- }
- string newTag = consumerTag;
- //need to generate name here, so we have it for the adapter (it is
- //also version specific behaviour now)
- if (newTag.empty()) newTag = tagGenerator.generate();
- DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
- state.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
-
- if(!nowait)
- getProxy().getBasic().consumeOk(newTag);
-}
-
-void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
- state.cancel(consumerTag);
-}
-
-void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = state.getQueue(queueName);
- DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
- if(!state.get(token, queue, !noAck)){
- string clusterId;//not used, part of an imatix hack
-
- getProxy().getBasic().getEmpty(clusterId);
- }
-}
-
-void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
- if (multiple) {
- state.ackCumulative(deliveryTag);
- } else {
- state.ackRange(deliveryTag, deliveryTag);
- }
-}
-
-void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*requeue*/){}
-
-void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
-{
- state.recover(requeue);
-}
-
-void BrokerAdapter::TxHandlerImpl::select()
-{
- state.startTx();
-}
-
-void BrokerAdapter::TxHandlerImpl::commit()
-{
- state.commit(&getBroker().getStore(), true);
-}
-
-void BrokerAdapter::TxHandlerImpl::rollback()
-{
- state.rollback();
- state.recover(true);
-}
-
-}} // namespace qpid::broker
-
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
deleted file mode 100644
index 26dfe802e1..0000000000
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ /dev/null
@@ -1,208 +0,0 @@
-#ifndef _broker_BrokerAdapter_h
-#define _broker_BrokerAdapter_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "DtxHandlerImpl.h"
-#include "MessageHandlerImpl.h"
-
-#include "qpid/Exception.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/reply_exceptions.h"
-
-namespace qpid {
-namespace broker {
-
-class Channel;
-class Connection;
-class Broker;
-class ConnectionHandler;
-class BasicHandler;
-class ExchangeHandler;
-class QueueHandler;
-class TxHandler;
-class MessageHandler;
-class AccessHandler;
-class FileHandler;
-class StreamHandler;
-class DtxHandler;
-class TunnelHandler;
-class MessageHandlerImpl;
-class Exchange;
-
-/**
- * Per-channel protocol adapter.
- *
- * A container for a collection of AMQP-class adapters that translate
- * AMQP method bodies into calls on the core Broker objects. Each
- * adapter class also provides a client proxy to send methods to the
- * peer.
- *
- */
-class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
-{
- public:
- BrokerAdapter(SemanticState& session);
-
- BasicHandler* getBasicHandler() { return &basicHandler; }
- ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
- BindingHandler* getBindingHandler() { return &bindingHandler; }
- QueueHandler* getQueueHandler() { return &queueHandler; }
- TxHandler* getTxHandler() { return &txHandler; }
- MessageHandler* getMessageHandler() { return &messageHandler; }
- DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
- DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
-
- framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();}
-
-
- AccessHandler* getAccessHandler() {
- throw framing::NotImplementedException("Access class not implemented"); }
- FileHandler* getFileHandler() {
- throw framing::NotImplementedException("File class not implemented"); }
- StreamHandler* getStreamHandler() {
- throw framing::NotImplementedException("Stream class not implemented"); }
- TunnelHandler* getTunnelHandler() {
- throw framing::NotImplementedException("Tunnel class not implemented"); }
-
- Exchange010Handler* getExchange010Handler() { throw framing::NotImplementedException("Class not implemented"); }
- Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); }
- Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); }
- Tx010Handler* getTx010Handler() { throw framing::NotImplementedException("Class not implemented"); }
- Dtx010Handler* getDtx010Handler() { throw framing::NotImplementedException("Class not implemented"); }
- Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented"); }
-
- // Handlers no longer implemented in BrokerAdapter:
-#define BADHANDLER() assert(0); throw framing::NotImplementedException("")
- ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
- ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
- SessionHandler* getSessionHandler() { BADHANDLER(); }
- Connection010Handler* getConnection010Handler() { BADHANDLER(); }
- Session010Handler* getSession010Handler() { BADHANDLER(); }
-#undef BADHANDLER
-
- private:
- class ExchangeHandlerImpl :
- public ExchangeHandler,
- public HandlerImpl
- {
- public:
- ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
-
- void declare(uint16_t ticket,
- const std::string& exchange, const std::string& type,
- const std::string& alternateExchange,
- bool passive, bool durable, bool autoDelete,
- const qpid::framing::FieldTable& arguments);
- void delete_(uint16_t ticket,
- const std::string& exchange, bool ifUnused);
- framing::ExchangeXQueryResult query(u_int16_t ticket,
- const std::string& name);
- private:
- void checkType(shared_ptr<Exchange> exchange, const std::string& type);
-
- void checkAlternate(shared_ptr<Exchange> exchange,
- shared_ptr<Exchange> alternate);
- };
-
- class BindingHandlerImpl :
- public BindingHandler,
- public HandlerImpl
- {
- public:
- BindingHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
-
- framing::BindingXQueryResult query(u_int16_t ticket,
- const std::string& exchange,
- const std::string& queue,
- const std::string& routingKey,
- const framing::FieldTable& arguments);
- };
-
- class QueueHandlerImpl :
- public QueueHandler,
- public HandlerImpl
- {
- public:
- QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
-
- void declare(uint16_t ticket, const std::string& queue,
- const std::string& alternateExchange,
- bool passive, bool durable, bool exclusive,
- bool autoDelete,
- const qpid::framing::FieldTable& arguments);
- void bind(uint16_t ticket, const std::string& queue,
- const std::string& exchange, const std::string& routingKey,
- const qpid::framing::FieldTable& arguments);
- void unbind(uint16_t ticket,
- const std::string& queue,
- const std::string& exchange,
- const std::string& routingKey,
- const qpid::framing::FieldTable& arguments );
- framing::QueueXQueryResult query(const std::string& queue);
- void purge(uint16_t ticket, const std::string& queue);
- void delete_(uint16_t ticket, const std::string& queue,
- bool ifUnused, bool ifEmpty);
- };
-
- class BasicHandlerImpl :
- public BasicHandler,
- public HandlerImpl
- {
- NameGenerator tagGenerator;
- public:
- BasicHandlerImpl(SemanticState& session) : HandlerImpl(session), tagGenerator("sgen") {}
-
- void qos(uint32_t prefetchSize,
- uint16_t prefetchCount, bool global);
- void consume(uint16_t ticket, const std::string& queue,
- const std::string& consumerTag,
- bool noLocal, bool noAck, bool exclusive, bool nowait,
- const qpid::framing::FieldTable& fields);
- void cancel(const std::string& consumerTag);
- void get(uint16_t ticket, const std::string& queue, bool noAck);
- void ack(uint64_t deliveryTag, bool multiple);
- void reject(uint64_t deliveryTag, bool requeue);
- void recover(bool requeue);
- };
-
- class TxHandlerImpl :
- public TxHandler,
- public HandlerImpl
- {
- public:
- TxHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
-
- void select();
- void commit();
- void rollback();
- };
-
- BasicHandlerImpl basicHandler;
- ExchangeHandlerImpl exchangeHandler;
- BindingHandlerImpl bindingHandler;
- MessageHandlerImpl messageHandler;
- QueueHandlerImpl queueHandler;
- TxHandlerImpl txHandler;
- DtxHandlerImpl dtxHandler;
-};
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_BrokerAdapter_h*/
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index fca381063e..1994c4fdf5 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -20,9 +20,7 @@
*/
#include "Connection.h"
#include "SessionState.h"
-#include "BrokerAdapter.h"
#include "Bridge.h"
-#include "SemanticHandler.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -143,7 +141,7 @@ void Connection::idleIn(){}
void Connection::closed(){ // Physically closed, suspend open sessions.
try {
for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
- ptr_map_ptr(i)->localSuspend();
+ ptr_map_ptr(i)->handleDetach();
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp
index 7e20408388..5de5a0230a 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -21,7 +21,6 @@
#include "ConnectionFactory.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/amqp_0_10/Connection.h"
-#include "PreviewConnectionCodec.h"
namespace qpid {
namespace broker {
@@ -34,8 +33,6 @@ ConnectionFactory::~ConnectionFactory() {}
sys::ConnectionCodec*
ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
- if (v == ProtocolVersion(99, 0))
- return new PreviewConnectionCodec(out, broker, id);
if (v == ProtocolVersion(0, 10))
return new amqp_0_10::Connection(out, broker, id);
return 0;
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index c7738cc4ea..4ed2f5bfa2 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -50,9 +50,9 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
try{
bool handled = false;
if (handler->serverMode) {
- handled = invoke(static_cast<AMQP_ServerOperations::Connection010Handler&>(*handler.get()), *method);
+ handled = invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method);
} else {
- handled = invoke(static_cast<AMQP_ClientOperations::Connection010Handler&>(*handler.get()), *method);
+ handled = invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method);
}
if (!handled) {
handler->connection.getChannel(frame.getChannel()).in(frame);
diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h
index ea8b84b07c..a04936a943 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/cpp/src/qpid/broker/ConnectionHandler.h
@@ -41,11 +41,11 @@ class Connection;
class ConnectionHandler : public framing::FrameHandler
{
- struct Handler : public framing::AMQP_ServerOperations::Connection010Handler,
- public framing::AMQP_ClientOperations::Connection010Handler
+ struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler,
+ public framing::AMQP_ClientOperations::ConnectionHandler
{
- framing::AMQP_ClientProxy::Connection010 client;
- framing::AMQP_ServerProxy::Connection010 server;
+ framing::AMQP_ClientProxy::Connection client;
+ framing::AMQP_ServerProxy::Connection server;
Connection& connection;
bool serverMode;
std::auto_ptr<SaslAuthenticator> authenticator;
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
deleted file mode 100644
index 61ab856fa9..0000000000
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "DtxHandlerImpl.h"
-
-#include <boost/format.hpp>
-#include "Broker.h"
-#include "qpid/framing/constants.h"
-#include "qpid/framing/Array.h"
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using std::string;
-
-DtxHandlerImpl::DtxHandlerImpl(SemanticState& s) : HandlerImpl(s) {}
-
-// DtxDemarcationHandler:
-
-
-void DtxHandlerImpl::select()
-{
- state.selectDtx();
-}
-
-DtxDemarcationXEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
- const string& xid,
- bool fail,
- bool suspend)
-{
- try {
- if (fail) {
- state.endDtx(xid, true);
- if (suspend) {
- throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
- } else {
- return DtxDemarcationXEndResult(XA_RBROLLBACK);
- }
- } else {
- if (suspend) {
- state.suspendDtx(xid);
- } else {
- state.endDtx(xid, false);
- }
- return DtxDemarcationXEndResult(XA_OK);
- }
- } catch (const DtxTimeoutException& e) {
- return DtxDemarcationXEndResult(XA_RBTIMEOUT);
- }
-}
-
-DtxDemarcationXStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
- const string& xid,
- bool join,
- bool resume)
-{
- if (join && resume) {
- throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set."));
- }
- try {
- if (resume) {
- state.resumeDtx(xid);
- } else {
- state.startDtx(xid, getBroker().getDtxManager(), join);
- }
- return DtxDemarcationXStartResult(XA_OK);
- } catch (const DtxTimeoutException& e) {
- return DtxDemarcationXStartResult(XA_RBTIMEOUT);
- }
-}
-
-// DtxCoordinationHandler:
-
-DtxCoordinationXPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/,
- const string& xid)
-{
- try {
- bool ok = getBroker().getDtxManager().prepare(xid);
- return DtxCoordinationXPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
- } catch (const DtxTimeoutException& e) {
- return DtxCoordinationXPrepareResult(XA_RBTIMEOUT);
- }
-}
-
-DtxCoordinationXCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/,
- const string& xid,
- bool onePhase)
-{
- try {
- bool ok = getBroker().getDtxManager().commit(xid, onePhase);
- return DtxCoordinationXCommitResult(ok ? XA_OK : XA_RBROLLBACK);
- } catch (const DtxTimeoutException& e) {
- return DtxCoordinationXCommitResult(XA_RBTIMEOUT);
- }
-}
-
-
-DtxCoordinationXRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
- const string& xid )
-{
- try {
- getBroker().getDtxManager().rollback(xid);
- return DtxCoordinationXRollbackResult(XA_OK);
- } catch (const DtxTimeoutException& e) {
- return DtxCoordinationXRollbackResult(XA_RBTIMEOUT);
- }
-}
-
-DtxCoordinationXRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/,
- bool /*startscan*/,
- bool /*endscan*/ )
-{
- //TODO: what do startscan and endscan actually mean?
-
- // response should hold on key value pair with key = 'xids' and
- // value = sequence of xids
-
- // until sequences are supported (0-10 encoding), an alternate
- // scheme is used for testing:
- //
- // key = 'xids' and value = a longstr containing shortstrs for each xid
- //
- // note that this restricts the length of the xids more than is
- // strictly 'legal', but that is ok for testing
- std::set<std::string> xids;
- getBroker().getStore().collectPreparedXids(xids);
-
- //TODO: remove the need to copy from one container type to another
- std::vector<std::string> data;
- for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
- data.push_back(*i);
- }
- Array indoubt(data);
- return DtxCoordinationXRecoverResult(indoubt);
-}
-
-void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
- const string& xid)
-{
- //Currently no heuristic completion is supported, so this should never be used.
- throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
-}
-
-DtxCoordinationXGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)
-{
- uint32_t timeout = getBroker().getDtxManager().getTimeout(xid);
- return DtxCoordinationXGetTimeoutResult(timeout);
-}
-
-
-void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/,
- const string& xid,
- u_int32_t timeout)
-{
- getBroker().getDtxManager().setTimeout(xid, timeout);
-}
-
-
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h
deleted file mode 100644
index efb56dba95..0000000000
--- a/cpp/src/qpid/broker/DtxHandlerImpl.h
+++ /dev/null
@@ -1,67 +0,0 @@
-#ifndef _broker_DtxHandlerImpl_h
-#define _broker_DtxHandlerImpl_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "HandlerImpl.h"
-
-namespace qpid {
-namespace broker {
-
-class DtxHandlerImpl
- : public HandlerImpl,
- public framing::AMQP_ServerOperations::DtxCoordinationHandler,
- public framing::AMQP_ServerOperations::DtxDemarcationHandler
-{
-public:
- DtxHandlerImpl(SemanticState&);
-
- // DtxCoordinationHandler:
-
- framing::DtxCoordinationXCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase);
-
- void forget(u_int16_t ticket, const std::string& xid);
-
- framing::DtxCoordinationXGetTimeoutResult getTimeout(const std::string& xid);
-
- framing::DtxCoordinationXPrepareResult prepare(u_int16_t ticket, const std::string& xid);
-
- framing::DtxCoordinationXRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan);
-
- framing::DtxCoordinationXRollbackResult rollback(u_int16_t ticket, const std::string& xid);
-
- void setTimeout(u_int16_t ticket, const std::string& xid, u_int32_t timeout);
-
- // DtxDemarcationHandler:
-
- framing::DtxDemarcationXEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend);
-
- void select();
-
- framing::DtxDemarcationXStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
-};
-
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_DtxHandlerImpl_h*/
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 297e610418..dd013843f9 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -24,7 +24,6 @@
#include "qpid/framing/frame_functors.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/MessageXTransferBody.h"
#include "qpid/framing/SendContent.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/TypeFilter.h"
@@ -36,7 +35,6 @@ using namespace qpid::framing;
using std::string;
TransferAdapter Message::TRANSFER;
-PreviewAdapter Message::TRANSFER_99_0;
Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {}
@@ -225,9 +223,7 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/)
MessageAdapter& Message::getAdapter() const
{
if (!adapter) {
- if(frames.isA<MessageXTransferBody>()) {
- adapter = &TRANSFER_99_0;
- } else if(frames.isA<MessageTransferBody>()) {
+ if(frames.isA<MessageTransferBody>()) {
adapter = &TRANSFER;
} else {
const AMQMethodBody* method = frames.getMethod();
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 4fd2f1401d..87c7a9c43e 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -136,7 +136,6 @@ public:
mutable MessageAdapter* adapter;
static TransferAdapter TRANSFER;
- static PreviewAdapter TRANSFER_99_0;
MessageAdapter& getAdapter() const;
};
diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp
index 013e2c91ac..12f01494de 100644
--- a/cpp/src/qpid/broker/MessageAdapter.cpp
+++ b/cpp/src/qpid/broker/MessageAdapter.cpp
@@ -24,7 +24,6 @@
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/MessageXTransferBody.h"
namespace {
const std::string empty;
@@ -68,27 +67,4 @@ namespace broker{
return b && b->getAcceptMode() == 0/*EXPLICIT == 0*/;
}
- std::string PreviewAdapter::getExchange(const framing::FrameSet& f)
- {
- return f.as<framing::MessageXTransferBody>()->getDestination();
- }
-
- std::string PreviewAdapter::getRoutingKey(const framing::FrameSet& f)
- {
- const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>();
- return p ? p->getRoutingKey() : empty;
- }
-
- const framing::FieldTable* PreviewAdapter::getApplicationHeaders(const framing::FrameSet& f)
- {
- const framing::PreviewMessageProperties* p = f.getHeaders()->get<framing::PreviewMessageProperties>();
- return p ? &(p->getApplicationHeaders()) : 0;
- }
-
- bool PreviewAdapter::isPersistent(const framing::FrameSet& f)
- {
- const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>();
- return p && p->getDeliveryMode() == 2;
- }
-
}}
diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h
index 4c13e756e9..61a1bc4794 100644
--- a/cpp/src/qpid/broker/MessageAdapter.h
+++ b/cpp/src/qpid/broker/MessageAdapter.h
@@ -52,14 +52,6 @@ struct TransferAdapter : MessageAdapter
bool requiresAccept(const framing::FrameSet& f);
};
-struct PreviewAdapter : TransferAdapter
-{
- std::string getExchange(const framing::FrameSet& f);
- std::string getRoutingKey(const framing::FrameSet& f);
- const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
- bool isPersistent(const framing::FrameSet& f);
-};
-
}}
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp
index 36862edf37..f2a55e2790 100644
--- a/cpp/src/qpid/broker/MessageDelivery.cpp
+++ b/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -24,10 +24,7 @@
#include "Message.h"
#include "Queue.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/BasicXDeliverBody.h"
-#include "qpid/framing/BasicXGetOkBody.h"
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/MessageXTransferBody.h"
using namespace boost;
@@ -43,41 +40,6 @@ struct BaseToken : DeliveryToken
virtual AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) = 0;
};
-struct BasicGetToken : BaseToken
-{
- typedef boost::shared_ptr<BasicGetToken> shared_ptr;
-
- Queue::shared_ptr queue;
-
- BasicGetToken(Queue::shared_ptr q) : queue(q) {}
-
- AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id)
- {
- return AMQFrame(in_place<BasicXGetOkBody>(
- ProtocolVersion(), id.getValue(),
- msg->getRedelivered(), msg->getExchangeName(),
- msg->getRoutingKey(), queue->getMessageCount()));
- }
-};
-
-struct BasicConsumeToken : BaseToken
-{
- typedef boost::shared_ptr<BasicConsumeToken> shared_ptr;
-
- const string consumer;
-
- BasicConsumeToken(const string c) : consumer(c) {}
-
- AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id)
- {
- return AMQFrame(in_place<BasicXDeliverBody>(
- ProtocolVersion(), consumer, id.getValue(),
- msg->getRedelivered(), msg->getExchangeName(),
- msg->getRoutingKey()));
- }
-
-};
-
struct MessageDeliveryToken : BaseToken
{
const std::string destination;
@@ -91,48 +53,23 @@ struct MessageDeliveryToken : BaseToken
AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId /*id*/)
{
//may need to set the redelivered flag:
- if (isPreview) {
- if (msg->getRedelivered()){
- msg->getProperties<PreviewDeliveryProperties>()->setRedelivered(true);
- }
- return AMQFrame(in_place<MessageXTransferBody>(
- ProtocolVersion(), 0, destination,
- confirmMode, acquireMode));
- } else {
- if (msg->getRedelivered()){
- msg->getProperties<DeliveryProperties>()->setRedelivered(true);
- }
- return AMQFrame(in_place<MessageTransferBody>(
- ProtocolVersion(), destination, confirmMode, acquireMode));
+ if (msg->getRedelivered()){
+ msg->getProperties<DeliveryProperties>()->setRedelivered(true);
}
+ return AMQFrame(in_place<MessageTransferBody>(
+ ProtocolVersion(), destination, confirmMode, acquireMode));
}
};
}
}
-DeliveryToken::shared_ptr MessageDelivery::getBasicGetToken(Queue::shared_ptr queue)
-{
- return DeliveryToken::shared_ptr(new BasicGetToken(queue));
-}
-
-DeliveryToken::shared_ptr MessageDelivery::getBasicConsumeToken(const string& consumer)
-{
- return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer));
-}
-
DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination,
u_int8_t confirmMode, u_int8_t acquireMode)
{
return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, false));
}
-DeliveryToken::shared_ptr MessageDelivery::getPreviewMessageDeliveryToken(const std::string& destination,
- u_int8_t confirmMode, u_int8_t acquireMode)
-{
- return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, true));
-}
-
void MessageDelivery::deliver(QueuedMessage& msg,
framing::FrameHandler& handler,
DeliveryId id,
diff --git a/cpp/src/qpid/broker/MessageDelivery.h b/cpp/src/qpid/broker/MessageDelivery.h
index 564e1456a0..6deafbf519 100644
--- a/cpp/src/qpid/broker/MessageDelivery.h
+++ b/cpp/src/qpid/broker/MessageDelivery.h
@@ -34,15 +34,12 @@ class Message;
class Queue;
/**
+ * TODO: clean this up; we don't need it anymore in its current form
+ *
* Encapsulates the different options for message delivery currently supported.
*/
class MessageDelivery {
public:
- static boost::shared_ptr<DeliveryToken> getBasicGetToken(boost::shared_ptr<Queue> queue);
- static boost::shared_ptr<DeliveryToken> getBasicConsumeToken(const std::string& consumer);
- static boost::shared_ptr<DeliveryToken> getPreviewMessageDeliveryToken(const std::string& destination,
- u_int8_t confirmMode,
- u_int8_t acquireMode);
static boost::shared_ptr<DeliveryToken> getMessageDeliveryToken(const std::string& destination,
u_int8_t confirmMode,
u_int8_t acquireMode);
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
deleted file mode 100644
index 5e0e759dfb..0000000000
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/Exception.h"
-#include "qpid/log/Statement.h"
-#include "MessageHandlerImpl.h"
-#include "qpid/framing/FramingContent.h"
-#include "Connection.h"
-#include "Broker.h"
-#include "MessageDelivery.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "BrokerAdapter.h"
-
-#include <boost/format.hpp>
-#include <boost/cast.hpp>
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace broker {
-
-using namespace framing;
-
-MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
- HandlerImpl(s),
- releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
- rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
- {}
-
-//
-// Message class method handlers
-//
-
-void
-MessageHandlerImpl::cancel(const string& destination )
-{
- state.cancel(destination);
-}
-
-void
-MessageHandlerImpl::open(const string& /*reference*/)
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/)
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::close(const string& /*reference*/)
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::checkpoint(const string& /*reference*/,
- const string& /*identifier*/ )
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::resume(const string& /*reference*/,
- const string& /*identifier*/ )
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::offset(uint64_t /*value*/ )
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::get(uint16_t /*ticket*/,
- const string& /*queueName*/,
- const string& /*destination*/,
- bool /*noAck*/ )
-{
- throw NotImplementedException("get no longer supported");
-}
-
-void
-MessageHandlerImpl::empty()
-{
- throw NotImplementedException("empty no longer supported");
-}
-
-void
-MessageHandlerImpl::ok()
-{
- throw NotImplementedException("Message.Ok no longer supported");
-}
-
-void
-MessageHandlerImpl::qos(uint32_t prefetchSize,
- uint16_t prefetchCount,
- bool /*global*/ )
-{
- //TODO: handle global
- state.setPrefetchSize(prefetchSize);
- state.setPrefetchCount(prefetchCount);
-}
-
-void
-MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
- const string& queueName,
- const string& destination,
- bool noLocal,
- u_int8_t confirmMode,
- u_int8_t acquireMode,
- bool exclusive,
- const framing::FieldTable& filter )
-{
- Queue::shared_ptr queue = state.getQueue(queueName);
- if(!destination.empty() && state.exists(destination))
- throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
-
- string tag = destination;
- state.consume(MessageDelivery::getPreviewMessageDeliveryToken(destination, confirmMode, acquireMode),
- tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
-}
-
-void
-MessageHandlerImpl::recover(bool requeue)
-{
- state.recover(requeue);
-}
-
-void
-MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ )
-{
- transfers.processRanges(rejectOp);
-}
-
-void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
-{
- if (unit == 0) {
- //message
- state.addMessageCredit(destination, value);
- } else if (unit == 1) {
- //bytes
- state.addByteCredit(destination, value);
- } else {
- //unknown
- throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit));
- }
-
-}
-
-void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
-{
- if (mode == 0) {
- //credit
- state.setCreditMode(destination);
- } else if (mode == 1) {
- //window
- state.setWindowMode(destination);
- } else{
- throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode));
- }
-}
-
-void MessageHandlerImpl::flush(const std::string& destination)
-{
- state.flush(destination);
-}
-
-void MessageHandlerImpl::stop(const std::string& destination)
-{
- state.stop(destination);
-}
-
-void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
-{
- //TODO: implement mode
-
- SequenceNumberSet results;
- RangedOperation op = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results));
- transfers.processRanges(op);
- results = results.condense();
- getProxy().getMessage().acquired(results);
-}
-
-void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
-{
- transfers.processRanges(releaseOp);
-}
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h
deleted file mode 100644
index dd70f35dbb..0000000000
--- a/cpp/src/qpid/broker/MessageHandlerImpl.h
+++ /dev/null
@@ -1,110 +0,0 @@
-#ifndef _broker_MessageHandlerImpl_h
-#define _broker_MessageHandlerImpl_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <memory>
-
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "HandlerImpl.h"
-
-#include <boost/function.hpp>
-
-namespace qpid {
-namespace broker {
-
-class Connection;
-class Broker;
-class MessageMessage;
-
-class MessageHandlerImpl :
- public framing::AMQP_ServerOperations::MessageHandler,
- public HandlerImpl
-{
- typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
- RangedOperation releaseOp;
- RangedOperation rejectOp;
-
- public:
- MessageHandlerImpl(SemanticState&);
-
- void append(const std::string& reference, const std::string& bytes);
-
- void cancel(const std::string& destination );
-
- void checkpoint(const std::string& reference,
- const std::string& identifier );
-
- void close(const std::string& reference );
-
- void empty();
-
- void get(uint16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noAck );
-
- void offset(uint64_t value);
-
- void ok();
-
- void open(const std::string& reference );
-
- void qos(uint32_t prefetchSize,
- uint16_t prefetchCount,
- bool global );
-
- void recover(bool requeue );
-
- void reject(const framing::SequenceNumberSet& transfers,
- uint16_t code,
- const std::string& text );
-
- void resume(const std::string& reference,
- const std::string& identifier );
-
- void flow(const std::string& destination, u_int8_t unit, u_int32_t value);
-
- void flowMode(const std::string& destination, u_int8_t mode);
-
- void flush(const std::string& destination);
-
- void stop(const std::string& destination);
-
- void acquire(const framing::SequenceNumberSet& transfers, u_int8_t mode);
-
- void release(const framing::SequenceNumberSet& transfers);
-
- void subscribe(u_int16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noLocal,
- u_int8_t confirmMode,
- u_int8_t acquireMode,
- bool exclusive,
- const framing::FieldTable& filter);
-
-};
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_MessageHandlerImpl_h*/
diff --git a/cpp/src/qpid/broker/PreviewConnection.cpp b/cpp/src/qpid/broker/PreviewConnection.cpp
deleted file mode 100644
index 2643c85824..0000000000
--- a/cpp/src/qpid/broker/PreviewConnection.cpp
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "PreviewConnection.h"
-#include "SessionState.h"
-#include "BrokerAdapter.h"
-#include "Bridge.h"
-#include "SemanticHandler.h"
-
-#include "qpid/log/Statement.h"
-#include "qpid/ptr_map.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/management/ManagementAgent.h"
-
-#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
-
-#include <algorithm>
-#include <iostream>
-#include <assert.h>
-
-using namespace boost;
-using namespace qpid::sys;
-using namespace qpid::framing;
-using namespace qpid::sys;
-using qpid::ptr_map_ptr;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
-
-namespace qpid {
-namespace broker {
-
-class PreviewConnection::MgmtClient : public PreviewConnection::MgmtWrapper
-{
- management::Client::shared_ptr mgmtClient;
-
-public:
- MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
- ~MgmtClient();
- void received(framing::AMQFrame& frame);
- management::ManagementObject::shared_ptr getManagementObject() const;
- void closing();
-};
-
-class PreviewConnection::MgmtLink : public PreviewConnection::MgmtWrapper
-{
- typedef boost::ptr_vector<Bridge> Bridges;
-
- management::Link::shared_ptr mgmtLink;
- Bridges created;//holds list of bridges pending creation
- Bridges cancelled;//holds list of bridges pending cancellation
- Bridges active;//holds active bridges
- uint channelCounter;
- sys::Mutex linkLock;
-
- void cancel(Bridge*);
-
-public:
- MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
- ~MgmtLink();
- void received(framing::AMQFrame& frame);
- management::ManagementObject::shared_ptr getManagementObject() const;
- void closing();
- void processPending();
- void process(PreviewConnection& connection, const management::Args& args);
-};
-
-
-PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
- ConnectionState(out_, broker_),
- adapter(*this, isLink),
- mgmtClosing(false),
- mgmtId(mgmtId_)
-{
- Manageable* parent = broker.GetVhostObject ();
-
- if (parent != 0)
- {
- ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
-
- if (agent.get () != 0)
- {
- if (isLink) {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
- } else {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
- }
- }
- }
-}
-
-PreviewConnection::~PreviewConnection () {
-}
-
-void PreviewConnection::received(framing::AMQFrame& frame){
- if (mgmtClosing)
- close (403, "Closed by Management Request", 0, 0);
-
- if (frame.getChannel() == 0) {
- adapter.handle(frame);
- } else {
- getChannel(frame.getChannel()).in(frame);
- }
-
- if (mgmtWrapper.get()) mgmtWrapper->received(frame);
-}
-
-void PreviewConnection::close(
- ReplyCode code, const string& text, ClassId classId, MethodId methodId)
-{
- adapter.close(code, text, classId, methodId);
- channels.clear();
- getOutput().close();
-}
-
-void PreviewConnection::idleOut(){}
-
-void PreviewConnection::idleIn(){}
-
-void PreviewConnection::closed(){ // Physically closed, suspend open sessions.
- try {
- for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
- ptr_map_ptr(i)->localSuspend();
- while (!exclusiveQueues.empty()) {
- Queue::shared_ptr q(exclusiveQueues.front());
- q->releaseExclusiveOwnership();
- if (q->canAutoDelete()) {
- Queue::tryAutoDelete(broker, q);
- }
- exclusiveQueues.erase(exclusiveQueues.begin());
- }
- } catch(std::exception& e) {
- QPID_LOG(error, " Unhandled exception while closing session: " <<
- e.what());
- assert(0);
- }
-}
-
-bool PreviewConnection::doOutput()
-{
- try{
- //process any pending mgmt commands:
- if (mgmtWrapper.get()) mgmtWrapper->processPending();
- if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
-
-
- //then do other output as needed:
- return outputTasks.doOutput();
- }catch(ConnectionException& e){
- close(e.code, e.what(), 0, 0);
- }catch(std::exception& e){
- close(541/*internal error*/, e.what(), 0, 0);
- }
- return false;
-}
-
-void PreviewConnection::closeChannel(uint16_t id) {
- ChannelMap::iterator i = channels.find(id);
- if (i != channels.end()) channels.erase(i);
-}
-
-PreviewSessionHandler& PreviewConnection::getChannel(ChannelId id) {
- ChannelMap::iterator i=channels.find(id);
- if (i == channels.end()) {
- i = channels.insert(id, new PreviewSessionHandler(*this, id)).first;
- }
- return *ptr_map_ptr(i);
-}
-
-ManagementObject::shared_ptr PreviewConnection::GetManagementObject (void) const
-{
- return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
-}
-
-Manageable::status_t PreviewConnection::ManagementMethod (uint32_t methodId,
- Args& args)
-{
- Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
-
- QPID_LOG (debug, "PreviewConnection::ManagementMethod [id=" << methodId << "]");
-
- switch (methodId)
- {
- case management::Client::METHOD_CLOSE :
- mgmtClosing = true;
- if (mgmtWrapper.get()) mgmtWrapper->closing();
- out->activateOutput();
- status = Manageable::STATUS_OK;
- break;
- case management::Link::METHOD_BRIDGE :
- //queue this up and request chance to do output (i.e. get connections thread of control):
- mgmtWrapper->process(*this, args);
- out->activateOutput();
- status = Manageable::STATUS_OK;
- break;
- }
-
- return status;
-}
-
-PreviewConnection::MgmtLink::MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
- : channelCounter(1)
-{
- mgmtLink = management::Link::shared_ptr
- (new management::Link(conn, parent, mgmtId));
- agent->addObject (mgmtLink);
-}
-
-PreviewConnection::MgmtLink::~MgmtLink()
-{
- if (mgmtLink.get () != 0)
- mgmtLink->resourceDestroy ();
-}
-
-void PreviewConnection::MgmtLink::received(framing::AMQFrame& frame)
-{
- if (mgmtLink.get () != 0)
- {
- mgmtLink->inc_framesFromPeer ();
- mgmtLink->inc_bytesFromPeer (frame.size ());
- }
-}
-
-management::ManagementObject::shared_ptr PreviewConnection::MgmtLink::getManagementObject() const
-{
- return dynamic_pointer_cast<ManagementObject>(mgmtLink);
-}
-
-void PreviewConnection::MgmtLink::closing()
-{
- if (mgmtLink) mgmtLink->set_closing (1);
-}
-
-void PreviewConnection::MgmtLink::processPending()
-{
- Mutex::ScopedLock l(linkLock);
- //process any pending creates
- if (!created.empty()) {
- for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
- i->create();
- }
- active.transfer(active.end(), created.begin(), created.end(), created);
- }
- if (!cancelled.empty()) {
- //process any pending cancellations
- for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
- i->cancel();
- }
- cancelled.clear();
- }
-}
-
-void PreviewConnection::MgmtLink::process(PreviewConnection& connection, const management::Args& args)
-{
- Mutex::ScopedLock l(linkLock);
- created.push_back(new Bridge(channelCounter++, connection,
- boost::bind(&MgmtLink::cancel, this, _1),
- dynamic_cast<const management::ArgsLinkBridge&>(args)));
-}
-
-void PreviewConnection::MgmtLink::cancel(Bridge* b)
-{
- Mutex::ScopedLock l(linkLock);
- //need to take this out the active map and add it to the cancelled map
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if (&(*i) == b) {
- cancelled.transfer(cancelled.end(), i, active);
- break;
- }
- }
-}
-
-PreviewConnection::MgmtClient::MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
-{
- mgmtClient = management::Client::shared_ptr
- (new management::Client (conn, parent, mgmtId));
- agent->addObject (mgmtClient);
-}
-
-PreviewConnection::MgmtClient::~MgmtClient()
-{
- if (mgmtClient.get () != 0)
- mgmtClient->resourceDestroy ();
-}
-
-void PreviewConnection::MgmtClient::received(framing::AMQFrame& frame)
-{
- if (mgmtClient.get () != 0)
- {
- mgmtClient->inc_framesFromClient ();
- mgmtClient->inc_bytesFromClient (frame.size ());
- }
-}
-
-management::ManagementObject::shared_ptr PreviewConnection::MgmtClient::getManagementObject() const
-{
- return dynamic_pointer_cast<ManagementObject>(mgmtClient);
-}
-
-void PreviewConnection::MgmtClient::closing()
-{
- if (mgmtClient) mgmtClient->set_closing (1);
-}
-
-}}
-
diff --git a/cpp/src/qpid/broker/PreviewConnection.h b/cpp/src/qpid/broker/PreviewConnection.h
deleted file mode 100644
index 7a8404bf77..0000000000
--- a/cpp/src/qpid/broker/PreviewConnection.h
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _PreviewConnection_
-#define _PreviewConnection_
-
-#include <memory>
-#include <sstream>
-#include <vector>
-
-#include <boost/ptr_container/ptr_map.hpp>
-
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/sys/AggregateOutput.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/TimeoutHandler.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "Broker.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/Exception.h"
-#include "PreviewConnectionHandler.h"
-#include "ConnectionState.h"
-#include "PreviewSessionHandler.h"
-#include "qpid/management/Manageable.h"
-#include "qpid/management/Client.h"
-#include "qpid/management/Link.h"
-
-#include <boost/ptr_container/ptr_map.hpp>
-
-namespace qpid {
-namespace broker {
-
-class PreviewConnection : public sys::ConnectionInputHandler, public ConnectionState
-{
- public:
- PreviewConnection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
- ~PreviewConnection ();
-
- /** Get the PreviewSessionHandler for channel. Create if it does not already exist */
- PreviewSessionHandler& getChannel(framing::ChannelId channel);
-
- /** Close the connection */
- void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
-
- // ConnectionInputHandler methods
- void received(framing::AMQFrame& frame);
- void idleOut();
- void idleIn();
- void closed();
- bool doOutput();
-
- void closeChannel(framing::ChannelId channel);
-
- // Manageable entry points
- management::ManagementObject::shared_ptr GetManagementObject (void) const;
- management::Manageable::status_t
- ManagementMethod (uint32_t methodId, management::Args& args);
-
- private:
- typedef boost::ptr_map<framing::ChannelId, PreviewSessionHandler> ChannelMap;
- typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
-
- /**
- * Connection may appear, for the purposes of management, as a
- * normal client initiated connection or as an agent initiated
- * inter-broker link. This wrapper abstracts the common interface
- * for both.
- */
- class MgmtWrapper
- {
- public:
- virtual ~MgmtWrapper(){}
- virtual void received(framing::AMQFrame& frame) = 0;
- virtual management::ManagementObject::shared_ptr getManagementObject() const = 0;
- virtual void closing() = 0;
- virtual void processPending(){}
- virtual void process(PreviewConnection&, const management::Args&){}
- };
- class MgmtClient;
- class MgmtLink;
-
- ChannelMap channels;
- framing::AMQP_ClientProxy::Connection* client;
- uint64_t stagingThreshold;
- PreviewConnectionHandler adapter;
- std::auto_ptr<MgmtWrapper> mgmtWrapper;
- bool mgmtClosing;
- const std::string mgmtId;
-};
-
-}}
-
-#endif
diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.cpp b/cpp/src/qpid/broker/PreviewConnectionCodec.cpp
deleted file mode 100644
index b6c9b03776..0000000000
--- a/cpp/src/qpid/broker/PreviewConnectionCodec.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "PreviewConnectionCodec.h"
-#include "qpid/log/Statement.h"
-
-namespace qpid {
-namespace broker {
-
-using sys::Mutex;
-
-PreviewConnectionCodec::PreviewConnectionCodec(sys::OutputControl& o, Broker& broker, const std::string& id, bool isClient)
- : frameQueueClosed(false), output(o), connection(this, broker, id, isClient), identifier(id) {}
-
-size_t PreviewConnectionCodec::decode(const char* buffer, size_t size) {
- framing::Buffer in(const_cast<char*>(buffer), size);
- framing::AMQFrame frame;
- while(frame.decode(in)) {
- QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- connection.received(frame);
- }
- return in.getPosition();
-}
-
-bool PreviewConnectionCodec::canEncode() {
- if (!frameQueueClosed && frameQueue.empty()) connection.doOutput();
- return !frameQueue.empty();
-}
-
-bool PreviewConnectionCodec::isClosed() const {
- Mutex::ScopedLock l(frameQueueLock);
- return frameQueueClosed;
-}
-
-size_t PreviewConnectionCodec::encode(const char* buffer, size_t size) {
- Mutex::ScopedLock l(frameQueueLock);
- framing::Buffer out(const_cast<char*>(buffer), size);
- while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) {
- frameQueue.front().encode(out);
- QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front());
- frameQueue.pop();
- if (!frameQueueClosed && frameQueue.empty()) connection.doOutput();
- }
- if (!frameQueue.empty() && frameQueue.front().size() > size)
- throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
- return out.getPosition();
-}
-
-void PreviewConnectionCodec::activateOutput() { output.activateOutput(); }
-
-void PreviewConnectionCodec::close() {
- // Close the output queue.
- Mutex::ScopedLock l(frameQueueLock);
- frameQueueClosed = true;
-}
-
-void PreviewConnectionCodec::closed() {
- connection.closed();
-}
-
-void PreviewConnectionCodec::send(framing::AMQFrame& f) {
- {
- Mutex::ScopedLock l(frameQueueLock);
- if (!frameQueueClosed)
- frameQueue.push(f);
- }
- activateOutput();
-}
-
-framing::ProtocolVersion PreviewConnectionCodec::getVersion() const {
- return framing::ProtocolVersion(99,0);
-}
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.h b/cpp/src/qpid/broker/PreviewConnectionCodec.h
deleted file mode 100644
index f2ab086d06..0000000000
--- a/cpp/src/qpid/broker/PreviewConnectionCodec.h
+++ /dev/null
@@ -1,55 +0,0 @@
-#ifndef QPID_BROKER_PREVIEWCONNECTIONCODEC_H
-#define QPID_BROKER_PREVIEWCONNECTIONCODEC_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/sys/ConnectionCodec.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/sys/Mutex.h"
-#include "PreviewConnection.h"
-
-namespace qpid {
-namespace broker {
-
-class PreviewConnectionCodec : public sys::ConnectionCodec, public sys::ConnectionOutputHandler {
- std::queue<framing::AMQFrame> frameQueue;
- bool frameQueueClosed;
- mutable sys::Mutex frameQueueLock;
- sys::OutputControl& output;
- PreviewConnection connection;
- std::string identifier;
-
- public:
- PreviewConnectionCodec(sys::OutputControl&, Broker&, const std::string& id, bool isClient = false);
- size_t decode(const char* buffer, size_t size);
- size_t encode(const char* buffer, size_t size);
- bool isClosed() const;
- bool canEncode();
- void activateOutput();
- void closed(); // connection closed by peer.
- void close(); // closing from this end.
- void send(framing::AMQFrame&);
- framing::ProtocolVersion getVersion() const;
-};
-
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_PREVIEWCONNECTIONCODEC_H*/
diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
deleted file mode 100644
index 3477b59cb5..0000000000
--- a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
+++ /dev/null
@@ -1,315 +0,0 @@
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "config.h"
-
-#include "PreviewConnectionHandler.h"
-#include "PreviewConnection.h"
-#include "qpid/framing/ConnectionStartBody.h"
-#include "qpid/framing/ClientInvoker.h"
-#include "qpid/framing/ServerInvoker.h"
-#include "qpid/log/Statement.h"
-
-#if HAVE_SASL
-#include <sasl/sasl.h>
-#endif
-
-using namespace qpid;
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-
-namespace
-{
-const std::string PLAIN = "PLAIN";
-const std::string en_US = "en_US";
-}
-
-void PreviewConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId)
-{
- handler->client.close(code, text, classId, methodId);
-}
-
-void PreviewConnectionHandler::handle(framing::AMQFrame& frame)
-{
- AMQMethodBody* method=frame.getBody()->getMethod();
- try{
- if (handler->serverMode) {
- if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method))
- throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
- } else {
- if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method))
- throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
- }
- }catch(ConnectionException& e){
- handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
- }
-}
-
-PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection, bool isClient) : handler(new Handler(connection)) {
- FieldTable properties;
- string mechanisms;
- string locales(en_US);
- if (isClient) {
- handler->serverMode = false;
- }else {
-#if HAVE_SASL
- if (connection.getBroker().getOptions().auth) {
- const char *list;
- unsigned int list_len;
- int count;
- int code = sasl_listmech(handler->sasl_conn, NULL,
- "", " ", "",
- &list, &list_len,
- &count);
-
- if (SASL_OK != code) {
- QPID_LOG(info, "SASL: Mechanism listing failed: "
- << sasl_errdetail(handler->sasl_conn));
-
- // TODO: Change this to an exception signaling
- // server error, when one is available
- throw CommandInvalidException("Mechanism listing failed");
- } else {
- // TODO: For 0-10 the mechanisms must be returned
- // in a list instead of space separated
- mechanisms = list;
- }
- } else {
-#endif
- // TODO: It would be more proper for this to be ANONYMOUS
- mechanisms = PLAIN;
-#if HAVE_SASL
- }
-#endif
-
- QPID_LOG(info, "SASL: Sending mechanism list: " << mechanisms);
-
- handler->serverMode = true;
- handler->client.start(99, 0, properties, mechanisms, locales);
- }
-}
-
-PreviewConnectionHandler::Handler::Handler(PreviewConnection& c) :
-#if HAVE_SASL
- sasl_conn(NULL),
-#endif
- client(c.getOutput()), server(c.getOutput()),
- connection(c), serverMode(false)
-{
-#if HAVE_SASL
- if (connection.getBroker().getOptions().auth) {
- int code = sasl_server_new(BROKER_SASL_NAME,
- NULL, NULL, NULL, NULL, NULL, 0,
- &sasl_conn);
-
- if (SASL_OK != code) {
- QPID_LOG(info, "SASL: Connection creation failed: "
- << sasl_errdetail(sasl_conn));
-
- // TODO: Change this to an exception signaling
- // server error, when one is available
- throw CommandInvalidException("Unable to perform authentication");
- }
- }
-#endif
-}
-
-PreviewConnectionHandler::Handler::~Handler()
-{
-#if HAVE_SASL
- if (NULL != sasl_conn) {
- sasl_dispose(&sasl_conn);
- sasl_conn = NULL;
- }
-#endif
-}
-
-#if HAVE_SASL
-void PreviewConnectionHandler::Handler::processAuthenticationStep(int code, const char *challenge, unsigned int challenge_len)
-{
- if (SASL_OK == code) {
- const void *uid;
-
- code = sasl_getprop(sasl_conn,
- SASL_USERNAME,
- &uid);
- if (SASL_OK != code) {
- QPID_LOG(info, "SASL: Authentication succeeded, username unavailable");
- // TODO: Change this to an exception signaling
- // authentication failure, when one is available
- throw ConnectionForcedException("Authenticated username unavailable");
- }
-
- QPID_LOG(info, "SASL: Authentication succeeded for: " << (char *)uid);
-
- connection.setUserId((char *)uid);
-
- client.tune(framing::CHANNEL_MAX,
- connection.getFrameMax(),
- connection.getHeartbeat());
- } else if (SASL_CONTINUE == code) {
- string challenge_str(challenge, challenge_len);
-
- QPID_LOG(debug, "SASL: sending challenge to client");
-
- client.secure(challenge_str);
- } else {
- QPID_LOG(info, "SASL: Authentication failed: "
- << sasl_errdetail(sasl_conn));
-
- // TODO: Change to more specific exceptions, when they are
- // available
- switch (code) {
- case SASL_NOMECH:
- throw ConnectionForcedException("Unsupported mechanism");
- break;
- case SASL_TRYAGAIN:
- throw ConnectionForcedException("Transient failure, try again");
- break;
- default:
- throw ConnectionForcedException("Authentication failed");
- break;
- }
- }
-}
-#endif
-
-void PreviewConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/,
-#if HAVE_SASL
- const string& mechanism,
- const string& response,
-#else
- const string& /*mechanism*/,
- const string& /*response*/,
-#endif
- const string& /*locale*/)
-{
-#if HAVE_SASL
- if (connection.getBroker().getOptions().auth) {
- const char *challenge;
- unsigned int challenge_len;
-
- QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
- int code = sasl_server_start(sasl_conn,
- mechanism.c_str(),
- response.c_str(), response.length(),
- &challenge, &challenge_len);
-
- processAuthenticationStep(code, challenge, challenge_len);
- } else {
-#endif
- QPID_LOG(warning, "SASL: No Authentication Performed");
-
- // TODO: Figure out what should actually be set in this case
- connection.setUserId("anonymous");
-
- client.tune(framing::CHANNEL_MAX,
- connection.getFrameMax(),
- connection.getHeartbeat());
-#if HAVE_SASL
- }
-#endif
-}
-
-void PreviewConnectionHandler::Handler::secureOk(const string&
-#if HAVE_SASL
- response
-#endif
- ) {
-#if HAVE_SASL
- int code;
- const char *challenge;
- unsigned int challenge_len;
-
- code = sasl_server_step(sasl_conn,
- response.c_str(), response.length(),
- &challenge, &challenge_len);
-
- processAuthenticationStep(code, challenge, challenge_len);
-#endif
-}
-
-void PreviewConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/,
- uint32_t framemax, uint16_t heartbeat)
-{
- connection.setFrameMax(framemax);
- connection.setHeartbeat(heartbeat);
-}
-
-void PreviewConnectionHandler::Handler::open(const string& /*virtualHost*/,
- const string& /*capabilities*/, bool /*insist*/)
-{
- string knownhosts;
- client.openOk(knownhosts);
-}
-
-
-void PreviewConnectionHandler::Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/,
- uint16_t /*classId*/, uint16_t /*methodId*/)
-{
- client.closeOk();
- connection.getOutput().close();
-}
-
-void PreviewConnectionHandler::Handler::closeOk(){
- connection.getOutput().close();
-}
-
-
-void PreviewConnectionHandler::Handler::start(uint8_t /*versionMajor*/,
- uint8_t /*versionMinor*/,
- const FieldTable& /*serverProperties*/,
- const string& /*mechanisms*/,
- const string& /*locales*/)
-{
- string uid = "qpidd";
- string pwd = "qpidd";
- string response = ((char)0) + uid + ((char)0) + pwd;
- server.startOk(FieldTable(), PLAIN, response, en_US);
-}
-
-void PreviewConnectionHandler::Handler::secure(const string& /*challenge*/)
-{
- server.secureOk("");
-}
-
-void PreviewConnectionHandler::Handler::tune(uint16_t channelMax,
- uint32_t frameMax,
- uint16_t heartbeat)
-{
- connection.setFrameMax(frameMax);
- connection.setHeartbeat(heartbeat);
- server.tuneOk(channelMax, frameMax, heartbeat);
- server.open("/", "", true);
-}
-
-void PreviewConnectionHandler::Handler::openOk(const string& /*knownHosts*/)
-{
-}
-
-void PreviewConnectionHandler::Handler::redirect(const string& /*host*/, const string& /*knownHosts*/)
-{
-
-}
diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.h b/cpp/src/qpid/broker/PreviewConnectionHandler.h
deleted file mode 100644
index b71068d81d..0000000000
--- a/cpp/src/qpid/broker/PreviewConnectionHandler.h
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _PreviewConnectionAdapter_
-#define _PreviewConnectionAdapter_
-
-#include "config.h"
-
-#include <memory>
-#include "qpid/framing/amqp_types.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ClientOperations.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/AMQP_ServerProxy.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/Exception.h"
-
-#if HAVE_SASL
-#include <sasl/sasl.h>
-#endif
-
-namespace qpid {
-namespace broker {
-
-class PreviewConnection;
-
-// TODO aconway 2007-09-18: Rename to ConnectionHandler
-class PreviewConnectionHandler : public framing::FrameHandler
-{
- struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler,
- public framing::AMQP_ClientOperations::ConnectionHandler
- {
-#if HAVE_SASL
- sasl_conn_t *sasl_conn;
-#endif
- framing::AMQP_ClientProxy::Connection client;
- framing::AMQP_ServerProxy::Connection server;
- PreviewConnection& connection;
- bool serverMode;
-
- Handler(PreviewConnection& connection);
- ~Handler();
- void startOk(const qpid::framing::FieldTable& clientProperties,
- const std::string& mechanism, const std::string& response,
- const std::string& locale);
- void secureOk(const std::string& response);
- void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat);
- void open(const std::string& virtualHost,
- const std::string& capabilities, bool insist);
- void close(uint16_t replyCode, const std::string& replyText,
- uint16_t classId, uint16_t methodId);
- void closeOk();
-
-
- void start(uint8_t versionMajor,
- uint8_t versionMinor,
- const qpid::framing::FieldTable& serverProperties,
- const std::string& mechanisms,
- const std::string& locales);
-
- void secure(const std::string& challenge);
-
- void tune(uint16_t channelMax,
- uint32_t frameMax,
- uint16_t heartbeat);
-
- void openOk(const std::string& knownHosts);
-
- void redirect(const std::string& host, const std::string& knownHosts);
- private:
-#if HAVE_SASL
- void processAuthenticationStep(int code,
- const char *challenge,
- unsigned int challenge_len);
-#endif
- };
- std::auto_ptr<Handler> handler;
- public:
- PreviewConnectionHandler(PreviewConnection& connection, bool isClient);
- void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
- void handle(framing::AMQFrame& frame);
-};
-
-
-}}
-
-#endif
diff --git a/cpp/src/qpid/broker/PreviewSessionHandler.cpp b/cpp/src/qpid/broker/PreviewSessionHandler.cpp
deleted file mode 100644
index 36092bb7f6..0000000000
--- a/cpp/src/qpid/broker/PreviewSessionHandler.cpp
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "PreviewSessionHandler.h"
-#include "PreviewSessionState.h"
-#include "PreviewConnection.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/framing/constants.h"
-#include "qpid/framing/ClientInvoker.h"
-#include "qpid/framing/ServerInvoker.h"
-#include "qpid/log/Statement.h"
-
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace broker {
-using namespace framing;
-using namespace std;
-using namespace qpid::sys;
-
-PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch)
- : InOutHandler(0, &out),
- connection(c), channel(ch, &c.getOutput()),
- proxy(out), // Via my own handleOut() for L2 data.
- peerSession(channel), // Direct to channel for L2 commands.
- ignoring(false) {}
-
-PreviewSessionHandler::~PreviewSessionHandler() {}
-
-namespace {
-ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
-MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
-} // namespace
-
-void PreviewSessionHandler::handleIn(AMQFrame& f) {
- // Note on channel states: a channel is open if session != 0. A
- // channel that is closed (session == 0) can be in the "ignoring"
- // state. This is a temporary state after we have sent a channel
- // exception, where extra frames might arrive that should be
- // ignored.
- //
- AMQMethodBody* m = f.getBody()->getMethod();
- try {
- if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
- return;
- } else if (session.get()) {
- boost::optional<SequenceNumber> ack=session->received(f);
- session->in.handle(f);
- if (ack)
- peerSession.ack(*ack, SequenceNumberSet());
- } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
- return;
- } else if (!ignoring) {
- throw ChannelErrorException(
- QPID_MSG("Channel " << channel.get() << " is not open"));
- }
- } catch(const ChannelException& e) {
- ignoring=true; // Ignore trailing frames sent by client.
- session->detach();
- session.reset();
- peerSession.closed(e.code, e.what());
- }catch(const ConnectionException& e){
- connection.close(e.code, e.what(), classId(m), methodId(m));
- }catch(const std::exception& e){
- connection.close(
- framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m));
- }
-}
-
-void PreviewSessionHandler::handleOut(AMQFrame& f) {
- channel.handle(f); // Send it.
- if (session->sent(f))
- peerSession.solicitAck();
-}
-
-void PreviewSessionHandler::assertAttached(const char* method) const {
- if (!session.get())
- throw ChannelErrorException(
- QPID_MSG(method << " failed: No session for channel "
- << getChannel()));
-}
-
-void PreviewSessionHandler::assertClosed(const char* method) const {
- if (session.get())
- throw ChannelBusyException(
- QPID_MSG(method << " failed: channel " << channel.get()
- << " is already open."));
-}
-
-void PreviewSessionHandler::open(uint32_t detachedLifetime) {
- assertClosed("open");
- std::auto_ptr<PreviewSessionState> state(
- connection.broker.getPreviewSessionManager().open(*this, detachedLifetime));
- session.reset(state.release());
- peerSession.attached(session->getId(), session->getTimeout());
-}
-
-void PreviewSessionHandler::resume(const Uuid& id) {
- assertClosed("resume");
- session = connection.broker.getPreviewSessionManager().resume(id);
- session->attach(*this);
- SequenceNumber seq = session->resuming();
- peerSession.attached(session->getId(), session->getTimeout());
- proxy.getSession().ack(seq, SequenceNumberSet());
-}
-
-void PreviewSessionHandler::flow(bool /*active*/) {
- assertAttached("flow");
- // TODO aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException("session.flow");
-}
-
-void PreviewSessionHandler::flowOk(bool /*active*/) {
- assertAttached("flowOk");
- // TODO aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException("session.flowOk");
-}
-
-void PreviewSessionHandler::close() {
- assertAttached("close");
- QPID_LOG(info, "Received session.close");
- ignoring=false;
- session->detach();
- session.reset();
- peerSession.closed(REPLY_SUCCESS, "ok");
- assert(&connection.getChannel(channel.get()) == this);
- connection.closeChannel(channel.get());
-}
-
-void PreviewSessionHandler::closed(uint16_t replyCode, const string& replyText) {
- QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
- ignoring=false;
- session->detach();
- session.reset();
-}
-
-void PreviewSessionHandler::localSuspend() {
- if (session.get() && session->isAttached()) {
- session->detach();
- connection.broker.getPreviewSessionManager().suspend(session);
- session.reset();
- }
-}
-
-void PreviewSessionHandler::suspend() {
- assertAttached("suspend");
- localSuspend();
- peerSession.detached();
- assert(&connection.getChannel(channel.get()) == this);
- connection.closeChannel(channel.get());
-}
-
-void PreviewSessionHandler::ack(uint32_t cumulativeSeenMark,
- const SequenceNumberSet& /*seenFrameSet*/)
-{
- assertAttached("ack");
- if (session->getState() == PreviewSessionState::RESUMING) {
- session->receivedAck(cumulativeSeenMark);
- framing::SessionState::Replay replay=session->replay();
- std::for_each(replay.begin(), replay.end(),
- boost::bind(&PreviewSessionHandler::handleOut, this, _1));
- }
- else
- session->receivedAck(cumulativeSeenMark);
-}
-
-void PreviewSessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
- // TODO aconway 2007-10-02: may be removed from spec.
- assert(0); throw NotImplementedException("session.high-water-mark");
-}
-
-void PreviewSessionHandler::solicitAck() {
- assertAttached("solicit-ack");
- peerSession.ack(session->sendingAck(), SequenceNumberSet());
-}
-
-void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
-{
- std::auto_ptr<PreviewSessionState> state(
- connection.broker.getPreviewSessionManager().open(*this, detachedLifetime));
- session.reset(state.release());
-}
-
-void PreviewSessionHandler::detached()
-{
- connection.broker.getPreviewSessionManager().suspend(session);
- session.reset();
-}
-
-ConnectionState& PreviewSessionHandler::getConnection() { return connection; }
-const ConnectionState& PreviewSessionHandler::getConnection() const { return connection; }
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PreviewSessionHandler.h b/cpp/src/qpid/broker/PreviewSessionHandler.h
deleted file mode 100644
index 4c517367d7..0000000000
--- a/cpp/src/qpid/broker/PreviewSessionHandler.h
+++ /dev/null
@@ -1,111 +0,0 @@
-#ifndef QPID_BROKER_PREVIEWSESSIONHANDLER_H
-#define QPID_BROKER_PREVIEWSESSIONHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/AMQP_ClientOperations.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/framing/amqp_types.h"
-#include "qpid/framing/ChannelHandler.h"
-#include "SessionContext.h"
-
-#include <boost/noncopyable.hpp>
-
-namespace qpid {
-namespace broker {
-
-class PreviewConnection;
-class PreviewSessionState;
-
-/**
- * A SessionHandler is associated with each active channel. It
- * receives incoming frames, handles session commands and manages the
- * association between the channel and a session.
- */
-class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
- public framing::AMQP_ClientOperations::SessionHandler,
- public framing::FrameHandler::InOutHandler,
- private boost::noncopyable
-{
- public:
- PreviewSessionHandler(PreviewConnection&, framing::ChannelId);
- ~PreviewSessionHandler();
-
- /** Returns 0 if not attached to a session */
- PreviewSessionState* getSession() { return session.get(); }
- const PreviewSessionState* getSession() const { return session.get(); }
-
- framing::ChannelId getChannel() const { return channel.get(); }
-
- ConnectionState& getConnection();
- const ConnectionState& getConnection() const;
-
- framing::AMQP_ClientProxy& getProxy() { return proxy; }
- const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
-
- // Called by closing connection.
- void localSuspend();
- void detach() { localSuspend(); }
-
- protected:
- void handleIn(framing::AMQFrame&);
- void handleOut(framing::AMQFrame&);
-
- private:
- /// Session methods
- void open(uint32_t detachedLifetime);
- void flow(bool active);
- void flowOk(bool active);
- void close();
- void closed(uint16_t replyCode, const std::string& replyText);
- void resume(const framing::Uuid& sessionId);
- void suspend();
- void ack(uint32_t cumulativeSeenMark,
- const framing::SequenceNumberSet& seenFrameSet);
- void highWaterMark(uint32_t lastSentMark);
- void solicitAck();
-
- //extra methods required for assuming client role
- void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
- void detached();
-
-
- void assertAttached(const char* method) const;
- void assertActive(const char* method) const;
- void assertClosed(const char* method) const;
-
-
- PreviewConnection& connection;
- framing::ChannelHandler channel;
- framing::AMQP_ClientProxy proxy;
- framing::AMQP_ClientProxy::Session peerSession;
- bool ignoring;
- std::auto_ptr<PreviewSessionState> session;
-};
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!QPID_BROKER_SESSIONHANDLER_H*/
diff --git a/cpp/src/qpid/broker/PreviewSessionManager.cpp b/cpp/src/qpid/broker/PreviewSessionManager.cpp
deleted file mode 100644
index 97a7c87e34..0000000000
--- a/cpp/src/qpid/broker/PreviewSessionManager.cpp
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "PreviewSessionManager.h"
-#include "PreviewSessionState.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/log/Statement.h"
-#include "qpid/log/Helpers.h"
-#include "qpid/memory.h"
-
-#include <boost/bind.hpp>
-#include <boost/range.hpp>
-#include <boost/intrusive_ptr.hpp>
-
-#include <algorithm>
-#include <functional>
-#include <ostream>
-
-namespace qpid {
-namespace broker {
-
-using namespace sys;
-using namespace framing;
-
-PreviewSessionManager::PreviewSessionManager(uint32_t a) : ack(a) {}
-
-PreviewSessionManager::~PreviewSessionManager() {}
-
-// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
-std::auto_ptr<PreviewSessionState> PreviewSessionManager::open(
- PreviewSessionHandler& h, uint32_t timeout_)
-{
- Mutex::ScopedLock l(lock);
- std::auto_ptr<PreviewSessionState> session(
- new PreviewSessionState(this, &h, timeout_, ack));
- active.insert(session->getId());
- for_each(observers.begin(), observers.end(),
- boost::bind(&Observer::opened, _1,boost::ref(*session)));
- return session;
-}
-
-void PreviewSessionManager::suspend(std::auto_ptr<PreviewSessionState> session) {
- Mutex::ScopedLock l(lock);
- active.erase(session->getId());
- session->suspend();
- session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
- if (session->mgmtObject.get() != 0)
- session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry));
- suspended.push_back(session.release()); // In expiry order
- eraseExpired();
-}
-
-std::auto_ptr<PreviewSessionState> PreviewSessionManager::resume(const Uuid& id)
-{
- Mutex::ScopedLock l(lock);
- eraseExpired();
- if (active.find(id) != active.end())
- throw SessionBusyException(
- QPID_MSG("Session already active: " << id));
- Suspended::iterator i = std::find_if(
- suspended.begin(), suspended.end(),
- boost::bind(std::equal_to<Uuid>(), id, boost::bind(&PreviewSessionState::getId, _1))
- );
- if (i == suspended.end())
- throw InvalidArgumentException(
- QPID_MSG("No suspended session with id=" << id));
- active.insert(id);
- std::auto_ptr<PreviewSessionState> state(suspended.release(i).release());
- return state;
-}
-
-void PreviewSessionManager::erase(const framing::Uuid& id)
-{
- Mutex::ScopedLock l(lock);
- active.erase(id);
-}
-
-void PreviewSessionManager::eraseExpired() {
- // Called with lock held.
- if (!suspended.empty()) {
- Suspended::iterator keep = std::lower_bound(
- suspended.begin(), suspended.end(), now(),
- boost::bind(std::less<AbsTime>(), boost::bind(&PreviewSessionState::expiry, _1), _2));
- if (suspended.begin() != keep) {
- QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
- suspended.erase(suspended.begin(), keep);
- }
- }
-}
-
-void PreviewSessionManager::add(const boost::intrusive_ptr<Observer>& o) {
- observers.push_back(o);
-}
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PreviewSessionManager.h b/cpp/src/qpid/broker/PreviewSessionManager.h
deleted file mode 100644
index 9bc6bc5bbc..0000000000
--- a/cpp/src/qpid/broker/PreviewSessionManager.h
+++ /dev/null
@@ -1,101 +0,0 @@
-#ifndef QPID_BROKER_PREVIEWSESSIONMANAGER_H
-#define QPID_BROKER_PREVIEWSESSIONMANAGER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <qpid/framing/Uuid.h>
-#include <qpid/sys/Time.h>
-#include <qpid/sys/Mutex.h>
-#include <qpid/RefCounted.h>
-
-#include <set>
-#include <vector>
-#include <memory>
-
-#include <boost/noncopyable.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
-#include <boost/intrusive_ptr.hpp>
-
-namespace qpid {
-namespace broker {
-
-class PreviewSessionState;
-class PreviewSessionHandler;
-
-/**
- * Create and manage PreviewSessionState objects.
- */
-class PreviewSessionManager : private boost::noncopyable {
- public:
- /**
- * Observer notified of PreviewSessionManager events.
- */
- struct Observer : public RefCounted {
- virtual void opened(PreviewSessionState&) {}
- };
-
- PreviewSessionManager(uint32_t ack);
-
- ~PreviewSessionManager();
-
- /** Open a new active session, caller takes ownership */
- std::auto_ptr<PreviewSessionState> open(PreviewSessionHandler& c, uint32_t timeout_);
-
- /** Suspend a session, start it's timeout counter.
- * The factory takes ownership.
- */
- void suspend(std::auto_ptr<PreviewSessionState> session);
-
- /** Resume a suspended session.
- *@throw Exception if timed out or non-existant.
- */
- std::auto_ptr<PreviewSessionState> resume(const framing::Uuid&);
-
- /** Add an Observer. */
- void add(const boost::intrusive_ptr<Observer>&);
-
- private:
- typedef boost::ptr_vector<PreviewSessionState> Suspended;
- typedef std::set<framing::Uuid> Active;
- typedef std::vector<boost::intrusive_ptr<Observer> > Observers;
-
- void erase(const framing::Uuid&);
- void eraseExpired();
-
- sys::Mutex lock;
- Suspended suspended;
- Active active;
- uint32_t ack;
- Observers observers;
-
- friend class PreviewSessionState; // removes deleted sessions from active set.
-};
-
-
-
-}} // namespace qpid::broker
-
-
-
-
-
-#endif /*!QPID_BROKER_PREVIEWSESSIONMANAGER_H*/
diff --git a/cpp/src/qpid/broker/PreviewSessionState.cpp b/cpp/src/qpid/broker/PreviewSessionState.cpp
deleted file mode 100644
index 43c3b1509e..0000000000
--- a/cpp/src/qpid/broker/PreviewSessionState.cpp
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "PreviewSessionState.h"
-#include "PreviewSessionManager.h"
-#include "PreviewSessionHandler.h"
-#include "ConnectionState.h"
-#include "Broker.h"
-#include "SemanticHandler.h"
-#include "qpid/framing/reply_exceptions.h"
-
-namespace qpid {
-namespace broker {
-
-using namespace framing;
-using sys::Mutex;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
-
-PreviewSessionState::PreviewSessionState(
- PreviewSessionManager* f, PreviewSessionHandler* h, uint32_t timeout_, uint32_t ack)
- : framing::SessionState(ack, timeout_ > 0),
- factory(f), handler(h), id(true), timeout(timeout_),
- broker(h->getConnection().broker),
- version(h->getConnection().getVersion()),
- semanticHandler(new SemanticHandler(*this))
-{
- in.next = semanticHandler.get();
- out.next = &handler->out;
-
- getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
-
- Manageable* parent = broker.GetVhostObject ();
-
- if (parent != 0)
- {
- ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
-
- if (agent.get () != 0)
- {
- mgmtObject = management::Session::shared_ptr
- (new management::Session (this, parent, id.str ()));
- mgmtObject->set_attached (1);
- mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
- mgmtObject->set_channelId (h->getChannel());
- mgmtObject->set_detachedLifespan (getTimeout());
- agent->addObject (mgmtObject);
- }
- }
-}
-
-PreviewSessionState::~PreviewSessionState() {
- // Remove ID from active session list.
- if (factory)
- factory->erase(getId());
- if (mgmtObject.get () != 0)
- mgmtObject->resourceDestroy ();
-}
-
-PreviewSessionHandler* PreviewSessionState::getHandler() {
- return handler;
-}
-
-AMQP_ClientProxy& PreviewSessionState::getProxy() {
- assert(isAttached());
- return getHandler()->getProxy();
-}
-
-ConnectionState& PreviewSessionState::getConnection() {
- assert(isAttached());
- return getHandler()->getConnection();
-}
-
-bool PreviewSessionState::isLocal(const ConnectionToken* t) const
-{
- return isAttached() && &(handler->getConnection()) == t;
-}
-
-void PreviewSessionState::detach() {
- getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
- Mutex::ScopedLock l(lock);
- handler = 0; out.next = 0;
- if (mgmtObject.get() != 0)
- {
- mgmtObject->set_attached (0);
- }
-}
-
-void PreviewSessionState::attach(PreviewSessionHandler& h) {
- {
- Mutex::ScopedLock l(lock);
- handler = &h;
- out.next = &handler->out;
- if (mgmtObject.get() != 0)
- {
- mgmtObject->set_attached (1);
- mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
- mgmtObject->set_channelId (h.getChannel());
- }
- }
- h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
-}
-
-void PreviewSessionState::activateOutput()
-{
- Mutex::ScopedLock l(lock);
- if (isAttached()) {
- getConnection().outputTasks.activateOutput();
- }
-}
- //This class could be used as the callback for queue notifications
- //if not attached, it can simply ignore the callback, else pass it
- //on to the connection
-
-ManagementObject::shared_ptr PreviewSessionState::GetManagementObject (void) const
-{
- return dynamic_pointer_cast<ManagementObject> (mgmtObject);
-}
-
-Manageable::status_t PreviewSessionState::ManagementMethod (uint32_t methodId,
- Args& /*args*/)
-{
- Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
-
- switch (methodId)
- {
- case management::Session::METHOD_DETACH :
- if (handler != 0)
- {
- handler->detach();
- }
- status = Manageable::STATUS_OK;
- break;
-
- case management::Session::METHOD_CLOSE :
- /*
- if (handler != 0)
- {
- handler->getConnection().closeChannel(handler->getChannel());
- }
- status = Manageable::STATUS_OK;
- break;
- */
-
- case management::Session::METHOD_SOLICITACK :
- case management::Session::METHOD_RESETLIFESPAN :
- status = Manageable::STATUS_NOT_IMPLEMENTED;
- break;
- }
-
- return status;
-}
-
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PreviewSessionState.h b/cpp/src/qpid/broker/PreviewSessionState.h
deleted file mode 100644
index 1aecb12e72..0000000000
--- a/cpp/src/qpid/broker/PreviewSessionState.h
+++ /dev/null
@@ -1,125 +0,0 @@
-#ifndef QPID_BROKER_PREVIEWSESSION_H
-#define QPID_BROKER_PREVIEWSESSION_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/Uuid.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/SessionState.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Time.h"
-#include "qpid/management/Manageable.h"
-#include "qpid/management/Session.h"
-#include "SessionContext.h"
-
-#include <boost/noncopyable.hpp>
-#include <boost/scoped_ptr.hpp>
-
-#include <set>
-#include <vector>
-#include <ostream>
-
-namespace qpid {
-
-namespace framing {
-class AMQP_ClientProxy;
-}
-
-namespace broker {
-
-class SemanticHandler;
-class PreviewSessionHandler;
-class PreviewSessionManager;
-class Broker;
-class ConnectionState;
-
-/**
- * Broker-side session state includes sessions handler chains, which may
- * themselves have state.
- */
-class PreviewSessionState : public framing::SessionState,
- public SessionContext,
- public framing::FrameHandler::Chains,
- public management::Manageable
-{
- public:
- ~PreviewSessionState();
- bool isAttached() const { return handler; }
-
- void detach();
- void attach(PreviewSessionHandler& handler);
-
-
- PreviewSessionHandler* getHandler();
-
- /** @pre isAttached() */
- framing::AMQP_ClientProxy& getProxy();
-
- /** @pre isAttached() */
- ConnectionState& getConnection();
- bool isLocal(const ConnectionToken* t) const;
-
- uint32_t getTimeout() const { return timeout; }
- Broker& getBroker() { return broker; }
- framing::ProtocolVersion getVersion() const { return version; }
-
- /** OutputControl **/
- void activateOutput();
-
- // Manageable entry points
- management::ManagementObject::shared_ptr GetManagementObject (void) const;
- management::Manageable::status_t
- ManagementMethod (uint32_t methodId, management::Args& args);
-
- // Normally SessionManager creates sessions.
- PreviewSessionState(PreviewSessionManager*,
- PreviewSessionHandler* out,
- uint32_t timeout,
- uint32_t ackInterval);
-
-
- private:
- PreviewSessionManager* factory;
- PreviewSessionHandler* handler;
- framing::Uuid id;
- uint32_t timeout;
- sys::AbsTime expiry; // Used by SessionManager.
- Broker& broker;
- framing::ProtocolVersion version;
- sys::Mutex lock;
- boost::scoped_ptr<SemanticHandler> semanticHandler;
- management::Session::shared_ptr mgmtObject;
-
- friend class PreviewSessionManager;
-};
-
-
-inline std::ostream& operator<<(std::ostream& out, const PreviewSessionState& session) {
- return out << session.getId();
-}
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!QPID_BROKER_SESSION_H*/
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 628d969c69..e799cde2b9 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -340,11 +340,11 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) {
void Queue::consume(Consumer& c, bool requestExclusive){
Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
- throw AccessRefusedException(
+ throw ResourceLockedException(
QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
} else if(requestExclusive) {
if(consumerCount) {
- throw AccessRefusedException(
+ throw ResourceLockedException(
QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
} else {
exclusive = c.getSession();
@@ -596,7 +596,6 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
queue->unbind(broker.getExchanges(), queue);
queue->destroy();
}
-
}
bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
diff --git a/cpp/src/qpid/broker/QueueBindings.cpp b/cpp/src/qpid/broker/QueueBindings.cpp
index e2fcd493db..95e529f47e 100644
--- a/cpp/src/qpid/broker/QueueBindings.cpp
+++ b/cpp/src/qpid/broker/QueueBindings.cpp
@@ -20,8 +20,10 @@
*/
#include "QueueBindings.h"
#include "ExchangeRegistry.h"
+#include "qpid/framing/reply_exceptions.h"
using qpid::framing::FieldTable;
+using qpid::framing::NotFoundException;
using std::string;
using namespace qpid::broker;
@@ -35,7 +37,7 @@ void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue)
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
try {
exchanges.get(i->exchange)->unbind(queue, i->key, &(i->args));
- } catch (ChannelException&) {
+ } catch (const NotFoundException&) {
}
}
}
diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp
index 9ca4069a12..56718502f1 100644
--- a/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -23,6 +23,7 @@
#include "Connection.h"
#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
#if HAVE_SASL
#include <sasl/sasl.h>
@@ -37,7 +38,7 @@ namespace broker {
class NullAuthenticator : public SaslAuthenticator
{
Connection& connection;
- framing::AMQP_ClientProxy::Connection010 client;
+ framing::AMQP_ClientProxy::Connection client;
public:
NullAuthenticator(Connection& connection);
~NullAuthenticator();
@@ -52,7 +53,7 @@ class CyrusAuthenticator : public SaslAuthenticator
{
sasl_conn_t *sasl_conn;
Connection& connection;
- framing::AMQP_ClientProxy::Connection010 client;
+ framing::AMQP_ClientProxy::Connection client;
void processAuthenticationStep(int code, const char *challenge, unsigned int challenge_len);
@@ -117,7 +118,7 @@ void CyrusAuthenticator::init()
// TODO: Change this to an exception signaling
// server error, when one is available
- throw CommandInvalidException("Unable to perform authentication");
+ throw ConnectionForcedException("Unable to perform authentication");
}
}
@@ -146,7 +147,7 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms)
// TODO: Change this to an exception signaling
// server error, when one is available
- throw CommandInvalidException("Mechanism listing failed");
+ throw ConnectionForcedException("Mechanism listing failed");
} else {
string mechanism;
unsigned int start;
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
deleted file mode 100644
index 411e0ce3c0..0000000000
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "SemanticHandler.h"
-#include "SemanticState.h"
-#include "SessionContext.h"
-#include "BrokerAdapter.h"
-#include "MessageDelivery.h"
-#include "qpid/framing/ExecutionXCompleteBody.h"
-#include "qpid/framing/ServerInvoker.h"
-#include "qpid/log/Statement.h"
-
-#include <boost/format.hpp>
-#include <boost/bind.hpp>
-
-using boost::intrusive_ptr;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-SemanticHandler::SemanticHandler(SessionContext& s) :
- state(*this,s), session(s),
- msgBuilder(&s.getConnection().getBroker().getStore(), s.getConnection().getBroker().getStagingThreshold()),
- ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2))
- {}
-
-void SemanticHandler::handle(framing::AMQFrame& frame)
-{
- //TODO: assembly for method and headers
-
- //have potentially three separate tracks at this point:
- //
- // (1) execution controls
- // (2) commands
- // (3) data i.e. content-bearing commands
- //
- //framesets on each can be interleaved. framesets on the latter
- //two share a command-id sequence. controls on the first track are
- //used to communicate details about that command-id sequence.
- //
- //need to decide what to do if a frame on the command track
- //arrives while a frameset on the data track is still
- //open. execute it (i.e. out-of order execution with respect to
- //the command id sequence) or queue it up?
-
- TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header
-
- switch(track) {
- case EXECUTION_CONTROL_TRACK:
- handleL3(frame.getMethod());
- break;
- case MODEL_COMMAND_TRACK:
- handleCommand(frame.getMethod());
- break;
- case MODEL_CONTENT_TRACK:
- handleContent(frame);
- break;
- }
-}
-
-void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
-{
- //record:
- SequenceNumber mark(cumulative);
- if (outgoing.lwm < mark) {
- outgoing.lwm = mark;
- //ack messages:
- state.ackCumulative(mark.getValue());
- }
- range.processRanges(ackOp);
-}
-
-void SemanticHandler::sendCompletion()
-{
- SequenceNumber mark = incoming.getMark();
- SequenceNumberSet range = incoming.getRange();
- session.getProxy().getExecution().complete(mark.getValue(), range);
-}
-
-void SemanticHandler::flush()
-{
- incoming.flush();
- sendCompletion();
-}
-void SemanticHandler::sync()
-{
- incoming.sync();
- sendCompletion();
-}
-
-void SemanticHandler::noop()
-{
- incoming.noop();
-}
-
-void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
-{
- //never actually sent by client at present
-}
-
-void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
-{
- SequenceNumber id = incoming.next();
- BrokerAdapter adapter(state);
- Invoker::Result invoker = invoke(adapter, *method);
- incoming.complete(id);
-
- if (!invoker.wasHandled()) {
- throw NotImplementedException("Not implemented");
- } else if (invoker.hasResult()) {
- session.getProxy().getExecution().result(id.getValue(), invoker.getResult());
- }
- if (method->isSync()) {
- incoming.sync(id);
- sendCompletion();
- }
- //TODO: if window gets too large send unsolicited completion
-}
-
-void SemanticHandler::handleL3(framing::AMQMethodBody* method)
-{
- if (!invoke(*this, *method))
- throw NotImplementedException("Not implemented");
-}
-
-void SemanticHandler::handleContent(AMQFrame& frame)
-{
- intrusive_ptr<Message> msg(msgBuilder.getMessage());
- if (!msg) {//start of frameset will be indicated by frame flags
- msgBuilder.start(incoming.next());
- msg = msgBuilder.getMessage();
- }
- msgBuilder.handle(frame);
- if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags
- msg->setPublisher(&session.getConnection());
- state.handle(msg);
- msgBuilder.end();
- incoming.track(msg);
- if (msg->getFrames().getMethod()->isSync()) {
- incoming.sync(msg->getCommandId());
- sendCompletion();
- }
- }
-}
-
-DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
-{
- uint32_t maxFrameSize = session.getConnection().getFrameMax();
- MessageDelivery::deliver(msg, session.getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize);
- return outgoing.hwm;
-}
-
-SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
-{
- //will be replaced by field in 0-10 frame header
- uint8_t type = frame.getBody()->type();
- uint16_t classId;
- switch(type) {
- case METHOD_BODY:
- if (frame.castBody<AMQMethodBody>()->isContentBearing()) {
- return MODEL_CONTENT_TRACK;
- }
-
- classId = frame.castBody<AMQMethodBody>()->amqpClassId();
- switch (classId) {
- case ExecutionXCompleteBody::CLASS_ID:
- return EXECUTION_CONTROL_TRACK;
- }
-
- return MODEL_COMMAND_TRACK;
- case HEADER_BODY:
- case CONTENT_BODY:
- return MODEL_CONTENT_TRACK;
- }
- throw Exception("Could not determine track");
-}
-
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
deleted file mode 100644
index 893a0cbded..0000000000
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _SemanticHandler_
-#define _SemanticHandler_
-
-#include <memory>
-#include "BrokerAdapter.h"
-#include "DeliveryAdapter.h"
-#include "MessageBuilder.h"
-#include "IncomingExecutionContext.h"
-#include "HandlerImpl.h"
-
-#include "qpid/framing/amqp_types.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/SequenceNumber.h"
-
-#include <boost/function.hpp>
-
-namespace qpid {
-
-namespace framing {
-class AMQMethodBody;
-class AMQHeaderBody;
-class AMQContentBody;
-class AMQHeaderBody;
-}
-
-namespace broker {
-
-class SessionContext;
-
-class SemanticHandler : public DeliveryAdapter,
- public framing::FrameHandler,
- public framing::AMQP_ServerOperations::ExecutionHandler
-
-{
- typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
-
- SemanticState state;
- SessionContext& session;
- // TODO aconway 2007-09-20: Why are these on the handler rather than the
- // state?
- IncomingExecutionContext incoming;
- framing::Window outgoing;
- MessageBuilder msgBuilder;
- RangedOperation ackOp;
-
- enum TrackId {EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK};
- TrackId getTrack(const framing::AMQFrame& frame);
-
- void handleL3(framing::AMQMethodBody* method);
- void handleCommand(framing::AMQMethodBody* method);
- void handleContent(framing::AMQFrame& frame);
-
- void sendCompletion();
-
- //delivery adapter methods:
- DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
-
- framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
- //Connection& getConnection() { return session.getConnection(); }
- Broker& getBroker() { return session.getConnection().getBroker(); }
-
-public:
- SemanticHandler(SessionContext& session);
-
- //frame handler:
- void handle(framing::AMQFrame& frame);
-
- //execution class method handlers:
- void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
- void flush();
- void noop();
- void result(uint32_t command, const std::string& data);
- void sync();
-
-
- SemanticState& getSemanticState() { return state; }
-};
-
-}}
-
-#endif
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index ab6b82a232..e73540891c 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -19,22 +19,19 @@
*
*/
-#include "SessionContext.h"
-#include "BrokerAdapter.h"
-#include "Queue.h"
+#include "SessionState.h"
#include "Connection.h"
#include "DeliverableMessage.h"
#include "DtxAck.h"
#include "DtxTimeout.h"
#include "Message.h"
-#include "SemanticHandler.h"
-#include "SessionHandler.h"
+#include "Queue.h"
+#include "SessionContext.h"
#include "TxAccept.h"
#include "TxAck.h"
#include "TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/MessageXTransferBody.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -357,9 +354,7 @@ void SemanticState::handle(intrusive_ptr<Message> msg) {
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
//TODO: the following should be hidden behind message (using MessageAdapter or similar)
- if (msg->isA<MessageXTransferBody>()) {
- msg->getProperties<PreviewDeliveryProperties>()->setExchange(exchangeName);
- } else if (msg->isA<MessageTransferBody>()) {
+ if (msg->isA<MessageTransferBody>()) {
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
}
if (!cacheExchange || cacheExchange->getName() != exchangeName){
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index c92b9bb945..e284451d14 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -113,7 +113,7 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam
try {
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
- } catch (const ChannelException& e) {
+ } catch (const NotFoundException& e) {
return ExchangeQueryResult("", false, true, FieldTable());
}
}
@@ -163,7 +163,7 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string
Exchange::shared_ptr exchange;
try {
exchange = getBroker().getExchanges().get(exchangeName);
- } catch (const ChannelException&) {}
+ } catch (const NotFoundException&) {}
Queue::shared_ptr queue;
if (!queueName.empty()) {
@@ -192,7 +192,11 @@ SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : Han
SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
{
- destroyExclusiveQueues();
+ try {
+ destroyExclusiveQueues();
+ } catch (std::exception& e) {
+ QPID_LOG(error, e.what());
+ }
}
void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues()
@@ -370,7 +374,7 @@ void SessionAdapter::MessageHandlerImpl::flow(const std::string& destination, u_
state.addByteCredit(destination, value);
} else {
//unknown
- throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit));
+ throw InvalidArgumentException(QPID_MSG("Invalid value for unit " << unit));
}
}
@@ -384,7 +388,7 @@ void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destinat
//window
state.setWindowMode(destination);
} else{
- throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode));
+ throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode));
}
}
@@ -419,19 +423,26 @@ framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const
return MessageAcquireResult(acquisitions);
}
+framing::MessageResumeResult SessionAdapter::MessageHandlerImpl::resume(const std::string& /*destination*/,
+ const std::string& /*resumeId*/)
+{
+ throw NotImplementedException("resuming transfers not yet supported");
+}
+
+
void SessionAdapter::ExecutionHandlerImpl::sync()
{
//TODO
}
-void SessionAdapter::ExecutionHandlerImpl::result(uint32_t /*commandId*/, const string& /*value*/)
+void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/, const string& /*value*/)
{
//TODO
}
void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/,
- uint32_t /*commandId*/,
+ const SequenceNumber& /*commandId*/,
uint8_t /*classCode*/,
uint8_t /*commandCode*/,
uint8_t /*fieldIndex*/,
@@ -470,7 +481,7 @@ void SessionAdapter::DtxHandlerImpl::select()
state.selectDtx();
}
-DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
+XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
bool fail,
bool suspend)
{
@@ -480,7 +491,7 @@ DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
if (suspend) {
throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
} else {
- return DtxEndResult(XA_RBROLLBACK);
+ return XaResult(XA_RBROLLBACK);
}
} else {
if (suspend) {
@@ -488,14 +499,14 @@ DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
} else {
state.endDtx(convert(xid), false);
}
- return DtxEndResult(XA_OK);
+ return XaResult(XA_OK);
}
} catch (const DtxTimeoutException& e) {
- return DtxEndResult(XA_RBTIMEOUT);
+ return XaResult(XA_RBTIMEOUT);
}
}
-DtxStartResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
+XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
bool join,
bool resume)
{
@@ -508,41 +519,41 @@ DtxStartResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
} else {
state.startDtx(convert(xid), getBroker().getDtxManager(), join);
}
- return DtxStartResult(XA_OK);
+ return XaResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return DtxStartResult(XA_RBTIMEOUT);
+ return XaResult(XA_RBTIMEOUT);
}
}
-DtxPrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid)
+XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid)
{
try {
bool ok = getBroker().getDtxManager().prepare(convert(xid));
- return DtxPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
+ return XaResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return DtxPrepareResult(XA_RBTIMEOUT);
+ return XaResult(XA_RBTIMEOUT);
}
}
-DtxCommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid,
+XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid,
bool onePhase)
{
try {
bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
- return DtxCommitResult(ok ? XA_OK : XA_RBROLLBACK);
+ return XaResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return DtxCommitResult(XA_RBTIMEOUT);
+ return XaResult(XA_RBTIMEOUT);
}
}
-DtxRollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
+XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
{
try {
getBroker().getDtxManager().rollback(convert(xid));
- return DtxRollbackResult(XA_OK);
+ return XaResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return DtxRollbackResult(XA_RBTIMEOUT);
+ return XaResult(XA_RBTIMEOUT);
}
}
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index b5bf44ceba..4eaaf13f8d 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -54,35 +54,19 @@ class Queue;
public:
SessionAdapter(SemanticState& session);
-
framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();}
- Message010Handler* getMessage010Handler(){ return &messageImpl; }
- Exchange010Handler* getExchange010Handler(){ return &exchangeImpl; }
- Queue010Handler* getQueue010Handler(){ return &queueImpl; }
- Execution010Handler* getExecution010Handler(){ return &executionImpl; }
- Tx010Handler* getTx010Handler(){ return &txImpl; }
- Dtx010Handler* getDtx010Handler(){ return &dtxImpl; }
-
- BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); }
- ExchangeHandler* getExchangeHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- BindingHandler* getBindingHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- QueueHandler* getQueueHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- TxHandler* getTxHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- MessageHandler* getMessageHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- DtxCoordinationHandler* getDtxCoordinationHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- DtxDemarcationHandler* getDtxDemarcationHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- AccessHandler* getAccessHandler() { throw framing::NotImplementedException("Class not implemented"); }
- FileHandler* getFileHandler() { throw framing::NotImplementedException("Class not implemented"); }
- StreamHandler* getStreamHandler() { throw framing::NotImplementedException("Class not implemented"); }
- TunnelHandler* getTunnelHandler() { throw framing::NotImplementedException("Class not implemented"); }
- ExecutionHandler* getExecutionHandler() { throw framing::NotImplementedException("Class not implemented"); }
+ MessageHandler* getMessageHandler(){ return &messageImpl; }
+ ExchangeHandler* getExchangeHandler(){ return &exchangeImpl; }
+ QueueHandler* getQueueHandler(){ return &queueImpl; }
+ ExecutionHandler* getExecutionHandler(){ return &executionImpl; }
+ TxHandler* getTxHandler(){ return &txImpl; }
+ DtxHandler* getDtxHandler(){ return &dtxImpl; }
+
ConnectionHandler* getConnectionHandler() { throw framing::NotImplementedException("Class not implemented"); }
SessionHandler* getSessionHandler() { throw framing::NotImplementedException("Class not implemented"); }
- Connection010Handler* getConnection010Handler() { throw framing::NotImplementedException("Class not implemented"); }
- Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); }
-
- void destroyExclusiveQueues() { queueImpl.destroyExclusiveQueues(); }
+ FileHandler* getFileHandler() { throw framing::NotImplementedException("Class not implemented"); }
+ StreamHandler* getStreamHandler() { throw framing::NotImplementedException("Class not implemented"); }
private:
//common base for utility methods etc that are specific to this adapter
@@ -95,7 +79,7 @@ class Queue;
class ExchangeHandlerImpl :
- public Exchange010Handler,
+ public ExchangeHandler,
public HandlerHelper
{
public:
@@ -124,7 +108,7 @@ class Queue;
shared_ptr<Exchange> alternate);
};
- class QueueHandlerImpl : public Queue010Handler,
+ class QueueHandlerImpl : public QueueHandler,
public HandlerHelper, public OwnershipToken
{
Broker& broker;
@@ -149,7 +133,7 @@ class Queue;
};
class MessageHandlerImpl :
- public Message010Handler,
+ public MessageHandler,
public HandlerHelper
{
typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
@@ -196,18 +180,21 @@ class Queue;
void flush(const string& destination);
void stop(const string& destination);
+
+ framing::MessageResumeResult resume(const std::string& destination,
+ const std::string& resumeId);
};
- class ExecutionHandlerImpl : public Execution010Handler, public HandlerHelper
+ class ExecutionHandlerImpl : public ExecutionHandler, public HandlerHelper
{
public:
ExecutionHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
void sync();
- void result(uint32_t commandId, const string& value);
+ void result(const framing::SequenceNumber& commandId, const string& value);
void exception(uint16_t errorCode,
- uint32_t commandId,
+ const framing::SequenceNumber& commandId,
uint8_t classCode,
uint8_t commandCode,
uint8_t fieldIndex,
@@ -216,7 +203,7 @@ class Queue;
};
- class TxHandlerImpl : public Tx010Handler, public HandlerHelper
+ class TxHandlerImpl : public TxHandler, public HandlerHelper
{
public:
TxHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
@@ -226,7 +213,7 @@ class Queue;
void rollback();
};
- class DtxHandlerImpl : public Dtx010Handler, public HandlerHelper, private framing::StructHelper
+ class DtxHandlerImpl : public DtxHandler, public HandlerHelper, private framing::StructHelper
{
std::string convert(const framing::Xid& xid);
@@ -235,26 +222,26 @@ class Queue;
void select();
- framing::DtxStartResult start(const framing::Xid& xid,
+ framing::XaResult start(const framing::Xid& xid,
bool join,
bool resume);
- framing::DtxEndResult end(const framing::Xid& xid,
+ framing::XaResult end(const framing::Xid& xid,
bool fail,
bool suspend);
- framing::DtxCommitResult commit(const framing::Xid& xid,
+ framing::XaResult commit(const framing::Xid& xid,
bool onePhase);
void forget(const framing::Xid& xid);
framing::DtxGetTimeoutResult getTimeout(const framing::Xid& xid);
- framing::DtxPrepareResult prepare(const framing::Xid& xid);
+ framing::XaResult prepare(const framing::Xid& xid);
framing::DtxRecoverResult recover();
- framing::DtxRollbackResult rollback(const framing::Xid& xid);
+ framing::XaResult rollback(const framing::Xid& xid);
void setTimeout(const framing::Xid& xid, uint32_t timeout);
};
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index d5caf789c0..f5fa22060f 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -42,7 +42,8 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch)
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
- ignoring(false) {}
+ ignoring(false)
+{}
SessionHandler::~SessionHandler() {}
@@ -52,33 +53,30 @@ MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
} // namespace
void SessionHandler::handleIn(AMQFrame& f) {
- // Note on channel states: a channel is open if session != 0. A
- // channel that is closed (session == 0) can be in the "ignoring"
- // state. This is a temporary state after we have sent a channel
- // exception, where extra frames might arrive that should be
- // ignored.
- //
+ // Note on channel states: a channel is attached if session != 0
AMQMethodBody* m = f.getBody()->getMethod();
try {
- if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) {
+ if (ignoring && !(m && m->isA<SessionDetachedBody>())) {
+ return;
+ }
+ if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
+ //frame was a valid session control and has been handled
return;
} else if (session.get()) {
+ //we are attached and frame was not a session control so it is for upper layers
session->handle(f);
- } else if (!ignoring) {
- throw ConnectionException(501, QPID_MSG("Channel " << channel.get() << " is not attached"));
+ } else {
+ throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached"));
}
+ }catch(const ChannelException& e){
+ QPID_LOG(error, "Session detached due to: " << e.what());
+ peerSession.detached(name, e.code);
+ handleDetach();
+ connection.closeChannel(channel.get());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
- }catch(const SessionException& e){
- //execution.exception will have been sent already
- ignoring = true;
- //peerSession.requestTimeout(0);
- session->setTimeout(0);
- peerSession.detach(name);
- localSuspend();
}catch(const std::exception& e){
- connection.close(
- framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m));
+ connection.close(501, e.what(), classId(m), methodId(m));
}
}
@@ -95,7 +93,7 @@ void SessionHandler::handleOut(AMQFrame& f) {
void SessionHandler::assertAttached(const char* method) const {
if (!session.get()) {
std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl;
- throw ChannelErrorException(
+ throw NotAttachedException(
QPID_MSG(method << " failed: No session for channel "
<< getChannel()));
}
@@ -103,33 +101,23 @@ void SessionHandler::assertAttached(const char* method) const {
void SessionHandler::assertClosed(const char* method) const {
if (session.get())
- throw ChannelBusyException(
+ throw SessionBusyException(
QPID_MSG(method << " failed: channel " << channel.get()
<< " is already open."));
}
-void SessionHandler::localSuspend() {
- if (session.get() && session->isAttached()) {
- session->detach();
- connection.broker.getSessionManager().suspend(session);
- session.reset();
- }
-}
-
-
ConnectionState& SessionHandler::getConnection() { return connection; }
const ConnectionState& SessionHandler::getConnection() const { return connection; }
//new methods:
void SessionHandler::attach(const std::string& _name, bool /*force*/)
{
- //TODO: need to revise session manager to support resume as well
- assertClosed("attach");
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, 0));
name = _name;//TODO: this should be used in conjunction with
//userid for connection as sessions identity
- session.reset(state.release());
+
+ //TODO: need to revise session manager to support resume as well
+ assertClosed("attach");
+ session.reset(new SessionState(0, this, 0, 0));
peerSession.attached(name);
peerSession.commandPoint(session->nextOut, 0);
}
@@ -138,31 +126,46 @@ void SessionHandler::attached(const std::string& _name)
{
name = _name;//TODO: this should be used in conjunction with
//userid for connection as sessions identity
- std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0));
- session.reset(state.release());
+ session.reset(new SessionState(0, this, 0, 0));
peerSession.commandPoint(session->nextOut, 0);
}
void SessionHandler::detach(const std::string& name)
{
assertAttached("detach");
- localSuspend();
- peerSession.detached(name, 0);
+ peerSession.detached(name, session::NORMAL);
+ handleDetach();
assert(&connection.getChannel(channel.get()) == this);
connection.closeChannel(channel.get());
}
void SessionHandler::detached(const std::string& name, uint8_t code)
{
- ignoring=false;
- session->detach();
- session.reset();
+ ignoring = false;
+ handleDetach();
if (code) {
//no error
} else {
//error occured
QPID_LOG(warning, "Received session.closed: "<< name << " " << code);
}
+ connection.closeChannel(channel.get());
+}
+
+void SessionHandler::handleDetach()
+{
+ if (session.get()) {
+ session->detach();
+ session.reset();
+ }
+}
+
+void SessionHandler::requestDetach()
+{
+ //TODO: request timeout when python can handle it
+ //peerSession.requestTimeout(0);
+ ignoring = true;
+ peerSession.detach(name);
}
void SessionHandler::requestTimeout(uint32_t t)
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index c299c465cf..47c534441a 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -46,7 +46,7 @@ class SessionState;
* receives incoming frames, handles session controls and manages the
* association between the channel and a session.
*/
-class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
+class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
public framing::FrameHandler::InOutHandler,
private boost::noncopyable
{
@@ -66,9 +66,8 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
- // Called by closing connection.
- void localSuspend();
- void detach() { localSuspend(); }
+ void requestDetach();
+ void handleDetach();
void sendCompletion();
protected:
@@ -93,9 +92,6 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
void flush(bool expected, bool confirmed, bool completed);
void gap(const framing::SequenceSet& commands);
- //hacks for old generator:
- void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); }
-
void assertAttached(const char* method) const;
void assertActive(const char* method) const;
void assertClosed(const char* method) const;
@@ -105,7 +101,7 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
Connection& connection;
framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
- framing::AMQP_ClientProxy::Session010 peerSession;
+ framing::AMQP_ClientProxy::Session peerSession;
bool ignoring;
std::auto_ptr<SessionState> session;
std::string name;//TODO: this should be part of the session state and replace the id
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index b96d7b5e3f..50938de8ac 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -22,7 +22,6 @@
#include "Broker.h"
#include "ConnectionState.h"
#include "MessageDelivery.h"
-#include "SemanticHandler.h"
#include "SessionManager.h"
#include "SessionHandler.h"
#include "qpid/framing/AMQContentBody.h"
@@ -30,6 +29,7 @@
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ServerInvoker.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -50,6 +50,7 @@ SessionState::SessionState(
factory(f), handler(h), id(true), timeout(timeout_),
broker(h->getConnection().broker),
version(h->getConnection().getVersion()),
+ ignoring(false),
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
@@ -154,7 +155,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
case management::Session::METHOD_DETACH :
if (handler != 0)
{
- handler->detach();
+ handler->requestDetach();
}
status = Manageable::STATUS_OK;
break;
@@ -188,7 +189,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber&
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
nextOut++;//execution result is now a command, so the counter must be incremented
- getProxy().getExecution010().result(id, invocation.getResult());
+ getProxy().getExecution().result(id, invocation.getResult());
}
if (method->isSync()) {
incomplete.process(enqueuedOp, true);
@@ -242,12 +243,13 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
completed.add(msg->getCommandId());
if (msg->requiresAccept()) {
nextOut++;//accept is a command, so the counter must be incremented
- getProxy().getMessage010().accept(SequenceSet(msg->getCommandId()));
+ getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));
}
}
void SessionState::handle(AMQFrame& frame)
{
+ if (ignoring) return;
received(frame);
SequenceNumber commandId;
@@ -271,11 +273,13 @@ void SessionState::handle(AMQFrame& frame)
AMQMethodBody* m = frame.getMethod();
if (m) {
- getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());
+ getProxy().getExecution().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());
} else {
- getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
+ getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
}
- throw e;
+ timeout = 0;
+ ignoring = true;
+ handler->requestDetach();
}
}
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 4fc2ae4cc5..2ec68260a1 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -130,6 +130,7 @@ class SessionState : public framing::SessionState,
Broker& broker;
framing::ProtocolVersion version;
sys::Mutex lock;
+ bool ignoring;
SemanticState semanticState;
SessionAdapter adapter;
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index f32b5e2614..3bcba8983c 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -32,6 +32,7 @@
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include "qpid/framing/all_method_bodies.h"
+#include "qpid/framing/reply_exceptions.h"
using namespace std;
using namespace boost;
@@ -75,7 +76,7 @@ void Channel::open(const Session& s)
{
Mutex::ScopedLock l(stopLock);
if (isOpen())
- throw ChannelBusyException();
+ throw SessionBusyException();
active = true;
session = s;
if(isTransactional()) {
@@ -146,7 +147,7 @@ void Channel::consume(
Mutex::ScopedLock l(lock);
ConsumerMap::iterator i = consumers.find(tag);
if (i != consumers.end())
- throw NotAllowedException(QPID_MSG("Consumer already exists with tag " << tag ));
+ throw PreconditionFailedException(QPID_MSG("Consumer already exists with tag " << tag ));
Consumer& c = consumers[tag];
c.listener = listener;
c.ackMode = ackMode;
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index 83cc357ded..df27942008 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -25,6 +25,7 @@
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/ClientInvoker.h"
+#include "qpid/framing/reply_exceptions.h"
using namespace qpid::client;
using namespace qpid::framing;
diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h
index 2ce36d6991..d7ab97ce31 100644
--- a/cpp/src/qpid/client/ConnectionHandler.h
+++ b/cpp/src/qpid/client/ConnectionHandler.h
@@ -55,9 +55,9 @@ class ConnectionHandler : private StateManager,
public ConnectionProperties,
public ChainableFrameHandler,
public framing::InputHandler,
- private framing::AMQP_ClientOperations::Connection010Handler
+ private framing::AMQP_ClientOperations::ConnectionHandler
{
- typedef framing::AMQP_ClientOperations::Connection010Handler ConnectionOperations;
+ typedef framing::AMQP_ClientOperations::ConnectionHandler ConnectionOperations;
enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
std::set<int> ESTABLISHED;
@@ -70,7 +70,7 @@ class ConnectionHandler : private StateManager,
};
Adapter outHandler;
- framing::AMQP_ServerProxy::Connection010 proxy;
+ framing::AMQP_ServerProxy::Connection proxy;
uint16_t errorCode;
std::string errorText;
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index d1fd66ff26..ce95e43f58 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -32,6 +32,7 @@ using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
+using namespace qpid::framing::connection;//for connection error codes
ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
: connector(c), isClosed(false), isClosing(false)
@@ -39,7 +40,7 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
handler.out = boost::bind(&Connector::send, connector, _1);
handler.onClose = boost::bind(&ConnectionImpl::closed, this,
- REPLY_SUCCESS, std::string());
+ NORMAL, std::string());
handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
connector->setInputHandler(&handler);
connector->setTimeoutHandler(this);
@@ -57,7 +58,7 @@ void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session)
{
Mutex::ScopedLock l(lock);
boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()];
- if (s.lock()) throw ChannelBusyException();
+ if (s.lock()) throw SessionBusyException();
s = session;
}
@@ -74,7 +75,7 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame)
s = sessions[frame.getChannel()].lock();
}
if (!s)
- throw ChannelErrorException(QPID_MSG("Invalid channel: " << frame.getChannel()));
+ throw NotAttachedException(QPID_MSG("Invalid channel: " << frame.getChannel()));
s->in(frame);
}
@@ -113,7 +114,7 @@ void ConnectionImpl::close()
Mutex::ScopedUnlock u(lock);
handler.close();
}
- closed(REPLY_SUCCESS, "Closed by client");
+ closed(NORMAL, "Closed by client");
}
// Set closed flags and erase the sessions map, but keep the contents
@@ -149,7 +150,7 @@ void ConnectionImpl::shutdown()
handler.fail(CONN_CLOSED);
Mutex::ScopedUnlock u(lock);
std::for_each(save.begin(), save.end(),
- boost::bind(&SessionImpl::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED));
+ boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED));
}
void ConnectionImpl::erase(uint16_t ch) {
diff --git a/cpp/src/qpid/client/Session.h b/cpp/src/qpid/client/Session.h
index 5d91f289e2..fc4175ef22 100644
--- a/cpp/src/qpid/client/Session.h
+++ b/cpp/src/qpid/client/Session.h
@@ -21,7 +21,7 @@
* under the License.
*
*/
-#include "qpid/client/Session_99_0.h"
+#include "qpid/client/Session_0_10.h"
namespace qpid {
namespace client {
@@ -31,7 +31,7 @@ namespace client {
*
* \ingroup clientapi
*/
-typedef Session_99_0 Session;
+typedef Session_0_10 Session;
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 4f3869319c..571d54df0c 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -30,16 +30,18 @@
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
-namespace { const std::string OK="ok"; }
+namespace { const std::string EMPTY; }
namespace qpid {
namespace client {
using namespace qpid::framing;
+using namespace qpid::framing::session;//for detach codes
typedef sys::Monitor::ScopedLock Lock;
typedef sys::Monitor::ScopedUnlock UnLock;
@@ -47,8 +49,9 @@ typedef sys::Monitor::ScopedUnlock UnLock;
SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn,
uint16_t ch, uint64_t _maxFrameSize)
- : code(REPLY_SUCCESS),
- text(OK),
+ : error(OK),
+ code(NORMAL),
+ text(EMPTY),
state(INACTIVE),
syncMode(false),
detachedLifetime(0),
@@ -250,6 +253,7 @@ void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool
void SessionImpl::connectionClosed(uint16_t _code, const std::string& _text)
{
Lock l(state);
+ error = CONNECTION_CLOSE;
code = _code;
text = _text;
setState(DETACHED);
@@ -379,6 +383,7 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread
//TODO: proper 0-10 exception handling
QPID_LOG(error, "Session exception:" << e.what());
Lock l(state);
+ error = EXCEPTION;
code = e.code;
text = e.what();
}
@@ -443,6 +448,7 @@ void SessionImpl::detached(const std::string& _name, uint8_t _code)
//TODO: make sure this works with execution.exception - don't
//want to overwrite the code from that
QPID_LOG(error, "Session detached by peer: " << name << " " << code);
+ error = SESSION_DETACH;
code = _code;
text = "Session detached by peer";
}
@@ -545,14 +551,14 @@ void SessionImpl::gap(const framing::SequenceSet& /*commands*/)
void SessionImpl::sync() {}
-void SessionImpl::result(uint32_t commandId, const std::string& value)
+void SessionImpl::result(const framing::SequenceNumber& commandId, const std::string& value)
{
Lock l(state);
results.received(commandId, value);
}
void SessionImpl::exception(uint16_t errorCode,
- uint32_t commandId,
+ const framing::SequenceNumber& commandId,
uint8_t classCode,
uint8_t commandCode,
uint8_t /*fieldIndex*/,
@@ -563,6 +569,7 @@ void SessionImpl::exception(uint16_t errorCode,
<< " [caused by " << commandId << " " << classCode << ":" << commandCode << "]");
Lock l(state);
+ error = EXCEPTION;
code = errorCode;
text = description;
if (detachedLifetime) {
@@ -589,8 +596,11 @@ inline void SessionImpl::waitFor(State s) //call with lock held
void SessionImpl::check() const //call with lock held.
{
- if (code != REPLY_SUCCESS) {
- throwReplyException(code, text);
+ switch (error) {
+ case OK: break;
+ case CONNECTION_CLOSE: throw ConnectionException(code, text);
+ case SESSION_DETACH: throw ChannelException(code, text);
+ case EXCEPTION: throwExecutionException(code, text);
}
}
@@ -598,7 +608,7 @@ void SessionImpl::checkOpen() const //call with lock held.
{
check();
if (state != ATTACHED) {
- throwReplyException(0, "Session isn't attached");
+ throw NotAttachedException("Session isn't attached");
}
}
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 86820dbb92..3b2e80fefd 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -54,8 +54,8 @@ class ConnectionImpl;
class SessionImpl : public framing::FrameHandler::InOutHandler,
public Execution,
- private framing::AMQP_ClientOperations::Session010Handler,
- private framing::AMQP_ClientOperations::Execution010Handler
+ private framing::AMQP_ClientOperations::SessionHandler,
+ private framing::AMQP_ClientOperations::ExecutionHandler
{
public:
SessionImpl(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
@@ -95,6 +95,12 @@ public:
void connectionBroke(uint16_t code, const std::string& text);
private:
+ enum ErrorType {
+ OK,
+ CONNECTION_CLOSE,
+ SESSION_DETACH,
+ EXCEPTION
+ };
enum State {
INACTIVE,
ATTACHING,
@@ -102,8 +108,8 @@ private:
DETACHING,
DETACHED
};
- typedef framing::AMQP_ClientOperations::Session010Handler SessionHandler;
- typedef framing::AMQP_ClientOperations::Execution010Handler ExecutionHandler;
+ typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
+ typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler;
typedef sys::StateMonitor<State, DETACHED> StateMonitor;
typedef StateMonitor::Set States;
@@ -145,19 +151,16 @@ private:
// Note: Following methods are called by network thread in
// response to execution commands from the broker
void sync();
- void result(uint32_t commandId, const std::string& value);
+ void result(const framing::SequenceNumber& commandId, const std::string& value);
void exception(uint16_t errorCode,
- uint32_t commandId,
+ const framing::SequenceNumber& commandId,
uint8_t classCode,
uint8_t commandCode,
uint8_t fieldIndex,
const std::string& description,
const framing::FieldTable& errorInfo);
-
- //hack for old generator:
- void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); }
-
+ ErrorType error;
int code; // Error code
std::string text; // Error text
mutable StateMonitor state;
@@ -170,7 +173,7 @@ private:
shared_ptr<ConnectionImpl> connection;
framing::ChannelHandler channel;
- framing::AMQP_ServerProxy::Session010 proxy;
+ framing::AMQP_ServerProxy::Session proxy;
Results results;
Demux demux;
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 5152aa2e43..bca6c49c13 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,7 +17,7 @@
*/
#include "Cluster.h"
-#include "qpid/broker/PreviewSessionState.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
@@ -32,18 +32,18 @@ namespace cluster {
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-using broker::PreviewSessionState;
+using broker::SessionState;
namespace {
// Beginning of inbound chain: send to cluster.
struct ClusterSendHandler : public FrameHandler {
- PreviewSessionState& session;
+ SessionState& session;
Cluster& cluster;
bool busy;
Monitor lock;
- ClusterSendHandler(PreviewSessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
+ ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
void handle(AMQFrame& f) {
Mutex::ScopedLock l(lock);
@@ -83,11 +83,11 @@ void insert(FrameHandler::Chain& c, FrameHandler* h) {
c.next = h;
}
-struct SessionObserver : public broker::PreviewSessionManager::Observer {
+struct SessionObserver : public broker::SessionManager::Observer {
Cluster& cluster;
SessionObserver(Cluster& c) : cluster(c) {}
- void opened(PreviewSessionState& s) {
+ void opened(SessionState& s) {
// FIXME aconway 2008-01-29: IList for memory management.
ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 1b0c1b1689..6cc8dd7f78 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -63,7 +63,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
virtual ~Cluster();
// FIXME aconway 2008-01-29:
- boost::intrusive_ptr<broker::PreviewSessionManager::Observer> getObserver() { return observer; }
+ boost::intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -117,7 +117,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
- boost::intrusive_ptr<broker::PreviewSessionManager::Observer> observer;
+ boost::intrusive_ptr<broker::SessionManager::Observer> observer;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index eeb658600d..3ebb61feb5 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -73,7 +73,7 @@ bool AMQFrame::decode(Buffer& buffer)
uint8_t flags = buffer.getOctet();
uint8_t framing_version = (flags & 0xc0) >> 6;
if (framing_version != 0)
- throw SyntaxErrorException(QPID_MSG("Framing version unsupported"));
+ throw FramingErrorException(QPID_MSG("Framing version unsupported"));
bof = flags & 0x08;
eof = flags & 0x04;
bos = flags & 0x02;
@@ -81,7 +81,7 @@ bool AMQFrame::decode(Buffer& buffer)
uint8_t type = buffer.getOctet();
uint16_t frame_size = buffer.getShort();
if (frame_size < frameOverhead()-1)
- throw SyntaxErrorException(QPID_MSG("Frame size too small"));
+ throw FramingErrorException(QPID_MSG("Frame size too small"));
uint8_t reserved1 = buffer.getOctet();
uint8_t field1 = buffer.getOctet();
subchannel = field1 & 0x0f;
@@ -92,7 +92,7 @@ bool AMQFrame::decode(Buffer& buffer)
// TODO: should we check reserved2 against zero as well? - the
// spec isn't clear
if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0)
- throw SyntaxErrorException(QPID_MSG("Reserved bits not zero"));
+ throw FramingErrorException(QPID_MSG("Reserved bits not zero"));
// TODO: should no longer care about body size and only pass up
// B,E,b,e flags
@@ -105,7 +105,7 @@ bool AMQFrame::decode(Buffer& buffer)
body->decode(type,buffer, body_size);
uint8_t end = buffer.getOctet();
if (end != 0xCE)
- throw SyntaxErrorException(QPID_MSG("Frame end not found"));
+ throw FramingErrorException(QPID_MSG("Frame end not found"));
return true;
}
diff --git a/cpp/src/qpid/framing/AMQHeaderBody.h b/cpp/src/qpid/framing/AMQHeaderBody.h
index 2064468785..c69a768291 100644
--- a/cpp/src/qpid/framing/AMQHeaderBody.h
+++ b/cpp/src/qpid/framing/AMQHeaderBody.h
@@ -26,8 +26,6 @@
#include "Buffer.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/MessageProperties.h"
-#include "qpid/framing/PreviewDeliveryProperties.h"
-#include "qpid/framing/PreviewMessageProperties.h"
#include <iostream>
#include <boost/optional.hpp>
@@ -77,10 +75,7 @@ class AMQHeaderBody : public AMQBody
};
// Could use boost::mpl::fold to construct a larger set.
- typedef PropSet< PropSet< PropSet<PropSet<Empty, DeliveryProperties>,
- MessageProperties>,
- PreviewDeliveryProperties>,
- PreviewMessageProperties> Properties;
+ typedef PropSet<PropSet<Empty, DeliveryProperties>, MessageProperties> Properties;
Properties properties;
diff --git a/cpp/src/qpid/framing/Array.cpp b/cpp/src/qpid/framing/Array.cpp
index 71281c7a52..f0b6331ff3 100644
--- a/cpp/src/qpid/framing/Array.cpp
+++ b/cpp/src/qpid/framing/Array.cpp
@@ -77,7 +77,7 @@ void Array::decode(Buffer& buffer){
uint32_t size = buffer.getLong();//size added only when array is a top-level type
uint32_t available = buffer.available();
if (available < size) {
- throw SyntaxErrorException(QPID_MSG("Not enough data for array, expected "
+ throw IllegalArgumentException(QPID_MSG("Not enough data for array, expected "
<< size << " bytes but only " << available << " available"));
}
if (size) {
@@ -88,7 +88,7 @@ void Array::decode(Buffer& buffer){
dummy.setType(typeOctet);
available = buffer.available();
if (available < count * dummy.getData().size()) {
- throw SyntaxErrorException(QPID_MSG("Not enough data for array, expected "
+ throw IllegalArgumentException(QPID_MSG("Not enough data for array, expected "
<< count << " items of " << dummy.getData().size()
<< " bytes each but only " << available << " bytes available"));
}
@@ -117,7 +117,7 @@ bool Array::operator==(const Array& x) const {
void Array::add(ValuePtr value)
{
if (typeOctet != value->getType()) {
- throw SyntaxErrorException(QPID_MSG("Wrong type of value, expected " << typeOctet));
+ throw IllegalArgumentException(QPID_MSG("Wrong type of value, expected " << typeOctet));
}
values.push_back(value);
}
diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp
index fb84be7cd6..ffbcf33a95 100644
--- a/cpp/src/qpid/framing/BodyHandler.cpp
+++ b/cpp/src/qpid/framing/BodyHandler.cpp
@@ -48,7 +48,7 @@ void BodyHandler::handleBody(AMQBody* body) {
handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body));
break;
default:
- throw SyntaxErrorException(
+ throw FramingErrorException(
QPID_MSG("Invalid frame type " << body->type()));
}
}
diff --git a/cpp/src/qpid/framing/BodyHolder.cpp b/cpp/src/qpid/framing/BodyHolder.cpp
index de971b5b28..1b2f74de2c 100644
--- a/cpp/src/qpid/framing/BodyHolder.cpp
+++ b/cpp/src/qpid/framing/BodyHolder.cpp
@@ -59,7 +59,7 @@ void BodyHolder::decode(uint8_t type, Buffer& buffer, uint32_t size) {
case CONTENT_BODY: *this=in_place<AMQContentBody>(); break;
case HEARTBEAT_BODY: *this=in_place<AMQHeartbeatBody>(); break;
default:
- throw SyntaxErrorException(QPID_MSG("Invalid frame type " << type));
+ throw IllegalArgumentException(QPID_MSG("Invalid frame type " << type));
}
get()->decode(buffer, size);
}
diff --git a/cpp/src/qpid/framing/Buffer.cpp b/cpp/src/qpid/framing/Buffer.cpp
index 69168d462a..19c94ffd58 100644
--- a/cpp/src/qpid/framing/Buffer.cpp
+++ b/cpp/src/qpid/framing/Buffer.cpp
@@ -19,7 +19,6 @@
*
*/
#include "Buffer.h"
-#include "FramingContent.h"
#include "FieldTable.h"
#include <string.h>
#include <boost/format.hpp>
diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp
index ac2a0f286d..903c7ed100 100644
--- a/cpp/src/qpid/framing/FieldTable.cpp
+++ b/cpp/src/qpid/framing/FieldTable.cpp
@@ -133,7 +133,7 @@ void FieldTable::decode(Buffer& buffer){
uint32_t len = buffer.getLong();
uint32_t available = buffer.available();
if (available < len)
- throw SyntaxErrorException(QPID_MSG("Not enough data for field table."));
+ throw IllegalArgumentException(QPID_MSG("Not enough data for field table."));
uint32_t leftover = available - len;
while(buffer.available() > leftover){
std::string name;
diff --git a/cpp/src/qpid/framing/FieldValue.cpp b/cpp/src/qpid/framing/FieldValue.cpp
index 8171a94ef2..681f20a793 100644
--- a/cpp/src/qpid/framing/FieldValue.cpp
+++ b/cpp/src/qpid/framing/FieldValue.cpp
@@ -79,7 +79,7 @@ void FieldValue::setType(uint8_t type)
data.reset(new FixedWidthValue<0>());
break;
default:
- throw SyntaxErrorException(QPID_MSG("Unknown field table value type: " << (int)typeOctet));
+ throw IllegalArgumentException(QPID_MSG("Unknown field table value type: " << (int)typeOctet));
}
}
diff --git a/cpp/src/qpid/framing/FramingContent.cpp b/cpp/src/qpid/framing/FramingContent.cpp
deleted file mode 100644
index cd134b0e89..0000000000
--- a/cpp/src/qpid/framing/FramingContent.cpp
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "Buffer.h"
-#include "FramingContent.h"
-#include "qpid/Exception.h"
-#include "qpid/framing/reply_exceptions.h"
-
-namespace qpid {
-namespace framing {
-
-Content::Content() : discriminator(0) {}
-
-Content::Content(uint8_t _discriminator, const string& _value): discriminator(_discriminator), value(_value) {
- validate();
-}
-
-void Content::validate() {
- if (discriminator == REFERENCE) {
- if(value.empty()) {
- throw InvalidArgumentException(
- QPID_MSG("Reference cannot be empty"));
- }
- }else if (discriminator != INLINE) {
- throw SyntaxErrorException(
- QPID_MSG("Invalid discriminator: " << discriminator));
- }
-}
-
-Content::~Content() {}
-
-void Content::encode(Buffer& buffer) const {
- buffer.putOctet(discriminator);
- buffer.putLongString(value);
-}
-
-void Content::decode(Buffer& buffer) {
- discriminator = buffer.getOctet();
- buffer.getLongString(value);
- validate();
-}
-
-size_t Content::size() const {
- return 1/*discriminator*/ + 4/*for recording size of long string*/ + value.size();
-}
-
-std::ostream& operator<<(std::ostream& out, const Content& content) {
- if (content.discriminator == REFERENCE) {
- out << "{REF:" << content.value << "}";
- } else if (content.discriminator == INLINE) {
- out << "{INLINE:" << content.value.size() << " bytes}";
- }
- return out;
-}
-
-}} // namespace framing::qpid
diff --git a/cpp/src/qpid/framing/FramingContent.h b/cpp/src/qpid/framing/FramingContent.h
deleted file mode 100644
index 36f5d641b2..0000000000
--- a/cpp/src/qpid/framing/FramingContent.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _framing_FramingContent_h
-#define _framing_FramingContent_h
-
-#include <ostream>
-
-namespace qpid {
-namespace framing {
-
-class Buffer;
-
-enum discriminator_types { INLINE = 0, REFERENCE = 1 };
-
-/**
- * A representation of the AMQP Content data type (used for message
- * bodies) which can hold inline data or a reference.
- */
-class Content
-{
- uint8_t discriminator;
- string value;
-
- void validate();
-
- public:
- Content();
- Content(uint8_t _discriminator, const string& _value);
- ~Content();
-
- void encode(Buffer& buffer) const;
- void decode(Buffer& buffer);
- size_t size() const;
- bool isInline() const { return discriminator == INLINE; }
- bool isReference() const { return discriminator == REFERENCE; }
- const string& getValue() const { return value; }
- void setValue(const string& newValue) { value = newValue; }
-
- friend std::ostream& operator<<(std::ostream&, const Content&);
-};
-
-}} // namespace qpid::framing
-
-
-#endif /*!_framing_FramingContent_h*/
diff --git a/cpp/src/qpid/framing/ModelMethod.h b/cpp/src/qpid/framing/ModelMethod.h
index 07600aadca..8e4361e761 100644
--- a/cpp/src/qpid/framing/ModelMethod.h
+++ b/cpp/src/qpid/framing/ModelMethod.h
@@ -22,7 +22,7 @@
*
*/
#include "AMQMethodBody.h"
-#include "qpid/framing/ExecutionHeader.h"
+#include "qpid/framing/Header.h"
namespace qpid {
namespace framing {
@@ -30,7 +30,7 @@ namespace framing {
class ModelMethod : public AMQMethodBody
{
- mutable ExecutionHeader header;
+ mutable Header header;
public:
virtual ~ModelMethod() {}
virtual void encodeHeader(Buffer& buffer) const { header.encode(buffer); }
@@ -38,8 +38,8 @@ public:
virtual uint32_t headerSize() const { return header.size(); }
virtual bool isSync() const { return header.getSync(); }
virtual void setSync(bool on) const { header.setSync(on); }
- ExecutionHeader& getHeader() { return header; }
- const ExecutionHeader& getHeader() const { return header; }
+ Header& getHeader() { return header; }
+ const Header& getHeader() const { return header; }
};
diff --git a/cpp/src/qpid/framing/SequenceNumber.cpp b/cpp/src/qpid/framing/SequenceNumber.cpp
index 1b62d296c6..cba00c860a 100644
--- a/cpp/src/qpid/framing/SequenceNumber.cpp
+++ b/cpp/src/qpid/framing/SequenceNumber.cpp
@@ -20,8 +20,10 @@
*/
#include "SequenceNumber.h"
+#include "Buffer.h"
using qpid::framing::SequenceNumber;
+using qpid::framing::Buffer;
SequenceNumber::SequenceNumber() : value(0 - 1) {}
@@ -77,6 +79,20 @@ bool SequenceNumber::operator>=(const SequenceNumber& other) const
return *this == other || *this > other;
}
+void SequenceNumber::encode(Buffer& buffer) const
+{
+ buffer.putLong(value);
+}
+
+void SequenceNumber::decode(Buffer& buffer)
+{
+ value = buffer.getLong();
+}
+
+uint32_t SequenceNumber::size() const {
+ return 4;
+}
+
namespace qpid {
namespace framing {
diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h
index 0ed591b804..d659bec5c1 100644
--- a/cpp/src/qpid/framing/SequenceNumber.h
+++ b/cpp/src/qpid/framing/SequenceNumber.h
@@ -26,6 +26,8 @@
namespace qpid {
namespace framing {
+class Buffer;
+
/**
* 4-byte sequence number that 'wraps around'.
*/
@@ -51,6 +53,10 @@ class SequenceNumber
friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer);
+ uint32_t size() const;
+
template <class S> void serialize(S& s) { s(value); }
};
diff --git a/cpp/src/qpid/framing/SequenceSet.cpp b/cpp/src/qpid/framing/SequenceSet.cpp
index 2683b0025d..cdf890b7f8 100644
--- a/cpp/src/qpid/framing/SequenceSet.cpp
+++ b/cpp/src/qpid/framing/SequenceSet.cpp
@@ -49,7 +49,7 @@ void SequenceSet::decode(Buffer& buffer)
uint16_t size = buffer.getShort();
uint16_t count = size / RANGE_SIZE;//number of ranges
if (size % RANGE_SIZE)
- throw FrameErrorException(QPID_MSG("Invalid size for sequence set: " << size));
+ throw IllegalArgumentException(QPID_MSG("Invalid size for sequence set: " << size));
for (uint16_t i = 0; i < count; i++) {
add(SequenceNumber(buffer.getLong()), SequenceNumber(buffer.getLong()));
diff --git a/cpp/src/qpid/framing/Uuid.cpp b/cpp/src/qpid/framing/Uuid.cpp
index 2918c48ce3..b58d9fce96 100644
--- a/cpp/src/qpid/framing/Uuid.cpp
+++ b/cpp/src/qpid/framing/Uuid.cpp
@@ -34,7 +34,7 @@ void Uuid::encode(Buffer& buf) const {
void Uuid::decode(Buffer& buf) {
if (buf.available() < size())
- throw SyntaxErrorException(QPID_MSG("Not enough data for UUID."));
+ throw IllegalArgumentException(QPID_MSG("Not enough data for UUID."));
buf.getRawData(c_array(), size());
}
diff --git a/cpp/src/qpid/framing/amqp_types_full.h b/cpp/src/qpid/framing/amqp_types_full.h
index 1ab8777896..d0aaf28cb4 100644
--- a/cpp/src/qpid/framing/amqp_types_full.h
+++ b/cpp/src/qpid/framing/amqp_types_full.h
@@ -31,9 +31,7 @@
#include "amqp_types.h"
#include "Array.h"
-#include "FramingContent.h"
#include "FieldTable.h"
-#include "SequenceNumberSet.h"
#include "SequenceSet.h"
#include "Uuid.h"
diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp
index 94e2c025d6..0b69f76a76 100644
--- a/cpp/src/tests/ExchangeTest.cpp
+++ b/cpp/src/tests/ExchangeTest.cpp
@@ -28,6 +28,7 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/TopicExchange.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid_test_plugin.h"
#include <iostream>
#include "MessageUtils.h"
@@ -166,7 +167,7 @@ class ExchangeTest : public CppUnit::TestCase
exchanges.destroy("my-exchange");
try {
exchanges.get("my-exchange");
- } catch (const ChannelException&) {}
+ } catch (const NotFoundException&) {}
std::pair<Exchange::shared_ptr, bool> response = exchanges.declare("my-exchange", "direct", false, FieldTable());
CPPUNIT_ASSERT_EQUAL(string("direct"), response.first->getType());
diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp
index 7c68973d4d..f75269c959 100644
--- a/cpp/src/tests/exception_test.cpp
+++ b/cpp/src/tests/exception_test.cpp
@@ -91,7 +91,7 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) {
Thread t(fix.subs);
fix.connection.proxy.close();
t.join();
- BOOST_CHECK_THROW(fix.session.close(), InternalErrorException);
+ BOOST_CHECK_THROW(fix.session.close(), ConnectionException);
}
QPID_AUTO_TEST_CASE(NoSuchQueueTest) {
diff --git a/cpp/src/tests/interop_runner.cpp b/cpp/src/tests/interop_runner.cpp
index 1d77408eff..51dd2cb924 100644
--- a/cpp/src/tests/interop_runner.cpp
+++ b/cpp/src/tests/interop_runner.cpp
@@ -158,7 +158,7 @@ void Listener::sendResponse(Message& response, Message& request)
void Listener::sendResponse(Message& response, ReplyTo replyTo)
{
- string exchange = replyTo.getExchangeName();
+ string exchange = replyTo.getExchange();
string routingKey = replyTo.getRoutingKey();
channel.publish(response, exchange, routingKey);
}
diff --git a/cpp/src/tests/python_tests b/cpp/src/tests/python_tests
index ce6b1f3810..e4b70f5ff5 100755
--- a/cpp/src/tests/python_tests
+++ b/cpp/src/tests/python_tests
@@ -14,7 +14,6 @@ run() {
if test -d ${PYTHON_DIR} ; then
cd ${PYTHON_DIR}
run 0-10-errata cpp_failing_0-10.txt
- if test -z "$QPID_NO_PREVIEW" ; then run ../specs/amqp.0-10-preview.xml cpp_failing_0-10_preview.txt; fi
else
echo Warning: python tests not found.
fi