diff options
Diffstat (limited to 'cpp')
23 files changed, 907 insertions, 328 deletions
diff --git a/cpp/rubygen/framing.0-10/Operations.rb b/cpp/rubygen/framing.0-10/Operations.rb index a22a591f14..4a67df8b92 100755 --- a/cpp/rubygen/framing.0-10/Operations.rb +++ b/cpp/rubygen/framing.0-10/Operations.rb @@ -24,8 +24,13 @@ class OperationsGen < CppGen def handler_classname(c) c.name.caps+"Handler"; end + def methods_on(parent, chassis) + chassis == "all" ? parent.methods_ : parent.methods_on(chassis) + end + def handler_class(c) - if (!c.methods_on(@chassis).empty?) + m = methods_on(c,@chassis) + if (not m.empty?) handlerclass=handler_classname c gen <<EOS // ==================== class #{handlerclass} ==================== @@ -38,7 +43,7 @@ class #{handlerclass} { virtual ~#{handlerclass}() {} // Protocol methods EOS - c.methods_on(@chassis).each { |m| handler_method(m) if !m.content() } + m.each { |m| handler_method(m) if !m.content() } gen <<EOS }; // class #{handlerclass} @@ -48,7 +53,8 @@ EOS end def handler_get(c) - if (!c.methods_on(@chassis).empty?) + m = methods_on(c,@chassis) + if (not m.empty?) handlerclass=handler_classname c gen "virtual #{handlerclass}* get#{handlerclass}() = 0;\n" end @@ -93,4 +99,5 @@ end OperationsGen.new("client",ARGV[0], $amqp).generate() OperationsGen.new("server",ARGV[0], $amqp).generate() +OperationsGen.new("all",ARGV[0], $amqp).generate() diff --git a/cpp/rubygen/framing.0-10/OperationsInvoker.rb b/cpp/rubygen/framing.0-10/OperationsInvoker.rb index 642f98ce8e..44006207ca 100755 --- a/cpp/rubygen/framing.0-10/OperationsInvoker.rb +++ b/cpp/rubygen/framing.0-10/OperationsInvoker.rb @@ -13,10 +13,15 @@ class OperationsInvokerGen < CppGen @filename="qpid/framing/#{@chassis.caps}Invoker" end + def methods_on(parent, chassis) + chassis == "all" ? parent.methods_ : parent.methods_on(chassis) + end + def handler(c) "#{@ops}::#{c.cppname}Handler"; end def getter(c) "get#{c.cppname}Handler"; end def invoker(c) "#{handler(c)}::Invoker"; end - def visit_methods(c) c.methods_on(@chassis).select { |m| !m.content } end + def visit_methods(c) methods_on(c, @chassis).select { |m| !m.content } end + def handler_visits_cpp(c) visit_methods(c).each { |m| @@ -90,3 +95,4 @@ end OperationsInvokerGen.new("client",ARGV[0], $amqp).generate() OperationsInvokerGen.new("server",ARGV[0], $amqp).generate() +OperationsInvokerGen.new("all",ARGV[0], $amqp).generate() diff --git a/cpp/rubygen/framing.0-10/Proxy.rb b/cpp/rubygen/framing.0-10/Proxy.rb index 87d809d4ad..71a6b954c6 100755 --- a/cpp/rubygen/framing.0-10/Proxy.rb +++ b/cpp/rubygen/framing.0-10/Proxy.rb @@ -11,6 +11,10 @@ class ProxyGen < CppGen @filename="qpid/framing/#{@classname}" end + def methods_on(parent, chassis) + chassis == "all" ? parent.methods_ : parent.methods_on(chassis) + end + def proxy_member(c) c.name.lcaps+"Proxy"; end def inner_class_decl(c) @@ -21,7 +25,7 @@ public: #{cname}(FrameHandler& f) : Proxy(f) {} static #{cname}& get(#{@classname}& proxy) { return proxy.get#{cname}(); } EOS - c.methods_on(@chassis).each { |m| + methods_on(c, @chassis).each { |m| genl "virtual void #{m.cppname}(#{m.signature.join(",\n ")});" genl }} @@ -29,7 +33,7 @@ EOS def inner_class_defn(c) cname=c.cppname - c.methods_on(@chassis).each { |m| + methods_on(c, @chassis).each { |m| genl "void #{@classname}::#{cname}::#{m.cppname}(#{m.signature.join(", ")})" scope { params=(["getVersion()"]+m.param_names).join(", ") @@ -64,7 +68,7 @@ EOS include "<sstream>" include "#{@classname}.h" include "qpid/framing/amqp_types_full.h" - @amqp.methods_on(@chassis).each { + methods_on(@amqp, @chassis).each { |m| include "qpid/framing/"+m.body_name } genl @@ -81,4 +85,5 @@ end ProxyGen.new("client", $outdir, $amqp).generate; ProxyGen.new("server", $outdir, $amqp).generate; +ProxyGen.new("all", $outdir, $amqp).generate; diff --git a/cpp/rubygen/framing.0-10/constants.rb b/cpp/rubygen/framing.0-10/constants.rb index 35067a733c..752f50b6e9 100755 --- a/cpp/rubygen/framing.0-10/constants.rb +++ b/cpp/rubygen/framing.0-10/constants.rb @@ -48,7 +48,8 @@ class ConstantsGen < CppGen genl doxygen_comment { genl c.doc } struct(c.name.caps+"Exception", base) { - genl "#{c.name.caps}Exception(const std::string& msg=std::string()) : #{base}(#{c.value}, \"#{c.name}: \"+msg) {}" + genl "std::string getPrefix() const { return \"#{c.name}\"; }" + genl "#{c.name.caps}Exception(const std::string& msg=std::string()) : #{base}(#{c.value}, \"\"+msg) {}" } end @@ -60,12 +61,14 @@ class ConstantsGen < CppGen def reply_exceptions_h() h_file("#{@dir}/reply_exceptions") { include "qpid/Exception" + include "qpid/ExceptionHolder" namespace(@namespace) { define_exceptions_for("execution", "error-code", "SessionException") define_exceptions_for("connection", "close-code", "ConnectionException") define_exceptions_for("session", "detach-code", "ChannelException") genl genl "void throwExecutionException(int code, const std::string& text);" + genl "void setExecutionException(ExceptionHolder& holder, int code, const std::string& text);" } } end @@ -74,14 +77,21 @@ class ConstantsGen < CppGen cpp_file("#{@dir}/reply_exceptions") { include "#{@dir}/reply_exceptions" include "<sstream>" + include "<assert.h>" namespace("qpid::framing") { scope("void throwExecutionException(int code, const std::string& text) {"){ + genl "ExceptionHolder h;" + genl "setExecutionException(h, code, text);" + genl "h.raise();" + } + scope("void setExecutionException(ExceptionHolder& holder, int code, const std::string& text) {"){ scope("switch (code) {") { enum = @amqp.class_("execution").domain("error-code").enum enum.choices.each { |c| - genl "case #{c.value}: throw #{c.name.caps}Exception(text);" + genl "case #{c.value}: holder = new #{c.name.caps}Exception(text); break;" } - genl "default: break;" + genl 'default: assert(0);' + genl ' holder = new InvalidArgumentException(QPID_MSG("Bad exception code: " << code << ": " << text));' } } } diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 5c052b0fe3..4a49c83b65 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -46,7 +46,7 @@ DISTCLEANFILES+=qpid/framing/MaxMethodBodySize.h ## Compiler flags -AM_CXXFLAGS = $(WARNING_CFLAGS) $(CFLAGS) +AM_CXXFLAGS = $(WARNING_CFLAGS) AM_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG) INCLUDES = -Igen -I$(srcdir)/gen @@ -170,9 +170,13 @@ libqpidcommon_la_LIBADD = \ libqpidcommon_la_SOURCES = \ $(rgen_framing_srcs) \ $(platform_src) \ + qpid/amqp_0_10/SessionHandler.h \ + qpid/amqp_0_10/SessionHandler.cpp \ qpid/Serializer.h \ - qpid/SessionState.cpp \ qpid/SessionState.h \ + qpid/SessionState.cpp \ + qpid/SessionId.h \ + qpid/SessionId.cpp \ qpid/framing/AccumulatedAck.cpp \ qpid/framing/AMQBody.cpp \ qpid/framing/AMQMethodBody.cpp \ @@ -330,6 +334,7 @@ nobase_include_HEADERS = \ qpid/assert.h \ qpid/DataDir.h \ qpid/Exception.h \ + qpid/ExceptionHolder.h \ qpid/amqp_0_10/Exception.h \ qpid/Msg.h \ qpid/Options.h \ diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp index a69955c9dc..8176d92cac 100644 --- a/cpp/src/qpid/Exception.cpp +++ b/cpp/src/qpid/Exception.cpp @@ -34,13 +34,15 @@ std::string strError(int err) { } Exception::Exception(const std::string& msg) throw() : message(msg) { - QPID_LOG(debug, "Exception thrown: " << message); + QPID_LOG(debug, "Exception: " << message); } Exception::~Exception() throw() {} std::string Exception::getPrefix() const { return "Exception"; } +std::string Exception::getMessage() const { return message; } + const char* Exception::what() const throw() { if (whatStr.empty()) whatStr = getPrefix() + ": " + message; diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h index e74fa79ed9..1be433f17a 100644 --- a/cpp/src/qpid/Exception.h +++ b/cpp/src/qpid/Exception.h @@ -43,10 +43,10 @@ class Exception : public std::exception public: explicit Exception(const std::string& message=std::string()) throw(); virtual ~Exception() throw(); - virtual const char* what() const throw(); + virtual const char* what() const throw(); // prefix: message + virtual std::string getMessage() const; // Unprefixed message + virtual std::string getPrefix() const; // Prefix - protected: - std::string getPrefix() const; private: std::string message; mutable std::string whatStr; diff --git a/cpp/src/qpid/ExceptionHolder.h b/cpp/src/qpid/ExceptionHolder.h new file mode 100644 index 0000000000..fed6308f19 --- /dev/null +++ b/cpp/src/qpid/ExceptionHolder.h @@ -0,0 +1,73 @@ +#ifndef QPID_EXCEPTIONHOLDER_H +#define QPID_EXCEPTIONHOLDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/memory.h" +#include <memory> + +namespace qpid { + +struct Raisable { + virtual ~Raisable() {}; + virtual void raise() const=0; + virtual std::string what() const=0; +}; + +/** + * Holder for exceptions. Allows the thread that notices an error condition to + * create an exception and store it to be thrown by another thread. + */ +class ExceptionHolder : public Raisable { + public: + ExceptionHolder() {} + ExceptionHolder(ExceptionHolder& ex) : Raisable(), wrapper(ex.wrapper) {} + /** Take ownership of ex */ + template <class Ex> ExceptionHolder(Ex* ex) { wrap(ex); } + template <class Ex> ExceptionHolder(const std::auto_ptr<Ex>& ex) { wrap(ex.release()); } + + ExceptionHolder& operator=(ExceptionHolder& ex) { wrapper=ex.wrapper; return *this; } + template <class Ex> ExceptionHolder& operator=(Ex* ex) { wrap(ex); return *this; } + template <class Ex> ExceptionHolder& operator=(std::auto_ptr<Ex> ex) { wrap(ex.release()); return *this; } + + void raise() const { if (wrapper.get()) wrapper->raise() ; } + std::string what() const { return wrapper->what(); } + bool empty() const { return !wrapper.get(); } + operator bool() const { return !empty(); } + void reset() { wrapper.reset(); } + + private: + template <class Ex> struct Wrapper : public Raisable { + Wrapper(Ex* ptr) : exception(ptr) {} + void raise() const { throw *exception; } + std::string what() const { return exception->what(); } + std::auto_ptr<Ex> exception; + }; + template <class Ex> void wrap(Ex* ex) { wrapper.reset(new Wrapper<Ex>(ex)); } + std::auto_ptr<Raisable> wrapper; + +}; + + +} // namespace qpid + +#endif /*!QPID_EXCEPTIONHOLDER_H*/ diff --git a/cpp/src/qpid/SessionId.cpp b/cpp/src/qpid/SessionId.cpp new file mode 100644 index 0000000000..fce6619f5d --- /dev/null +++ b/cpp/src/qpid/SessionId.cpp @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SessionId.h" +#include <sstream> + +namespace qpid { + +SessionId::SessionId(const std::string& u, const std::string& n) : userId(u), name(n) {} + +bool SessionId::operator<(const SessionId& id) const { + return userId < id.userId || (userId == id.userId && name < id.name); +} + +bool SessionId::operator==(const SessionId& id) const { + return id.name == name && id.userId == userId; +} + +std::ostream& operator<<(std::ostream& o, const SessionId& id) { + return o << id.getName() << "@" << id.getUserId(); +} + +std::string SessionId::str() const { + std::ostringstream o; + o << *this; + return o.str(); +} + +} // namespace qpid diff --git a/cpp/src/qpid/SessionId.h b/cpp/src/qpid/SessionId.h new file mode 100644 index 0000000000..08553e8b1d --- /dev/null +++ b/cpp/src/qpid/SessionId.h @@ -0,0 +1,49 @@ +#ifndef QPID_SESSIONID_H +#define QPID_SESSIONID_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/operators.hpp> +#include <string> + +namespace qpid { + +/** Identifier for a session */ +class SessionId : boost::totally_ordered1<SessionId> { + std::string userId; + std::string name; + public: + SessionId(const std::string& userId=std::string(), const std::string& name=std::string()); + std::string getUserId() const { return userId; } + std::string getName() const { return name; } + bool operator<(const SessionId&) const ; + bool operator==(const SessionId& id) const; + // Convert to a string + std::string str() const; +}; + +std::ostream& operator<<(std::ostream&, const SessionId&); + + +} // namespace qpid + +#endif /*!QPID_SESSIONID_H*/ diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index 64fdd17b8f..8905fb5f9d 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -19,21 +19,24 @@ * */ -// FIXME aconway 2008-04-24: Reminders for handler implementation. -// -// - execution.sync results must be communicated to SessionState::peerConfirmed. -// -// - #include "SessionState.h" #include "qpid/amqp_0_10/exceptions.h" #include "qpid/framing/AMQMethodBody.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <numeric> namespace qpid { using framing::AMQFrame; using amqp_0_10::NotImplementedException; +using amqp_0_10::InvalidArgumentException; +using amqp_0_10::IllegalStateException; + +namespace { +bool isControl(const AMQFrame& f) { + return f.getMethod() && f.getMethod()->type() == 0; +} +} // namespace /** A point in the session - command id + offset */ void SessionPoint::advance(const AMQFrame& f) { @@ -60,103 +63,123 @@ bool SessionPoint::operator==(const SessionPoint& x) const { return command == x.command && offset == x.offset; } -SendState::SendState(size_t syncSize, size_t killSize) - : replaySyncSize(syncSize), replayKillSize(killSize), unflushedSize() {} +SessionState::SendState::SendState(SessionState& s) : session(&s), unflushedSize(0) {} + +const SessionPoint& SessionState::SendState::getCommandPoint() { + return sendPoint; +} + +bool SessionState::SendState::expected(const SessionPoint& point) { + if (point < replayPoint || sendPoint < point) + throw InvalidArgumentException(QPID_MSG(session->getId() << ": expected command-point out of range.")); + // FIXME aconway 2008-05-06: this is not strictly correct, we should keep + // an intermediate replay pointer into the replay list. + confirmed(point); // Drop commands prior to expected from replay. + return (!replayList.empty()); +} -void SendState::send(const AMQFrame& f) { - if (f.getMethod() && f.getMethod()->type() == 0) - return; // Don't replay control frames. +void SessionState::SendState::record(const AMQFrame& f) { + if (isControl(f)) return; // Ignore control frames. + session->stateful = true; replayList.push_back(f); unflushedSize += f.size(); + incomplete += sendPoint.command; sendPoint.advance(f); } -bool SendState::needFlush() const { return unflushedSize >= replaySyncSize; } +bool SessionState::SendState::needFlush() const { return unflushedSize >= session->config.replaySyncSize; } -void SendState::sendFlush() { +void SessionState::SendState::recordFlush() { assert(flushPoint <= sendPoint); flushPoint = sendPoint; unflushedSize = 0; } -void SendState::peerConfirmed(const SessionPoint& confirmed) { +void SessionState::SendState::confirmed(const SessionPoint& confirmed) { + if (confirmed > sendPoint) + throw InvalidArgumentException(QPID_MSG(session->getId() << "Confirmed commands not yet sent.")); ReplayList::iterator i = replayList.begin(); - // Ignore peerConfirmed.offset, we don't support partial replay. while (i != replayList.end() && replayPoint.command < confirmed.command) { - assert(replayPoint <= flushPoint); replayPoint.advance(*i); assert(replayPoint <= sendPoint); - if (replayPoint > flushPoint) { - flushPoint.advance(*i); - assert(replayPoint <= flushPoint); + if (replayPoint > flushPoint) unflushedSize -= i->size(); - } ++i; } + if (replayPoint > flushPoint) flushPoint = replayPoint; replayList.erase(replayList.begin(), i); assert(replayPoint.offset == 0); } -void SendState::peerCompleted(const SequenceSet& commands) { +void SessionState::SendState::completed(const SequenceSet& commands) { if (commands.empty()) return; - sentCompleted += commands; + incomplete -= commands; // Completion implies confirmation but we don't handle out-of-order // confirmation, so confirm only the first contiguous range of commands. - peerConfirmed(SessionPoint(commands.rangesBegin()->end())); + confirmed(SessionPoint(commands.rangesBegin()->end())); } -bool ReceiveState::hasState() { return stateful; } +SessionState::ReceiveState::ReceiveState(SessionState& s) : session(&s) {} -void ReceiveState::setExpecting(const SessionPoint& point) { - if (!hasState()) // initializing a new session. - expecting = received = point; - else { // setting point in an existing session. - if (point > received) - throw NotImplementedException("command-point out of bounds."); - expecting = point; - } +void SessionState::ReceiveState::setCommandPoint(const SessionPoint& point) { + if (session->hasState() && point > received) + throw InvalidArgumentException(QPID_MSG(session->getId() << ": Command-point out of range.")); + expected = point; + if (expected > received) + received = expected; } -ReceiveState::ReceiveState() : stateful() {} - -bool ReceiveState::receive(const AMQFrame& f) { - stateful = true; - expecting.advance(f); - if (expecting > received) { - received = expecting; +bool SessionState::ReceiveState::record(const AMQFrame& f) { + if (isControl(f)) return true; // Ignore control frames. + session->stateful = true; + expected.advance(f); + if (expected > received) { + received = expected; return true; } + else { + QPID_LOG(debug, "Ignoring duplicate: " << f); return false; } - -void ReceiveState::localCompleted(SequenceNumber command) { - assert(command < received.command); // Can't complete what we haven't received. - receivedCompleted += command; } -void ReceiveState::peerKnownComplete(const SequenceSet& commands) { - receivedCompleted -= commands; +void SessionState::ReceiveState::completed(SequenceNumber command, bool cumulative) { + assert(command <= received.command); // Internal error to complete an unreceived command. + assert(firstIncomplete <= command); + if (cumulative) + unknownCompleted.add(firstIncomplete, command); + else + unknownCompleted += command; + firstIncomplete = unknownCompleted.rangeContaining(firstIncomplete).end(); } -SessionId::SessionId(const std::string& u, const std::string& n) : userId(u), name(n) {} - -bool SessionId::operator<(const SessionId& id) const { - return userId < id.userId || (userId == id.userId && name < id.name); +void SessionState::ReceiveState::knownCompleted(const SequenceSet& commands) { + if (!commands.empty() && commands.back() > received.command) + throw InvalidArgumentException(QPID_MSG(session->getId() << ": Known-completed has invalid commands.")); + unknownCompleted -= commands; } -bool SessionId::operator==(const SessionId& id) const { - return id.name == name && id.userId == userId; +SequenceNumber SessionState::ReceiveState::getCurrent() const { + SequenceNumber current = expected.command; // FIXME aconway 2008-05-08: SequenceNumber arithmetic. + return --current; } +// FIXME aconway 2008-05-02: implement sync & kill limits. SessionState::Configuration::Configuration() : replaySyncSize(std::numeric_limits<size_t>::max()), replayKillSize(std::numeric_limits<size_t>::max()) {} SessionState::SessionState(const SessionId& i, const Configuration& c) - : SendState(c.replaySyncSize, c.replayKillSize), - id(i), timeout(), config(c) {} + : sender(*this), receiver(*this), id(i), timeout(), config(c), stateful() +{ + QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this); +} + +bool SessionState::hasState() const { + return stateful; +} -void SessionState::clear() { *this = SessionState(id, config); } +SessionState::~SessionState() {} std::ostream& operator<<(std::ostream& o, const SessionPoint& p) { return o << "(" << p.command.getValue() << "+" << p.offset << ")"; diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h index b836534ee7..7957825dd3 100644 --- a/cpp/src/qpid/SessionState.h +++ b/cpp/src/qpid/SessionState.h @@ -22,9 +22,11 @@ * */ +#include <qpid/SessionId.h> #include <qpid/framing/SequenceNumber.h> #include <qpid/framing/SequenceSet.h> #include <qpid/framing/AMQFrame.h> +#include <qpid/framing/FrameHandler.h> #include <boost/operators.hpp> #include <vector> #include <iosfwd> @@ -49,118 +51,124 @@ struct SessionPoint : boost::totally_ordered1<SessionPoint> { std::ostream& operator<<(std::ostream&, const SessionPoint&); -/** The sending half of session state */ +/** + * Support for session idempotence barrier and resume as defined in + * AMQP 0-10. + * + * We only issue/use contiguous confirmations, out-of-order confirmation + * is ignored. Out of order completion is fully supported. + * + * Raises NotImplemented if the command point is set greater than the + * max currently received command data, either explicitly via + * session.command-point or implicitly via session.gap. + * + * Partial replay is not supported, replay always begins on a command + * boundary, and we never confirm partial commands. + * + * The SessionPoint data structure does store offsets so this class + * could be extended to support partial replay without + * source-incompatbile API changes. + */ +class SessionState { + public: + + /** State for commands sent. Records commands for replay, + * tracks confirmation and completion of sent commands. + */ class SendState { public: typedef std::vector<framing::AMQFrame> ReplayList; /** Record frame f for replay. Should not be called during replay. */ - void send(const framing::AMQFrame& f); + void record(const framing::AMQFrame& f); /** @return true if we should send flush for confirmed and completed commands. */ bool needFlush() const; /** Called when flush for confirmed and completed commands is sent to peer. */ - void sendFlush(); + void recordFlush(); - /** Called when the peer confirms up to commands. */ - void peerConfirmed(const SessionPoint& confirmed); + /** Called when the peer confirms up to comfirmed. */ + void confirmed(const SessionPoint& confirmed); /** Called when the peer indicates commands completed */ - void peerCompleted(const SequenceSet& commands); + void completed(const SequenceSet& commands); - /** Get the replay list. @see getReplayPoint. */ - const ReplayList& getReplayList() const { return replayList; } - - /** - * The replay point is the point up to which all data has been - * confirmed. Partial replay is not supported, it will always - * have offset==0. - */ + /** Point from which we can replay. All data < replayPoint is confirmed. */ const SessionPoint& getReplayPoint() const { return replayPoint; } - const SessionPoint& getSendPoint() const { return sendPoint; } - const SequenceSet& getCompleted() const { return sentCompleted; } + /** Get the replay list, starting from getReplayPoint() */ + // TODO aconway 2008-04-30: should be const, but FrameHandler takes non-const AMQFrame&. + ReplayList& getReplayList() { return replayList; } - protected: - SendState(size_t replaySyncSize, size_t replayKillSize); + /** Point from which the next data will be sent. */ + const SessionPoint& getCommandPoint(); + + /** Set of outstanding incomplete commands */ + const SequenceSet& getIncomplete() const { return incomplete; } + + /** Peer expecting commands from this point. + *@return true if replay is required, sets replayPoint. + */ + bool expected(const SessionPoint& expected); private: - size_t replaySyncSize, replayKillSize; // @see SessionState::Configuration. + SendState(SessionState& s); + + SessionState* session; // invariant: replayPoint <= flushPoint <= sendPoint SessionPoint replayPoint; // Can replay from this point - SessionPoint sendPoint; // Send from this point SessionPoint flushPoint; // Point of last flush + SessionPoint sendPoint; // Send from this point ReplayList replayList; // Starts from replayPoint. size_t unflushedSize; // Un-flushed bytes in replay list. - SequenceSet sentCompleted; // Commands sent and acknowledged as completed. + SequenceSet incomplete; // Commands sent and not yet completed. + + friend class SessionState; }; -/** Receiving half of SessionState */ + /** State for commands received. + * Idempotence barrier for duplicate commands, tracks completion + * and of received commands. + */ class ReceiveState { public: - bool hasState(); - /** Set the command point. */ - void setExpecting(const SessionPoint& point); + void setCommandPoint(const SessionPoint& point); /** Returns true if frame should be be processed, false if it is a duplicate. */ - bool receive(const framing::AMQFrame& f); + bool record(const framing::AMQFrame& f); /** Command completed locally */ - void localCompleted(SequenceNumber command); + void completed(SequenceNumber command, bool cumulative=false); /** Peer has indicated commands are known completed */ - void peerKnownComplete(const SequenceSet& commands); + void knownCompleted(const SequenceSet& commands); + + /** Get the incoming command point */ + const SessionPoint& getExpected() const { return expected; } - /** Recieved, completed and possibly not known by peer to be completed */ - const SequenceSet& getReceivedCompleted() const { return receivedCompleted; } - const SessionPoint& getExpecting() const { return expecting; } + /** Get the received high-water-mark, may be > getExpected() during replay */ const SessionPoint& getReceived() const { return received; } - protected: - ReceiveState(); + /** Completed commands that the peer may not know about */ + const SequenceSet& getUnknownComplete() const { return unknownCompleted; } + + /** ID of the command currently being handled. */ + SequenceNumber getCurrent() const; private: - bool stateful; // True if session has state. - SessionPoint expecting; // Expecting from here - SessionPoint received; // Received to here. Invariant: expecting <= received. - SequenceSet receivedCompleted; // Received & completed, may not be not known-completed by peer -}; + ReceiveState(SessionState&); -/** Identifier for a session */ -class SessionId : boost::totally_ordered1<SessionId> { - std::string userId; - std::string name; - public: - SessionId(const std::string& userId=std::string(), const std::string& name=std::string()); - std::string getUserId() const { return userId; } - std::string getName() const { return name; } - bool operator<(const SessionId&) const ; - bool operator==(const SessionId& id) const; -}; + SessionState* session; + SessionPoint expected; // Expected from here + SessionPoint received; // Received to here. Invariant: expected <= received. + SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer. + SequenceNumber firstIncomplete; // First incomplete command. + friend class SessionState; + }; -/** - * Support for session idempotence barrier and resume as defined in - * AMQP 0-10. - * - * We only issue/use contiguous confirmations, out-of-order confirmation - * is ignored. Out of order completion is fully supported. - * - * Raises NotImplemented if the command point is set greater than the - * max currently received command data, either explicitly via - * session.command-point or implicitly via session.gap. - * - * Partial replay is not supported, replay always begins on a command - * boundary, and we never confirm partial commands. - * - * The SessionPoint data structure does store offsets so this class - * could be extended to support partial replay without - * source-incompatbile API changes. - */ -class SessionState : public SendState, public ReceiveState { - public: struct Configuration { Configuration(); size_t replaySyncSize; // Issue a sync when the replay list holds >= N bytes @@ -169,19 +177,32 @@ class SessionState : public SendState, public ReceiveState { SessionState(const SessionId& =SessionId(), const Configuration& =Configuration()); + virtual ~SessionState(); + const SessionId& getId() const { return id; } uint32_t getTimeout() const { return timeout; } void setTimeout(uint32_t seconds) { timeout = seconds; } - /** Clear all state except Id. */ - void clear(); + bool operator==(const SessionId& other) const { return id == other; } + bool operator==(const SessionState& other) const { return id == other.id; } + + SendState sender; ///< State for commands sent + ReceiveState receiver; ///< State for commands received + + bool hasState() const; private: SessionId id; uint32_t timeout; Configuration config; + bool stateful; + + friend class SendState; + friend class ReceiveState; }; +inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; } + } // namespace qpid diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp new file mode 100644 index 0000000000..3fb2579e8c --- /dev/null +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "SessionHandler.h" +#include "qpid/SessionState.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/AllInvoker.h" +#include "qpid/log/Statement.h" + + +#include <boost/bind.hpp> + +namespace qpid { +namespace amqp_0_10 { +using namespace framing; +using namespace std; + +SessionHandler::SessionHandler() : peer(channel), ignoring(), sendReady(), receiveReady() {} + +SessionHandler::SessionHandler(FrameHandler& out, ChannelId ch) + : channel(ch, &out), peer(channel), ignoring(false) {} + +SessionHandler::~SessionHandler() {} + +namespace { +bool isSessionControl(AMQMethodBody* m) { + return m && + m->amqpClassId() == SESSION_CLASS_ID; +} +bool isSessionDetachedControl(AMQMethodBody* m) { + return isSessionControl(m) && + m->amqpMethodId() == SESSION_DETACHED_METHOD_ID; +} +} // namespace + +void SessionHandler::checkAttached() { + if (!getState()) + throw NotAttachedException( + QPID_MSG("Channel " << channel.get() << " is not attached")); + assert(getInHandler()); + assert(channel.next); +} + +void SessionHandler::invoke(const AMQMethodBody& m) { + framing::invoke(*this, m); +} + +void SessionHandler::handleIn(AMQFrame& f) { + // Note on channel states: a channel is attached if session != 0 + AMQMethodBody* m = f.getBody()->getMethod(); + try { + if (ignoring && !isSessionDetachedControl(m)) + return; + else if (isSessionControl(m)) + invoke(*m); + else { + checkAttached(); + if (!receiveReady) + throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data")); + if (!getState()->receiver.record(f)) + return; // Ignore duplicates. + getInHandler()->handle(f); + } + } + catch(const ChannelException& e){ + QPID_LOG(error, "Channel exception: " << e.what()); + if (getState()) + peer.detached(getState()->getId().getName(), e.code); + channelException(e.code, e.getMessage()); + } + catch(const ConnectionException& e) { + QPID_LOG(error, "Connection exception: " << e.what()); + connectionException(e.code, e.getMessage()); + } + catch(const std::exception& e) { + QPID_LOG(error, "Unexpected exception: " << e.what()); + connectionException(connection::FRAMING_ERROR, e.what()); + } +} + +void SessionHandler::handleOut(AMQFrame& f) { + checkAttached(); + if (!sendReady) + throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data")); + getState()->sender.record(f); + if (getState()->sender.needFlush()) { + peer.flush(false, true, true); + getState()->sender.recordFlush(); + } + channel.handle(f); +} + +void SessionHandler::checkName(const std::string& name) { + checkAttached(); + if (name != getState()->getId().getName()) + throw InvalidArgumentException( + QPID_MSG("Incorrect session name: " << name + << ", expecting: " << getState()->getId().getName())); +} + +void SessionHandler::attach(const std::string& name, bool force) { + if (getState() && name == getState()->getId().getName()) + return; // Idempotent + if (getState()) + throw SessionBusyException( + QPID_MSG("Channel " << channel.get() << " already attached to " << getState()->getId())); + setState(name, force); + QPID_LOG(debug, "Attached channel " << channel.get() << " to " << getState()->getId()); + peer.attached(name); + if (getState()->hasState()) + peer.flush(true, true, true); + else + sendCommandPoint(); +} + +void SessionHandler::attached(const std::string& name) { + checkName(name); +} + +void SessionHandler::detach(const std::string& name) { + checkName(name); + peer.detached(name, session::NORMAL); + handleDetach(); +} + +void SessionHandler::detached(const std::string& name, uint8_t code) { + checkName(name); + ignoring = false; + if (code != session::NORMAL) + channelException(code, "session.detached from peer."); + else { + handleDetach(); + } +} + +void SessionHandler::handleDetach() { + sendReady = receiveReady = false; +} + +void SessionHandler::requestTimeout(uint32_t t) { + checkAttached(); + getState()->setTimeout(t); + peer.timeout(t); +} + +void SessionHandler::timeout(uint32_t t) { + checkAttached(); + getState()->setTimeout(t); +} + +void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) { + checkAttached(); + getState()->receiver.setCommandPoint(SessionPoint(id, offset)); + if (!receiveReady) { + receiveReady = true; + readyToReceive(); + } +} + +void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragments*/) { + checkAttached(); + if (commands.empty() && getState()->hasState()) + throw IllegalStateException( + QPID_MSG(getState()->getId() << ": has state but client is attaching as new session.")); + getState()->sender.expected(commands.empty() ? SequenceNumber(0) : commands.front()); + if (!sendReady) // send command point if not already sent + sendCommandPoint(); +} + +void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragments*/) { + checkAttached(); + // Ignore non-contiguous confirmations. + if (!commands.empty() && commands.front() >= getState()->sender.getReplayPoint()) { + getState()->sender.confirmed(commands.rangesBegin()->last()); + } +} + +void SessionHandler::completed(const SequenceSet& commands, bool /*timelyReply*/) { + checkAttached(); + getState()->sender.completed(commands); + if (!commands.empty()) + peer.knownCompleted(commands); // Always send a timely reply +} + +void SessionHandler::knownCompleted(const SequenceSet& commands) { + checkAttached(); + getState()->receiver.knownCompleted(commands); +} + +void SessionHandler::flush(bool expected, bool confirmed, bool completed) { + checkAttached(); + if (expected) { + SequenceSet expectSet; + if (getState()->hasState()) + expectSet.add(getState()->receiver.getExpected().command); + peer.expected(expectSet, Array()); + } + if (confirmed) { + SequenceSet confirmSet; + if (!getState()->receiver.getUnknownComplete().empty()) + confirmSet.add(getState()->receiver.getUnknownComplete().front(), + getState()->receiver.getReceived().command); + peer.confirmed(confirmSet, Array()); + } + if (completed) + peer.completed(getState()->receiver.getUnknownComplete(), true); +} + +void SessionHandler::gap(const SequenceSet& /*commands*/) { + throw NotImplementedException("session.gap not supported"); +} + +void SessionHandler::sendDetach() +{ + checkAttached(); + ignoring = true; + peer.detach(getState()->getId().getName()); +} + +void SessionHandler::sendCompletion() { + checkAttached(); + peer.completed(getState()->receiver.getUnknownComplete(), true); +} + +void SessionHandler::sendAttach(bool force) { + checkAttached(); + peer.attach(getState()->getId().getName(), force); + if (getState()->hasState()) + peer.flush(true, true, true); + else + sendCommandPoint(); +} + +void SessionHandler::sendCommandPoint() { + SessionPoint point(getState()->sender.getCommandPoint()); + peer.commandPoint(point.command, point.offset); + if (!sendReady) { + sendReady = true; + readyToSend(); + } +} + +void SessionHandler::sendTimeout(uint32_t t) { + checkAttached(); + peer.requestTimeout(t); +} + +void SessionHandler::sendFlush() { + peer.flush(false, true, true); +} + +bool SessionHandler::ready() const { + return sendReady && receiveReady; +} + + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.h b/cpp/src/qpid/amqp_0_10/SessionHandler.h new file mode 100644 index 0000000000..85577ebafc --- /dev/null +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.h @@ -0,0 +1,116 @@ +#ifndef QPID_AMQP_0_10_SESSIONHANDLER_H +#define QPID_AMQP_0_10_SESSIONHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/ChannelHandler.h" +#include "qpid/framing/AMQP_AllProxy.h" +#include "qpid/framing/AMQP_AllOperations.h" + +namespace qpid { + +class SessionState; + +namespace amqp_0_10 { + +/** + * Base SessionHandler with logic common to both client and broker. + * + * A SessionHandler is associated with a channel and can be attached + * to a session state. + */ + +class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, + public framing::FrameHandler::InOutHandler +{ + public: + typedef framing::AMQP_AllProxy::Session Peer; + + SessionHandler(); + SessionHandler(framing::FrameHandler& out, uint16_t channel); + ~SessionHandler(); + + void setChannel(uint16_t ch) { channel = ch; } + uint16_t getChannel() const { return channel.get(); } + + void setOutHandler(framing::FrameHandler& h) { channel.next = &h; } + + virtual SessionState* getState() = 0; + virtual framing::FrameHandler* getInHandler() = 0; + + // Non-protocol methods, called locally to initiate some action. + void sendDetach(); + void sendCompletion(); + void sendAttach(bool force); + void sendTimeout(uint32_t t); + void sendFlush(); + void sendCommandPoint(); + + /** True if the handler is ready to send and receive */ + bool ready() const; + + // Protocol methods + void attach(const std::string& name, bool force); + void attached(const std::string& name); + void detach(const std::string& name); + void detached(const std::string& name, uint8_t code); + + void requestTimeout(uint32_t t); + void timeout(uint32_t t); + + void commandPoint(const framing::SequenceNumber& id, uint64_t offset); + void expected(const framing::SequenceSet& commands, const framing::Array& fragments); + void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments); + void completed(const framing::SequenceSet& commands, bool timelyReply); + void knownCompleted(const framing::SequenceSet& commands); + void flush(bool expected, bool confirmed, bool completed); + void gap(const framing::SequenceSet& commands); + + protected: + virtual void invoke(const framing::AMQMethodBody& m); + + virtual void setState(const std::string& sessionName, bool force) = 0; + virtual void channelException(uint16_t code, const std::string& msg) = 0; + virtual void connectionException(uint16_t code, const std::string& msg) = 0; + + + // Notification of events + virtual void readyToSend() {} + virtual void readyToReceive() {} + virtual void handleDetach(); + + virtual void handleIn(framing::AMQFrame&); + virtual void handleOut(framing::AMQFrame&); + + void checkAttached(); + void checkName(const std::string& name); + + framing::ChannelHandler channel; + Peer peer; + bool ignoring; + bool sendReady, receiveReady; + // FIXME aconway 2008-05-07: move handler-related functions from SessionState. + +}; +}} // namespace qpid::amqp_0_10 + +#endif /*!QPID_AMQP_0_10_SESSIONHANDLER_H*/ diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index fbf3c0b7ca..b93869be85 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -82,14 +82,14 @@ struct Handler { template <class X, void (X::*F)(T)> class MemFunRef : public Handler<T> { public: - MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(x) {} - void handle(T t) { (target.*F)(t); } + MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(&x) {} + void handle(T t) { (target->*F)(t); } /** Allow calling with -> syntax, compatible with Chains */ MemFunRef* operator->() { return this; } private: - X& target; + X* target; }; /** Interface for a handler that implements a diff --git a/cpp/src/qpid/framing/Proxy.cpp b/cpp/src/qpid/framing/Proxy.cpp index b47060028f..6b37fb368d 100644 --- a/cpp/src/qpid/framing/Proxy.cpp +++ b/cpp/src/qpid/framing/Proxy.cpp @@ -22,16 +22,21 @@ namespace qpid { namespace framing { +Proxy::Proxy(FrameHandler& h) : out(&h) {} + Proxy::~Proxy() {} void Proxy::send(const AMQBody& b) { AMQFrame f(b); - out.handle(f); + out->handle(f); } - ProtocolVersion Proxy::getVersion() const { return ProtocolVersion(); } +FrameHandler& Proxy::getHandler() { return *out; } + +void Proxy::setHandler(FrameHandler& f) { out=&f; } + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/Proxy.h b/cpp/src/qpid/framing/Proxy.h index 86b99a83b0..3dc082097a 100644 --- a/cpp/src/qpid/framing/Proxy.h +++ b/cpp/src/qpid/framing/Proxy.h @@ -33,16 +33,18 @@ class AMQBody; class Proxy { public: - Proxy(FrameHandler& h) : out(h) {} + Proxy(FrameHandler& h); virtual ~Proxy(); void send(const AMQBody&); ProtocolVersion getVersion() const; - FrameHandler& getHandler() { return out; } - protected: - FrameHandler& out; + FrameHandler& getHandler(); + void setHandler(FrameHandler&); + + private: + FrameHandler* out; }; }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/SequenceSet.cpp b/cpp/src/qpid/framing/SequenceSet.cpp index cdf890b7f8..9ba55b2fa8 100644 --- a/cpp/src/qpid/framing/SequenceSet.cpp +++ b/cpp/src/qpid/framing/SequenceSet.cpp @@ -84,7 +84,7 @@ void SequenceSet::remove(const SequenceNumber& s) { *this -= s; } struct RangePrinter { std::ostream& out; RangePrinter(std::ostream& o) : out(o) {} - void operator()(SequenceNumber i, SequenceNumber j) { + void operator()(SequenceNumber i, SequenceNumber j) const { out << "[" << i.getValue() << "," << j.getValue() << "] "; } }; diff --git a/cpp/src/qpid/framing/SequenceSet.h b/cpp/src/qpid/framing/SequenceSet.h index 029a26818e..99e7cb4b21 100644 --- a/cpp/src/qpid/framing/SequenceSet.h +++ b/cpp/src/qpid/framing/SequenceSet.h @@ -34,6 +34,8 @@ class SequenceSet : public RangeSet<SequenceNumber> { explicit SequenceSet(const RangeSet<SequenceNumber>& r) : RangeSet<SequenceNumber>(r) {} explicit SequenceSet(const SequenceNumber& s) { add(s); } + SequenceSet(const SequenceNumber& start, const SequenceNumber finish) { add(start,finish); } + void encode(Buffer& buffer) const; void decode(Buffer& buffer); @@ -41,17 +43,20 @@ class SequenceSet : public RangeSet<SequenceNumber> { bool contains(const SequenceNumber& s) const; void add(const SequenceNumber& s); - void add(const SequenceNumber& start, const SequenceNumber& end); + void add(const SequenceNumber& start, const SequenceNumber& finish); // Closed range void add(const SequenceSet& set); void remove(const SequenceNumber& s); - void remove(const SequenceNumber& start, const SequenceNumber& end); + void remove(const SequenceNumber& start, const SequenceNumber& finish); // Closed range void remove(const SequenceSet& set); - template <class T> T for_each(T& t) const { - for (RangeIterator i = rangesBegin(); i != rangesEnd(); i++) { + template <class T> void for_each(T& t) const { + for (RangeIterator i = rangesBegin(); i != rangesEnd(); i++) t(i->first(), i->last()); } - return t; + + template <class T> void for_each(const T& t) const { + for (RangeIterator i = rangesBegin(); i != rangesEnd(); i++) + t(i->first(), i->last()); } friend std::ostream& operator<<(std::ostream&, const SequenceSet&); diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h index 4b3f704dda..1df62b3138 100644 --- a/cpp/src/qpid/framing/SessionState.h +++ b/cpp/src/qpid/framing/SessionState.h @@ -70,7 +70,7 @@ class SessionState SessionState(const framing::Uuid& id=framing::Uuid(true)); const framing::Uuid& getId() const { return id; } - State getState() const { return state; } + State getState() { return state; } /** Received incoming L3 frame. * @return SequenceNumber if an ack should be sent, empty otherwise. diff --git a/cpp/src/tests/.valgrindrc b/cpp/src/tests/.valgrindrc index 4aba7661de..76bac021d8 100644 --- a/cpp/src/tests/.valgrindrc +++ b/cpp/src/tests/.valgrindrc @@ -4,4 +4,5 @@ --suppressions=.valgrind.supp --num-callers=25 --trace-children=yes +--error-exitcode=1 diff --git a/cpp/src/tests/SessionState.cpp b/cpp/src/tests/SessionState.cpp index 752d6d3e75..71b90ea9f1 100644 --- a/cpp/src/tests/SessionState.cpp +++ b/cpp/src/tests/SessionState.cpp @@ -18,7 +18,6 @@ #include "unit_test.h" -#include "qpid/framing/SessionState.h" // FIXME aconway 2008-04-23: preview code to remove. #include "qpid/SessionState.h" #include "qpid/Exception.h" #include "qpid/framing/MessageTransferBody.h" @@ -85,7 +84,7 @@ AMQFrame contentFrameChar(char content, bool isLast=true) { } // Send frame & return size of frame. -size_t send(qpid::SessionState& s, const AMQFrame& f) { s.send(f); return f.size(); } +size_t send(qpid::SessionState& s, const AMQFrame& f) { s.sender.record(f); return f.size(); } // Send transfer command with no content. size_t transfer0(qpid::SessionState& s) { return send(s, transferFrame(false)); } // Send transfer frame with single content frame. @@ -127,13 +126,14 @@ using qpid::SessionPoint; QPID_AUTO_TEST_CASE(testSendGetReplyList) { qpid::SessionState s; + s.sender.getCommandPoint(); transfer1(s, "abc"); transfers(s, "def"); transferN(s, "xyz"); - BOOST_CHECK_EQUAL(str(s.getReplayList()),"CabcCdCeCfCxyz"); + BOOST_CHECK_EQUAL(str(s.sender.getReplayList()),"CabcCdCeCfCxyz"); // Ignore controls. - s.send(AMQFrame(in_place<SessionFlushBody>())); - BOOST_CHECK_EQUAL(str(s.getReplayList()),"CabcCdCeCfCxyz"); + s.sender.record(AMQFrame(in_place<SessionFlushBody>())); + BOOST_CHECK_EQUAL(str(s.sender.getReplayList()),"CabcCdCeCfCxyz"); } QPID_AUTO_TEST_CASE(testNeedFlush) { @@ -141,17 +141,18 @@ QPID_AUTO_TEST_CASE(testNeedFlush) { // sync after 2 1-byte transfers or equivalent bytes. c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize()); qpid::SessionState s(SessionId(), c); + s.sender.getCommandPoint(); transfers(s, "a"); - BOOST_CHECK(!s.needFlush()); + BOOST_CHECK(!s.sender.needFlush()); transfers(s, "b"); - BOOST_CHECK(s.needFlush()); - s.sendFlush(); - BOOST_CHECK(!s.needFlush()); + BOOST_CHECK(s.sender.needFlush()); + s.sender.recordFlush(); + BOOST_CHECK(!s.sender.needFlush()); transfers(s, "c"); - BOOST_CHECK(!s.needFlush()); + BOOST_CHECK(!s.sender.needFlush()); transfers(s, "d"); - BOOST_CHECK(s.needFlush()); - BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCcCd"); + BOOST_CHECK(s.sender.needFlush()); + BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCcCd"); } QPID_AUTO_TEST_CASE(testPeerConfirmed) { @@ -159,192 +160,103 @@ QPID_AUTO_TEST_CASE(testPeerConfirmed) { // sync after 2 1-byte transfers or equivalent bytes. c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize()); qpid::SessionState s(SessionId(), c); + s.sender.getCommandPoint(); transfers(s, "ab"); - BOOST_CHECK(s.needFlush()); + BOOST_CHECK(s.sender.needFlush()); transfers(s, "cd"); - BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCcCd"); - s.peerConfirmed(SessionPoint(3)); - BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cd"); - BOOST_CHECK(!s.needFlush()); + BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCcCd"); + s.sender.confirmed(SessionPoint(3)); + BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cd"); + BOOST_CHECK(!s.sender.needFlush()); // Never go backwards. - s.peerConfirmed(SessionPoint(2)); - s.peerConfirmed(SessionPoint(3)); + s.sender.confirmed(SessionPoint(2)); + s.sender.confirmed(SessionPoint(3)); // Multi-frame transfer. transfer1(s, "efg"); transfers(s, "xy"); - BOOST_CHECK_EQUAL(str(s.getReplayList()), "CdCefgCxCy"); - BOOST_CHECK(s.needFlush()); + BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CdCefgCxCy"); + BOOST_CHECK(s.sender.needFlush()); - s.peerConfirmed(SessionPoint(4)); - BOOST_CHECK_EQUAL(str(s.getReplayList()), "CefgCxCy"); - BOOST_CHECK(s.needFlush()); + s.sender.confirmed(SessionPoint(4)); + BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CefgCxCy"); + BOOST_CHECK(s.sender.needFlush()); - s.peerConfirmed(SessionPoint(5)); - BOOST_CHECK_EQUAL(str(s.getReplayList()), "CxCy"); - BOOST_CHECK(s.needFlush()); + s.sender.confirmed(SessionPoint(5)); + BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CxCy"); + BOOST_CHECK(s.sender.needFlush()); - s.peerConfirmed(SessionPoint(6)); - BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cy"); - BOOST_CHECK(!s.needFlush()); + s.sender.confirmed(SessionPoint(6)); + BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cy"); + BOOST_CHECK(!s.sender.needFlush()); } QPID_AUTO_TEST_CASE(testPeerCompleted) { qpid::SessionState s; + s.sender.getCommandPoint(); // Completion implies confirmation transfers(s, "abc"); - BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCc"); + BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCc"); SequenceSet set(SequenceSet() + 0 + 1); - s.peerCompleted(set); - BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cc"); + s.sender.completed(set); + BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cc"); transfers(s, "def"); // We dont do out-of-order confirmation, so this will only confirm up to 3: set = SequenceSet(SequenceSet() + 2 + 3 + 5); - s.peerCompleted(set); - BOOST_CHECK_EQUAL(str(s.getReplayList()), "CeCf"); + s.sender.completed(set); + BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CeCf"); } QPID_AUTO_TEST_CASE(testReceive) { - // Advance expecting/received correctly + // Advance expected/received correctly qpid::SessionState s; - BOOST_CHECK(!s.hasState()); - BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(0)); - BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(0)); + s.receiver.setCommandPoint(SessionPoint()); + BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(0)); + BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(0)); - BOOST_CHECK(s.receive(transferFrame(false))); - BOOST_CHECK(s.hasState()); - BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(1)); - BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(1)); + BOOST_CHECK(s.receiver.record(transferFrame(false))); + BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(1)); + BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(1)); - BOOST_CHECK(s.receive(transferFrame(true))); + BOOST_CHECK(s.receiver.record(transferFrame(true))); SessionPoint point = SessionPoint(1, transferFrameSize()); - BOOST_CHECK_EQUAL(s.getExpecting(), point); - BOOST_CHECK_EQUAL(s.getReceived(), point); - BOOST_CHECK(s.receive(contentFrame("", false))); + BOOST_CHECK_EQUAL(s.receiver.getExpected(), point); + BOOST_CHECK_EQUAL(s.receiver.getReceived(), point); + BOOST_CHECK(s.receiver.record(contentFrame("", false))); point.offset += contentFrameSize(0); - BOOST_CHECK_EQUAL(s.getExpecting(), point); - BOOST_CHECK_EQUAL(s.getReceived(), point); - BOOST_CHECK(s.receive(contentFrame("", true))); - BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(2)); - BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(2)); - - // Idempotence barrier, rewind expecting & receive some duplicates. - s.setExpecting(SessionPoint(1)); - BOOST_CHECK(!s.receive(transferFrame(false))); - BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(2)); - BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(2)); - BOOST_CHECK(s.receive(transferFrame(false))); - BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(3)); - BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(3)); + BOOST_CHECK_EQUAL(s.receiver.getExpected(), point); + BOOST_CHECK_EQUAL(s.receiver.getReceived(), point); + BOOST_CHECK(s.receiver.record(contentFrame("", true))); + BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(2)); + BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(2)); + + // Idempotence barrier, rewind expected & receive some duplicates. + s.receiver.setCommandPoint(SessionPoint(1)); + BOOST_CHECK(!s.receiver.record(transferFrame(false))); + BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(2)); + BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(2)); + BOOST_CHECK(s.receiver.record(transferFrame(false))); + BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(3)); + BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(3)); } QPID_AUTO_TEST_CASE(testCompleted) { // completed & unknownCompleted qpid::SessionState s; - s.receive(transferFrame(false)); - s.receive(transferFrame(false)); - s.receive(transferFrame(false)); - s.localCompleted(1); - BOOST_CHECK_EQUAL(s.getReceivedCompleted(), SequenceSet(SequenceSet()+1)); - s.localCompleted(0); - BOOST_CHECK_EQUAL(s.getReceivedCompleted(), + s.receiver.setCommandPoint(SessionPoint()); + s.receiver.record(transferFrame(false)); + s.receiver.record(transferFrame(false)); + s.receiver.record(transferFrame(false)); + s.receiver.completed(1); + BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(), SequenceSet(SequenceSet()+1)); + s.receiver.completed(0); + BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(), SequenceSet(SequenceSet() + SequenceSet::Range(0,2))); - s.peerKnownComplete(SequenceSet(SequenceSet()+1)); - BOOST_CHECK_EQUAL(s.getReceivedCompleted(), SequenceSet(SequenceSet()+2)); -} - -// ================================================================ -// FIXME aconway 2008-04-23: Below here is old preview framing::SessionState test, remove with preview code. - -using namespace qpid::framing; - -// Sent chars as frames -void sent(SessionState& session, const std::string& frames) { - for_each(frames.begin(), frames.end(), - bind(&SessionState::sent, ref(session), bind(frame, _1))); -} - -// Received chars as frames -void received(SessionState& session, const std::string& frames) { - for_each(frames.begin(), frames.end(), - bind(&SessionState::received, ref(session), bind(frame, _1))); -} - -bool operator==(const AMQFrame& a, const AMQFrame& b) { - const AMQContentBody* ab=dynamic_cast<const AMQContentBody*>(a.getBody()); - const AMQContentBody* bb=dynamic_cast<const AMQContentBody*>(b.getBody()); - return ab && bb && ab->getData() == bb->getData(); -} - -QPID_AUTO_TEST_CASE(testSent) { - // Test that we send solicit-ack at the right interval. - AMQContentBody f; - SessionState s1(1); - BOOST_CHECK(s1.sent(f)); - BOOST_CHECK(s1.sent(f)); - BOOST_CHECK(s1.sent(f)); - - SessionState s3(3); - BOOST_CHECK(!s3.sent(f)); - BOOST_CHECK(!s3.sent(f)); - BOOST_CHECK(s3.sent(f)); - - BOOST_CHECK(!s3.sent(f)); - BOOST_CHECK(!s3.sent(f)); - s3.receivedAck(4); - BOOST_CHECK(!s3.sent(f)); - BOOST_CHECK(!s3.sent(f)); - BOOST_CHECK(s3.sent(f)); -} - -QPID_AUTO_TEST_CASE(testReplay) { - // Replay of all frames. - SessionState session(100); - sent(session, "abc"); - session.suspend(); session.resuming(); - session.receivedAck(-1); - BOOST_CHECK_EQUAL(str(session.replay()), "abc"); - - // Replay with acks - session.receivedAck(0); // ack a. - session.suspend(); - session.resuming(); - session.receivedAck(1); // ack b. - BOOST_CHECK_EQUAL(str(session.replay()), "c"); - - // Replay after further frames. - sent(session, "def"); - session.suspend(); - session.resuming(); - session.receivedAck(3); - BOOST_CHECK_EQUAL(str(session.replay()), "ef"); - - // Bad ack, too high - try { - session.receivedAck(6); - BOOST_FAIL("expected exception"); - } catch(const std::exception&) {} - -} - -QPID_AUTO_TEST_CASE(testReceived) { - // Check that we request acks at the right interval. - AMQContentBody f; - SessionState s1(1); - BOOST_CHECK_EQUAL(0u, *s1.received(f)); - BOOST_CHECK_EQUAL(1u, *s1.received(f)); - BOOST_CHECK_EQUAL(2u, *s1.received(f)); - - SessionState s3(3); - BOOST_CHECK(!s3.received(f)); - BOOST_CHECK(!s3.received(f)); - BOOST_CHECK_EQUAL(2u, *s3.received(f)); - - BOOST_CHECK(!s3.received(f)); - BOOST_CHECK(!s3.received(f)); - BOOST_CHECK_EQUAL(5u, *s3.received(f)); + s.receiver.knownCompleted(SequenceSet(SequenceSet()+1)); + BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(), SequenceSet(SequenceSet()+2)); + // TODO aconway 2008-04-30: missing tests for known-completed. } QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/SocketProxy.h b/cpp/src/tests/SocketProxy.h index 3263652fe2..b53387bd57 100644 --- a/cpp/src/tests/SocketProxy.h +++ b/cpp/src/tests/SocketProxy.h @@ -42,7 +42,7 @@ class SocketProxy : private qpid::sys::Runnable * Listen for connection on getPort(). */ SocketProxy(int connectPort, const std::string host="localhost") - : closed(false), port(listener.listen()) + : closed(false), port(listener.listen()), dropClient(), dropServer() { client.connect(host, connectPort); thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this)); @@ -58,10 +58,17 @@ class SocketProxy : private qpid::sys::Runnable closed=true; } poller.shutdown(); + if (thread.id() != qpid::sys::Thread::current().id()) thread.join(); client.close(); } + /** Simulate lost packets, drop data from client */ + void dropClientData(bool drop=true) { dropClient=drop; } + + /** Simulate lost packets, drop data from server */ + void dropServerData(bool drop=true) { dropServer=drop; } + bool isClosed() const { qpid::sys::Mutex::ScopedLock l(lock); return closed; @@ -83,8 +90,8 @@ class SocketProxy : private qpid::sys::Runnable qpid::sys::PollerHandle listenerHandle(listener); poller.addFd(listenerHandle, qpid::sys::Poller::IN); qpid::sys::Poller::Event event = poller.wait(); - throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()"); - throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "Accept failed"); + throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()"); + throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed"); poller.delFd(listenerHandle); server.reset(listener.accept(0, 0)); @@ -97,25 +104,32 @@ class SocketProxy : private qpid::sys::Runnable char buffer[1024]; for (;;) { qpid::sys::Poller::Event event = poller.wait(); - throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()"); - throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "client/server disconnected"); + throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()"); + throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "SocketProxy: client/server disconnected"); if (event.handle == &serverHandle) { - client.write(buffer, server->read(buffer, sizeof(buffer))); + ssize_t n = server->read(buffer, sizeof(buffer)); + if (!dropServer) client.write(buffer, n); poller.rearmFd(serverHandle); } else if (event.handle == &clientHandle) { - server->write(buffer, client.read(buffer, sizeof(buffer))); + ssize_t n = client.read(buffer, sizeof(buffer)); + if (!dropClient) server->write(buffer, n); poller.rearmFd(clientHandle); } else { - throwIf(true, "No handle ready"); + throwIf(true, "SocketProxy: No handle ready"); } } } catch (const std::exception& e) { - QPID_LOG(debug, "SocketProxy::run exiting: " << e.what()); + QPID_LOG(debug, "SocketProxy::run exception: " << e.what()); } + try { if (server.get()) server->close(); close(); } + catch (const std::exception& e) { + QPID_LOG(debug, "SocketProxy::run exception in client/server close()" << e.what()); + } + } mutable qpid::sys::Mutex lock; bool closed; @@ -123,6 +137,7 @@ class SocketProxy : private qpid::sys::Runnable qpid::sys::Socket client, listener; uint16_t port; qpid::sys::Thread thread; + bool dropClient, dropServer; }; #endif |