summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rwxr-xr-xcpp/rubygen/framing.0-10/Operations.rb13
-rwxr-xr-xcpp/rubygen/framing.0-10/OperationsInvoker.rb8
-rwxr-xr-xcpp/rubygen/framing.0-10/Proxy.rb11
-rwxr-xr-xcpp/rubygen/framing.0-10/constants.rb16
-rw-r--r--cpp/src/Makefile.am9
-rw-r--r--cpp/src/qpid/Exception.cpp4
-rw-r--r--cpp/src/qpid/Exception.h6
-rw-r--r--cpp/src/qpid/ExceptionHolder.h73
-rw-r--r--cpp/src/qpid/SessionId.cpp47
-rw-r--r--cpp/src/qpid/SessionId.h49
-rw-r--r--cpp/src/qpid/SessionState.cpp131
-rw-r--r--cpp/src/qpid/SessionState.h163
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.cpp275
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.h116
-rw-r--r--cpp/src/qpid/framing/Handler.h6
-rw-r--r--cpp/src/qpid/framing/Proxy.cpp9
-rw-r--r--cpp/src/qpid/framing/Proxy.h10
-rw-r--r--cpp/src/qpid/framing/SequenceSet.cpp2
-rw-r--r--cpp/src/qpid/framing/SequenceSet.h15
-rw-r--r--cpp/src/qpid/framing/SessionState.h2
-rw-r--r--cpp/src/tests/.valgrindrc1
-rw-r--r--cpp/src/tests/SessionState.cpp236
-rw-r--r--cpp/src/tests/SocketProxy.h33
23 files changed, 907 insertions, 328 deletions
diff --git a/cpp/rubygen/framing.0-10/Operations.rb b/cpp/rubygen/framing.0-10/Operations.rb
index a22a591f14..4a67df8b92 100755
--- a/cpp/rubygen/framing.0-10/Operations.rb
+++ b/cpp/rubygen/framing.0-10/Operations.rb
@@ -24,8 +24,13 @@ class OperationsGen < CppGen
def handler_classname(c) c.name.caps+"Handler"; end
+ def methods_on(parent, chassis)
+ chassis == "all" ? parent.methods_ : parent.methods_on(chassis)
+ end
+
def handler_class(c)
- if (!c.methods_on(@chassis).empty?)
+ m = methods_on(c,@chassis)
+ if (not m.empty?)
handlerclass=handler_classname c
gen <<EOS
// ==================== class #{handlerclass} ====================
@@ -38,7 +43,7 @@ class #{handlerclass} {
virtual ~#{handlerclass}() {}
// Protocol methods
EOS
- c.methods_on(@chassis).each { |m| handler_method(m) if !m.content() }
+ m.each { |m| handler_method(m) if !m.content() }
gen <<EOS
}; // class #{handlerclass}
@@ -48,7 +53,8 @@ EOS
end
def handler_get(c)
- if (!c.methods_on(@chassis).empty?)
+ m = methods_on(c,@chassis)
+ if (not m.empty?)
handlerclass=handler_classname c
gen "virtual #{handlerclass}* get#{handlerclass}() = 0;\n"
end
@@ -93,4 +99,5 @@ end
OperationsGen.new("client",ARGV[0], $amqp).generate()
OperationsGen.new("server",ARGV[0], $amqp).generate()
+OperationsGen.new("all",ARGV[0], $amqp).generate()
diff --git a/cpp/rubygen/framing.0-10/OperationsInvoker.rb b/cpp/rubygen/framing.0-10/OperationsInvoker.rb
index 642f98ce8e..44006207ca 100755
--- a/cpp/rubygen/framing.0-10/OperationsInvoker.rb
+++ b/cpp/rubygen/framing.0-10/OperationsInvoker.rb
@@ -13,10 +13,15 @@ class OperationsInvokerGen < CppGen
@filename="qpid/framing/#{@chassis.caps}Invoker"
end
+ def methods_on(parent, chassis)
+ chassis == "all" ? parent.methods_ : parent.methods_on(chassis)
+ end
+
def handler(c) "#{@ops}::#{c.cppname}Handler"; end
def getter(c) "get#{c.cppname}Handler"; end
def invoker(c) "#{handler(c)}::Invoker"; end
- def visit_methods(c) c.methods_on(@chassis).select { |m| !m.content } end
+ def visit_methods(c) methods_on(c, @chassis).select { |m| !m.content } end
+
def handler_visits_cpp(c)
visit_methods(c).each { |m|
@@ -90,3 +95,4 @@ end
OperationsInvokerGen.new("client",ARGV[0], $amqp).generate()
OperationsInvokerGen.new("server",ARGV[0], $amqp).generate()
+OperationsInvokerGen.new("all",ARGV[0], $amqp).generate()
diff --git a/cpp/rubygen/framing.0-10/Proxy.rb b/cpp/rubygen/framing.0-10/Proxy.rb
index 87d809d4ad..71a6b954c6 100755
--- a/cpp/rubygen/framing.0-10/Proxy.rb
+++ b/cpp/rubygen/framing.0-10/Proxy.rb
@@ -11,6 +11,10 @@ class ProxyGen < CppGen
@filename="qpid/framing/#{@classname}"
end
+ def methods_on(parent, chassis)
+ chassis == "all" ? parent.methods_ : parent.methods_on(chassis)
+ end
+
def proxy_member(c) c.name.lcaps+"Proxy"; end
def inner_class_decl(c)
@@ -21,7 +25,7 @@ public:
#{cname}(FrameHandler& f) : Proxy(f) {}
static #{cname}& get(#{@classname}& proxy) { return proxy.get#{cname}(); }
EOS
- c.methods_on(@chassis).each { |m|
+ methods_on(c, @chassis).each { |m|
genl "virtual void #{m.cppname}(#{m.signature.join(",\n ")});"
genl
}}
@@ -29,7 +33,7 @@ EOS
def inner_class_defn(c)
cname=c.cppname
- c.methods_on(@chassis).each { |m|
+ methods_on(c, @chassis).each { |m|
genl "void #{@classname}::#{cname}::#{m.cppname}(#{m.signature.join(", ")})"
scope {
params=(["getVersion()"]+m.param_names).join(", ")
@@ -64,7 +68,7 @@ EOS
include "<sstream>"
include "#{@classname}.h"
include "qpid/framing/amqp_types_full.h"
- @amqp.methods_on(@chassis).each {
+ methods_on(@amqp, @chassis).each {
|m| include "qpid/framing/"+m.body_name
}
genl
@@ -81,4 +85,5 @@ end
ProxyGen.new("client", $outdir, $amqp).generate;
ProxyGen.new("server", $outdir, $amqp).generate;
+ProxyGen.new("all", $outdir, $amqp).generate;
diff --git a/cpp/rubygen/framing.0-10/constants.rb b/cpp/rubygen/framing.0-10/constants.rb
index 35067a733c..752f50b6e9 100755
--- a/cpp/rubygen/framing.0-10/constants.rb
+++ b/cpp/rubygen/framing.0-10/constants.rb
@@ -48,7 +48,8 @@ class ConstantsGen < CppGen
genl
doxygen_comment { genl c.doc }
struct(c.name.caps+"Exception", base) {
- genl "#{c.name.caps}Exception(const std::string& msg=std::string()) : #{base}(#{c.value}, \"#{c.name}: \"+msg) {}"
+ genl "std::string getPrefix() const { return \"#{c.name}\"; }"
+ genl "#{c.name.caps}Exception(const std::string& msg=std::string()) : #{base}(#{c.value}, \"\"+msg) {}"
}
end
@@ -60,12 +61,14 @@ class ConstantsGen < CppGen
def reply_exceptions_h()
h_file("#{@dir}/reply_exceptions") {
include "qpid/Exception"
+ include "qpid/ExceptionHolder"
namespace(@namespace) {
define_exceptions_for("execution", "error-code", "SessionException")
define_exceptions_for("connection", "close-code", "ConnectionException")
define_exceptions_for("session", "detach-code", "ChannelException")
genl
genl "void throwExecutionException(int code, const std::string& text);"
+ genl "void setExecutionException(ExceptionHolder& holder, int code, const std::string& text);"
}
}
end
@@ -74,14 +77,21 @@ class ConstantsGen < CppGen
cpp_file("#{@dir}/reply_exceptions") {
include "#{@dir}/reply_exceptions"
include "<sstream>"
+ include "<assert.h>"
namespace("qpid::framing") {
scope("void throwExecutionException(int code, const std::string& text) {"){
+ genl "ExceptionHolder h;"
+ genl "setExecutionException(h, code, text);"
+ genl "h.raise();"
+ }
+ scope("void setExecutionException(ExceptionHolder& holder, int code, const std::string& text) {"){
scope("switch (code) {") {
enum = @amqp.class_("execution").domain("error-code").enum
enum.choices.each { |c|
- genl "case #{c.value}: throw #{c.name.caps}Exception(text);"
+ genl "case #{c.value}: holder = new #{c.name.caps}Exception(text); break;"
}
- genl "default: break;"
+ genl 'default: assert(0);'
+ genl ' holder = new InvalidArgumentException(QPID_MSG("Bad exception code: " << code << ": " << text));'
}
}
}
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 5c052b0fe3..4a49c83b65 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -46,7 +46,7 @@ DISTCLEANFILES+=qpid/framing/MaxMethodBodySize.h
## Compiler flags
-AM_CXXFLAGS = $(WARNING_CFLAGS) $(CFLAGS)
+AM_CXXFLAGS = $(WARNING_CFLAGS)
AM_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
INCLUDES = -Igen -I$(srcdir)/gen
@@ -170,9 +170,13 @@ libqpidcommon_la_LIBADD = \
libqpidcommon_la_SOURCES = \
$(rgen_framing_srcs) \
$(platform_src) \
+ qpid/amqp_0_10/SessionHandler.h \
+ qpid/amqp_0_10/SessionHandler.cpp \
qpid/Serializer.h \
- qpid/SessionState.cpp \
qpid/SessionState.h \
+ qpid/SessionState.cpp \
+ qpid/SessionId.h \
+ qpid/SessionId.cpp \
qpid/framing/AccumulatedAck.cpp \
qpid/framing/AMQBody.cpp \
qpid/framing/AMQMethodBody.cpp \
@@ -330,6 +334,7 @@ nobase_include_HEADERS = \
qpid/assert.h \
qpid/DataDir.h \
qpid/Exception.h \
+ qpid/ExceptionHolder.h \
qpid/amqp_0_10/Exception.h \
qpid/Msg.h \
qpid/Options.h \
diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp
index a69955c9dc..8176d92cac 100644
--- a/cpp/src/qpid/Exception.cpp
+++ b/cpp/src/qpid/Exception.cpp
@@ -34,13 +34,15 @@ std::string strError(int err) {
}
Exception::Exception(const std::string& msg) throw() : message(msg) {
- QPID_LOG(debug, "Exception thrown: " << message);
+ QPID_LOG(debug, "Exception: " << message);
}
Exception::~Exception() throw() {}
std::string Exception::getPrefix() const { return "Exception"; }
+std::string Exception::getMessage() const { return message; }
+
const char* Exception::what() const throw() {
if (whatStr.empty())
whatStr = getPrefix() + ": " + message;
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h
index e74fa79ed9..1be433f17a 100644
--- a/cpp/src/qpid/Exception.h
+++ b/cpp/src/qpid/Exception.h
@@ -43,10 +43,10 @@ class Exception : public std::exception
public:
explicit Exception(const std::string& message=std::string()) throw();
virtual ~Exception() throw();
- virtual const char* what() const throw();
+ virtual const char* what() const throw(); // prefix: message
+ virtual std::string getMessage() const; // Unprefixed message
+ virtual std::string getPrefix() const; // Prefix
- protected:
- std::string getPrefix() const;
private:
std::string message;
mutable std::string whatStr;
diff --git a/cpp/src/qpid/ExceptionHolder.h b/cpp/src/qpid/ExceptionHolder.h
new file mode 100644
index 0000000000..fed6308f19
--- /dev/null
+++ b/cpp/src/qpid/ExceptionHolder.h
@@ -0,0 +1,73 @@
+#ifndef QPID_EXCEPTIONHOLDER_H
+#define QPID_EXCEPTIONHOLDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/memory.h"
+#include <memory>
+
+namespace qpid {
+
+struct Raisable {
+ virtual ~Raisable() {};
+ virtual void raise() const=0;
+ virtual std::string what() const=0;
+};
+
+/**
+ * Holder for exceptions. Allows the thread that notices an error condition to
+ * create an exception and store it to be thrown by another thread.
+ */
+class ExceptionHolder : public Raisable {
+ public:
+ ExceptionHolder() {}
+ ExceptionHolder(ExceptionHolder& ex) : Raisable(), wrapper(ex.wrapper) {}
+ /** Take ownership of ex */
+ template <class Ex> ExceptionHolder(Ex* ex) { wrap(ex); }
+ template <class Ex> ExceptionHolder(const std::auto_ptr<Ex>& ex) { wrap(ex.release()); }
+
+ ExceptionHolder& operator=(ExceptionHolder& ex) { wrapper=ex.wrapper; return *this; }
+ template <class Ex> ExceptionHolder& operator=(Ex* ex) { wrap(ex); return *this; }
+ template <class Ex> ExceptionHolder& operator=(std::auto_ptr<Ex> ex) { wrap(ex.release()); return *this; }
+
+ void raise() const { if (wrapper.get()) wrapper->raise() ; }
+ std::string what() const { return wrapper->what(); }
+ bool empty() const { return !wrapper.get(); }
+ operator bool() const { return !empty(); }
+ void reset() { wrapper.reset(); }
+
+ private:
+ template <class Ex> struct Wrapper : public Raisable {
+ Wrapper(Ex* ptr) : exception(ptr) {}
+ void raise() const { throw *exception; }
+ std::string what() const { return exception->what(); }
+ std::auto_ptr<Ex> exception;
+ };
+ template <class Ex> void wrap(Ex* ex) { wrapper.reset(new Wrapper<Ex>(ex)); }
+ std::auto_ptr<Raisable> wrapper;
+
+};
+
+
+} // namespace qpid
+
+#endif /*!QPID_EXCEPTIONHOLDER_H*/
diff --git a/cpp/src/qpid/SessionId.cpp b/cpp/src/qpid/SessionId.cpp
new file mode 100644
index 0000000000..fce6619f5d
--- /dev/null
+++ b/cpp/src/qpid/SessionId.cpp
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SessionId.h"
+#include <sstream>
+
+namespace qpid {
+
+SessionId::SessionId(const std::string& u, const std::string& n) : userId(u), name(n) {}
+
+bool SessionId::operator<(const SessionId& id) const {
+ return userId < id.userId || (userId == id.userId && name < id.name);
+}
+
+bool SessionId::operator==(const SessionId& id) const {
+ return id.name == name && id.userId == userId;
+}
+
+std::ostream& operator<<(std::ostream& o, const SessionId& id) {
+ return o << id.getName() << "@" << id.getUserId();
+}
+
+std::string SessionId::str() const {
+ std::ostringstream o;
+ o << *this;
+ return o.str();
+}
+
+} // namespace qpid
diff --git a/cpp/src/qpid/SessionId.h b/cpp/src/qpid/SessionId.h
new file mode 100644
index 0000000000..08553e8b1d
--- /dev/null
+++ b/cpp/src/qpid/SessionId.h
@@ -0,0 +1,49 @@
+#ifndef QPID_SESSIONID_H
+#define QPID_SESSIONID_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/operators.hpp>
+#include <string>
+
+namespace qpid {
+
+/** Identifier for a session */
+class SessionId : boost::totally_ordered1<SessionId> {
+ std::string userId;
+ std::string name;
+ public:
+ SessionId(const std::string& userId=std::string(), const std::string& name=std::string());
+ std::string getUserId() const { return userId; }
+ std::string getName() const { return name; }
+ bool operator<(const SessionId&) const ;
+ bool operator==(const SessionId& id) const;
+ // Convert to a string
+ std::string str() const;
+};
+
+std::ostream& operator<<(std::ostream&, const SessionId&);
+
+
+} // namespace qpid
+
+#endif /*!QPID_SESSIONID_H*/
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index 64fdd17b8f..8905fb5f9d 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -19,21 +19,24 @@
*
*/
-// FIXME aconway 2008-04-24: Reminders for handler implementation.
-//
-// - execution.sync results must be communicated to SessionState::peerConfirmed.
-//
-//
-
#include "SessionState.h"
#include "qpid/amqp_0_10/exceptions.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <numeric>
namespace qpid {
using framing::AMQFrame;
using amqp_0_10::NotImplementedException;
+using amqp_0_10::InvalidArgumentException;
+using amqp_0_10::IllegalStateException;
+
+namespace {
+bool isControl(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->type() == 0;
+}
+} // namespace
/** A point in the session - command id + offset */
void SessionPoint::advance(const AMQFrame& f) {
@@ -60,103 +63,123 @@ bool SessionPoint::operator==(const SessionPoint& x) const {
return command == x.command && offset == x.offset;
}
-SendState::SendState(size_t syncSize, size_t killSize)
- : replaySyncSize(syncSize), replayKillSize(killSize), unflushedSize() {}
+SessionState::SendState::SendState(SessionState& s) : session(&s), unflushedSize(0) {}
+
+const SessionPoint& SessionState::SendState::getCommandPoint() {
+ return sendPoint;
+}
+
+bool SessionState::SendState::expected(const SessionPoint& point) {
+ if (point < replayPoint || sendPoint < point)
+ throw InvalidArgumentException(QPID_MSG(session->getId() << ": expected command-point out of range."));
+ // FIXME aconway 2008-05-06: this is not strictly correct, we should keep
+ // an intermediate replay pointer into the replay list.
+ confirmed(point); // Drop commands prior to expected from replay.
+ return (!replayList.empty());
+}
-void SendState::send(const AMQFrame& f) {
- if (f.getMethod() && f.getMethod()->type() == 0)
- return; // Don't replay control frames.
+void SessionState::SendState::record(const AMQFrame& f) {
+ if (isControl(f)) return; // Ignore control frames.
+ session->stateful = true;
replayList.push_back(f);
unflushedSize += f.size();
+ incomplete += sendPoint.command;
sendPoint.advance(f);
}
-bool SendState::needFlush() const { return unflushedSize >= replaySyncSize; }
+bool SessionState::SendState::needFlush() const { return unflushedSize >= session->config.replaySyncSize; }
-void SendState::sendFlush() {
+void SessionState::SendState::recordFlush() {
assert(flushPoint <= sendPoint);
flushPoint = sendPoint;
unflushedSize = 0;
}
-void SendState::peerConfirmed(const SessionPoint& confirmed) {
+void SessionState::SendState::confirmed(const SessionPoint& confirmed) {
+ if (confirmed > sendPoint)
+ throw InvalidArgumentException(QPID_MSG(session->getId() << "Confirmed commands not yet sent."));
ReplayList::iterator i = replayList.begin();
- // Ignore peerConfirmed.offset, we don't support partial replay.
while (i != replayList.end() && replayPoint.command < confirmed.command) {
- assert(replayPoint <= flushPoint);
replayPoint.advance(*i);
assert(replayPoint <= sendPoint);
- if (replayPoint > flushPoint) {
- flushPoint.advance(*i);
- assert(replayPoint <= flushPoint);
+ if (replayPoint > flushPoint)
unflushedSize -= i->size();
- }
++i;
}
+ if (replayPoint > flushPoint) flushPoint = replayPoint;
replayList.erase(replayList.begin(), i);
assert(replayPoint.offset == 0);
}
-void SendState::peerCompleted(const SequenceSet& commands) {
+void SessionState::SendState::completed(const SequenceSet& commands) {
if (commands.empty()) return;
- sentCompleted += commands;
+ incomplete -= commands;
// Completion implies confirmation but we don't handle out-of-order
// confirmation, so confirm only the first contiguous range of commands.
- peerConfirmed(SessionPoint(commands.rangesBegin()->end()));
+ confirmed(SessionPoint(commands.rangesBegin()->end()));
}
-bool ReceiveState::hasState() { return stateful; }
+SessionState::ReceiveState::ReceiveState(SessionState& s) : session(&s) {}
-void ReceiveState::setExpecting(const SessionPoint& point) {
- if (!hasState()) // initializing a new session.
- expecting = received = point;
- else { // setting point in an existing session.
- if (point > received)
- throw NotImplementedException("command-point out of bounds.");
- expecting = point;
- }
+void SessionState::ReceiveState::setCommandPoint(const SessionPoint& point) {
+ if (session->hasState() && point > received)
+ throw InvalidArgumentException(QPID_MSG(session->getId() << ": Command-point out of range."));
+ expected = point;
+ if (expected > received)
+ received = expected;
}
-ReceiveState::ReceiveState() : stateful() {}
-
-bool ReceiveState::receive(const AMQFrame& f) {
- stateful = true;
- expecting.advance(f);
- if (expecting > received) {
- received = expecting;
+bool SessionState::ReceiveState::record(const AMQFrame& f) {
+ if (isControl(f)) return true; // Ignore control frames.
+ session->stateful = true;
+ expected.advance(f);
+ if (expected > received) {
+ received = expected;
return true;
}
+ else {
+ QPID_LOG(debug, "Ignoring duplicate: " << f);
return false;
}
-
-void ReceiveState::localCompleted(SequenceNumber command) {
- assert(command < received.command); // Can't complete what we haven't received.
- receivedCompleted += command;
}
-void ReceiveState::peerKnownComplete(const SequenceSet& commands) {
- receivedCompleted -= commands;
+void SessionState::ReceiveState::completed(SequenceNumber command, bool cumulative) {
+ assert(command <= received.command); // Internal error to complete an unreceived command.
+ assert(firstIncomplete <= command);
+ if (cumulative)
+ unknownCompleted.add(firstIncomplete, command);
+ else
+ unknownCompleted += command;
+ firstIncomplete = unknownCompleted.rangeContaining(firstIncomplete).end();
}
-SessionId::SessionId(const std::string& u, const std::string& n) : userId(u), name(n) {}
-
-bool SessionId::operator<(const SessionId& id) const {
- return userId < id.userId || (userId == id.userId && name < id.name);
+void SessionState::ReceiveState::knownCompleted(const SequenceSet& commands) {
+ if (!commands.empty() && commands.back() > received.command)
+ throw InvalidArgumentException(QPID_MSG(session->getId() << ": Known-completed has invalid commands."));
+ unknownCompleted -= commands;
}
-bool SessionId::operator==(const SessionId& id) const {
- return id.name == name && id.userId == userId;
+SequenceNumber SessionState::ReceiveState::getCurrent() const {
+ SequenceNumber current = expected.command; // FIXME aconway 2008-05-08: SequenceNumber arithmetic.
+ return --current;
}
+// FIXME aconway 2008-05-02: implement sync & kill limits.
SessionState::Configuration::Configuration()
: replaySyncSize(std::numeric_limits<size_t>::max()),
replayKillSize(std::numeric_limits<size_t>::max()) {}
SessionState::SessionState(const SessionId& i, const Configuration& c)
- : SendState(c.replaySyncSize, c.replayKillSize),
- id(i), timeout(), config(c) {}
+ : sender(*this), receiver(*this), id(i), timeout(), config(c), stateful()
+{
+ QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
+}
+
+bool SessionState::hasState() const {
+ return stateful;
+}
-void SessionState::clear() { *this = SessionState(id, config); }
+SessionState::~SessionState() {}
std::ostream& operator<<(std::ostream& o, const SessionPoint& p) {
return o << "(" << p.command.getValue() << "+" << p.offset << ")";
diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h
index b836534ee7..7957825dd3 100644
--- a/cpp/src/qpid/SessionState.h
+++ b/cpp/src/qpid/SessionState.h
@@ -22,9 +22,11 @@
*
*/
+#include <qpid/SessionId.h>
#include <qpid/framing/SequenceNumber.h>
#include <qpid/framing/SequenceSet.h>
#include <qpid/framing/AMQFrame.h>
+#include <qpid/framing/FrameHandler.h>
#include <boost/operators.hpp>
#include <vector>
#include <iosfwd>
@@ -49,118 +51,124 @@ struct SessionPoint : boost::totally_ordered1<SessionPoint> {
std::ostream& operator<<(std::ostream&, const SessionPoint&);
-/** The sending half of session state */
+/**
+ * Support for session idempotence barrier and resume as defined in
+ * AMQP 0-10.
+ *
+ * We only issue/use contiguous confirmations, out-of-order confirmation
+ * is ignored. Out of order completion is fully supported.
+ *
+ * Raises NotImplemented if the command point is set greater than the
+ * max currently received command data, either explicitly via
+ * session.command-point or implicitly via session.gap.
+ *
+ * Partial replay is not supported, replay always begins on a command
+ * boundary, and we never confirm partial commands.
+ *
+ * The SessionPoint data structure does store offsets so this class
+ * could be extended to support partial replay without
+ * source-incompatbile API changes.
+ */
+class SessionState {
+ public:
+
+ /** State for commands sent. Records commands for replay,
+ * tracks confirmation and completion of sent commands.
+ */
class SendState {
public:
typedef std::vector<framing::AMQFrame> ReplayList;
/** Record frame f for replay. Should not be called during replay. */
- void send(const framing::AMQFrame& f);
+ void record(const framing::AMQFrame& f);
/** @return true if we should send flush for confirmed and completed commands. */
bool needFlush() const;
/** Called when flush for confirmed and completed commands is sent to peer. */
- void sendFlush();
+ void recordFlush();
- /** Called when the peer confirms up to commands. */
- void peerConfirmed(const SessionPoint& confirmed);
+ /** Called when the peer confirms up to comfirmed. */
+ void confirmed(const SessionPoint& confirmed);
/** Called when the peer indicates commands completed */
- void peerCompleted(const SequenceSet& commands);
+ void completed(const SequenceSet& commands);
- /** Get the replay list. @see getReplayPoint. */
- const ReplayList& getReplayList() const { return replayList; }
-
- /**
- * The replay point is the point up to which all data has been
- * confirmed. Partial replay is not supported, it will always
- * have offset==0.
- */
+ /** Point from which we can replay. All data < replayPoint is confirmed. */
const SessionPoint& getReplayPoint() const { return replayPoint; }
- const SessionPoint& getSendPoint() const { return sendPoint; }
- const SequenceSet& getCompleted() const { return sentCompleted; }
+ /** Get the replay list, starting from getReplayPoint() */
+ // TODO aconway 2008-04-30: should be const, but FrameHandler takes non-const AMQFrame&.
+ ReplayList& getReplayList() { return replayList; }
- protected:
- SendState(size_t replaySyncSize, size_t replayKillSize);
+ /** Point from which the next data will be sent. */
+ const SessionPoint& getCommandPoint();
+
+ /** Set of outstanding incomplete commands */
+ const SequenceSet& getIncomplete() const { return incomplete; }
+
+ /** Peer expecting commands from this point.
+ *@return true if replay is required, sets replayPoint.
+ */
+ bool expected(const SessionPoint& expected);
private:
- size_t replaySyncSize, replayKillSize; // @see SessionState::Configuration.
+ SendState(SessionState& s);
+
+ SessionState* session;
// invariant: replayPoint <= flushPoint <= sendPoint
SessionPoint replayPoint; // Can replay from this point
- SessionPoint sendPoint; // Send from this point
SessionPoint flushPoint; // Point of last flush
+ SessionPoint sendPoint; // Send from this point
ReplayList replayList; // Starts from replayPoint.
size_t unflushedSize; // Un-flushed bytes in replay list.
- SequenceSet sentCompleted; // Commands sent and acknowledged as completed.
+ SequenceSet incomplete; // Commands sent and not yet completed.
+
+ friend class SessionState;
};
-/** Receiving half of SessionState */
+ /** State for commands received.
+ * Idempotence barrier for duplicate commands, tracks completion
+ * and of received commands.
+ */
class ReceiveState {
public:
- bool hasState();
-
/** Set the command point. */
- void setExpecting(const SessionPoint& point);
+ void setCommandPoint(const SessionPoint& point);
/** Returns true if frame should be be processed, false if it is a duplicate. */
- bool receive(const framing::AMQFrame& f);
+ bool record(const framing::AMQFrame& f);
/** Command completed locally */
- void localCompleted(SequenceNumber command);
+ void completed(SequenceNumber command, bool cumulative=false);
/** Peer has indicated commands are known completed */
- void peerKnownComplete(const SequenceSet& commands);
+ void knownCompleted(const SequenceSet& commands);
+
+ /** Get the incoming command point */
+ const SessionPoint& getExpected() const { return expected; }
- /** Recieved, completed and possibly not known by peer to be completed */
- const SequenceSet& getReceivedCompleted() const { return receivedCompleted; }
- const SessionPoint& getExpecting() const { return expecting; }
+ /** Get the received high-water-mark, may be > getExpected() during replay */
const SessionPoint& getReceived() const { return received; }
- protected:
- ReceiveState();
+ /** Completed commands that the peer may not know about */
+ const SequenceSet& getUnknownComplete() const { return unknownCompleted; }
+
+ /** ID of the command currently being handled. */
+ SequenceNumber getCurrent() const;
private:
- bool stateful; // True if session has state.
- SessionPoint expecting; // Expecting from here
- SessionPoint received; // Received to here. Invariant: expecting <= received.
- SequenceSet receivedCompleted; // Received & completed, may not be not known-completed by peer
-};
+ ReceiveState(SessionState&);
-/** Identifier for a session */
-class SessionId : boost::totally_ordered1<SessionId> {
- std::string userId;
- std::string name;
- public:
- SessionId(const std::string& userId=std::string(), const std::string& name=std::string());
- std::string getUserId() const { return userId; }
- std::string getName() const { return name; }
- bool operator<(const SessionId&) const ;
- bool operator==(const SessionId& id) const;
-};
+ SessionState* session;
+ SessionPoint expected; // Expected from here
+ SessionPoint received; // Received to here. Invariant: expected <= received.
+ SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer.
+ SequenceNumber firstIncomplete; // First incomplete command.
+ friend class SessionState;
+ };
-/**
- * Support for session idempotence barrier and resume as defined in
- * AMQP 0-10.
- *
- * We only issue/use contiguous confirmations, out-of-order confirmation
- * is ignored. Out of order completion is fully supported.
- *
- * Raises NotImplemented if the command point is set greater than the
- * max currently received command data, either explicitly via
- * session.command-point or implicitly via session.gap.
- *
- * Partial replay is not supported, replay always begins on a command
- * boundary, and we never confirm partial commands.
- *
- * The SessionPoint data structure does store offsets so this class
- * could be extended to support partial replay without
- * source-incompatbile API changes.
- */
-class SessionState : public SendState, public ReceiveState {
- public:
struct Configuration {
Configuration();
size_t replaySyncSize; // Issue a sync when the replay list holds >= N bytes
@@ -169,19 +177,32 @@ class SessionState : public SendState, public ReceiveState {
SessionState(const SessionId& =SessionId(), const Configuration& =Configuration());
+ virtual ~SessionState();
+
const SessionId& getId() const { return id; }
uint32_t getTimeout() const { return timeout; }
void setTimeout(uint32_t seconds) { timeout = seconds; }
- /** Clear all state except Id. */
- void clear();
+ bool operator==(const SessionId& other) const { return id == other; }
+ bool operator==(const SessionState& other) const { return id == other.id; }
+
+ SendState sender; ///< State for commands sent
+ ReceiveState receiver; ///< State for commands received
+
+ bool hasState() const;
private:
SessionId id;
uint32_t timeout;
Configuration config;
+ bool stateful;
+
+ friend class SendState;
+ friend class ReceiveState;
};
+inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; }
+
} // namespace qpid
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
new file mode 100644
index 0000000000..3fb2579e8c
--- /dev/null
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "SessionHandler.h"
+#include "qpid/SessionState.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/log/Statement.h"
+
+
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace amqp_0_10 {
+using namespace framing;
+using namespace std;
+
+SessionHandler::SessionHandler() : peer(channel), ignoring(), sendReady(), receiveReady() {}
+
+SessionHandler::SessionHandler(FrameHandler& out, ChannelId ch)
+ : channel(ch, &out), peer(channel), ignoring(false) {}
+
+SessionHandler::~SessionHandler() {}
+
+namespace {
+bool isSessionControl(AMQMethodBody* m) {
+ return m &&
+ m->amqpClassId() == SESSION_CLASS_ID;
+}
+bool isSessionDetachedControl(AMQMethodBody* m) {
+ return isSessionControl(m) &&
+ m->amqpMethodId() == SESSION_DETACHED_METHOD_ID;
+}
+} // namespace
+
+void SessionHandler::checkAttached() {
+ if (!getState())
+ throw NotAttachedException(
+ QPID_MSG("Channel " << channel.get() << " is not attached"));
+ assert(getInHandler());
+ assert(channel.next);
+}
+
+void SessionHandler::invoke(const AMQMethodBody& m) {
+ framing::invoke(*this, m);
+}
+
+void SessionHandler::handleIn(AMQFrame& f) {
+ // Note on channel states: a channel is attached if session != 0
+ AMQMethodBody* m = f.getBody()->getMethod();
+ try {
+ if (ignoring && !isSessionDetachedControl(m))
+ return;
+ else if (isSessionControl(m))
+ invoke(*m);
+ else {
+ checkAttached();
+ if (!receiveReady)
+ throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data"));
+ if (!getState()->receiver.record(f))
+ return; // Ignore duplicates.
+ getInHandler()->handle(f);
+ }
+ }
+ catch(const ChannelException& e){
+ QPID_LOG(error, "Channel exception: " << e.what());
+ if (getState())
+ peer.detached(getState()->getId().getName(), e.code);
+ channelException(e.code, e.getMessage());
+ }
+ catch(const ConnectionException& e) {
+ QPID_LOG(error, "Connection exception: " << e.what());
+ connectionException(e.code, e.getMessage());
+ }
+ catch(const std::exception& e) {
+ QPID_LOG(error, "Unexpected exception: " << e.what());
+ connectionException(connection::FRAMING_ERROR, e.what());
+ }
+}
+
+void SessionHandler::handleOut(AMQFrame& f) {
+ checkAttached();
+ if (!sendReady)
+ throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data"));
+ getState()->sender.record(f);
+ if (getState()->sender.needFlush()) {
+ peer.flush(false, true, true);
+ getState()->sender.recordFlush();
+ }
+ channel.handle(f);
+}
+
+void SessionHandler::checkName(const std::string& name) {
+ checkAttached();
+ if (name != getState()->getId().getName())
+ throw InvalidArgumentException(
+ QPID_MSG("Incorrect session name: " << name
+ << ", expecting: " << getState()->getId().getName()));
+}
+
+void SessionHandler::attach(const std::string& name, bool force) {
+ if (getState() && name == getState()->getId().getName())
+ return; // Idempotent
+ if (getState())
+ throw SessionBusyException(
+ QPID_MSG("Channel " << channel.get() << " already attached to " << getState()->getId()));
+ setState(name, force);
+ QPID_LOG(debug, "Attached channel " << channel.get() << " to " << getState()->getId());
+ peer.attached(name);
+ if (getState()->hasState())
+ peer.flush(true, true, true);
+ else
+ sendCommandPoint();
+}
+
+void SessionHandler::attached(const std::string& name) {
+ checkName(name);
+}
+
+void SessionHandler::detach(const std::string& name) {
+ checkName(name);
+ peer.detached(name, session::NORMAL);
+ handleDetach();
+}
+
+void SessionHandler::detached(const std::string& name, uint8_t code) {
+ checkName(name);
+ ignoring = false;
+ if (code != session::NORMAL)
+ channelException(code, "session.detached from peer.");
+ else {
+ handleDetach();
+ }
+}
+
+void SessionHandler::handleDetach() {
+ sendReady = receiveReady = false;
+}
+
+void SessionHandler::requestTimeout(uint32_t t) {
+ checkAttached();
+ getState()->setTimeout(t);
+ peer.timeout(t);
+}
+
+void SessionHandler::timeout(uint32_t t) {
+ checkAttached();
+ getState()->setTimeout(t);
+}
+
+void SessionHandler::commandPoint(const SequenceNumber& id, uint64_t offset) {
+ checkAttached();
+ getState()->receiver.setCommandPoint(SessionPoint(id, offset));
+ if (!receiveReady) {
+ receiveReady = true;
+ readyToReceive();
+ }
+}
+
+void SessionHandler::expected(const SequenceSet& commands, const Array& /*fragments*/) {
+ checkAttached();
+ if (commands.empty() && getState()->hasState())
+ throw IllegalStateException(
+ QPID_MSG(getState()->getId() << ": has state but client is attaching as new session."));
+ getState()->sender.expected(commands.empty() ? SequenceNumber(0) : commands.front());
+ if (!sendReady) // send command point if not already sent
+ sendCommandPoint();
+}
+
+void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragments*/) {
+ checkAttached();
+ // Ignore non-contiguous confirmations.
+ if (!commands.empty() && commands.front() >= getState()->sender.getReplayPoint()) {
+ getState()->sender.confirmed(commands.rangesBegin()->last());
+ }
+}
+
+void SessionHandler::completed(const SequenceSet& commands, bool /*timelyReply*/) {
+ checkAttached();
+ getState()->sender.completed(commands);
+ if (!commands.empty())
+ peer.knownCompleted(commands); // Always send a timely reply
+}
+
+void SessionHandler::knownCompleted(const SequenceSet& commands) {
+ checkAttached();
+ getState()->receiver.knownCompleted(commands);
+}
+
+void SessionHandler::flush(bool expected, bool confirmed, bool completed) {
+ checkAttached();
+ if (expected) {
+ SequenceSet expectSet;
+ if (getState()->hasState())
+ expectSet.add(getState()->receiver.getExpected().command);
+ peer.expected(expectSet, Array());
+ }
+ if (confirmed) {
+ SequenceSet confirmSet;
+ if (!getState()->receiver.getUnknownComplete().empty())
+ confirmSet.add(getState()->receiver.getUnknownComplete().front(),
+ getState()->receiver.getReceived().command);
+ peer.confirmed(confirmSet, Array());
+ }
+ if (completed)
+ peer.completed(getState()->receiver.getUnknownComplete(), true);
+}
+
+void SessionHandler::gap(const SequenceSet& /*commands*/) {
+ throw NotImplementedException("session.gap not supported");
+}
+
+void SessionHandler::sendDetach()
+{
+ checkAttached();
+ ignoring = true;
+ peer.detach(getState()->getId().getName());
+}
+
+void SessionHandler::sendCompletion() {
+ checkAttached();
+ peer.completed(getState()->receiver.getUnknownComplete(), true);
+}
+
+void SessionHandler::sendAttach(bool force) {
+ checkAttached();
+ peer.attach(getState()->getId().getName(), force);
+ if (getState()->hasState())
+ peer.flush(true, true, true);
+ else
+ sendCommandPoint();
+}
+
+void SessionHandler::sendCommandPoint() {
+ SessionPoint point(getState()->sender.getCommandPoint());
+ peer.commandPoint(point.command, point.offset);
+ if (!sendReady) {
+ sendReady = true;
+ readyToSend();
+ }
+}
+
+void SessionHandler::sendTimeout(uint32_t t) {
+ checkAttached();
+ peer.requestTimeout(t);
+}
+
+void SessionHandler::sendFlush() {
+ peer.flush(false, true, true);
+}
+
+bool SessionHandler::ready() const {
+ return sendReady && receiveReady;
+}
+
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.h b/cpp/src/qpid/amqp_0_10/SessionHandler.h
new file mode 100644
index 0000000000..85577ebafc
--- /dev/null
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.h
@@ -0,0 +1,116 @@
+#ifndef QPID_AMQP_0_10_SESSIONHANDLER_H
+#define QPID_AMQP_0_10_SESSIONHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/ChannelHandler.h"
+#include "qpid/framing/AMQP_AllProxy.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+
+namespace qpid {
+
+class SessionState;
+
+namespace amqp_0_10 {
+
+/**
+ * Base SessionHandler with logic common to both client and broker.
+ *
+ * A SessionHandler is associated with a channel and can be attached
+ * to a session state.
+ */
+
+class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
+ public framing::FrameHandler::InOutHandler
+{
+ public:
+ typedef framing::AMQP_AllProxy::Session Peer;
+
+ SessionHandler();
+ SessionHandler(framing::FrameHandler& out, uint16_t channel);
+ ~SessionHandler();
+
+ void setChannel(uint16_t ch) { channel = ch; }
+ uint16_t getChannel() const { return channel.get(); }
+
+ void setOutHandler(framing::FrameHandler& h) { channel.next = &h; }
+
+ virtual SessionState* getState() = 0;
+ virtual framing::FrameHandler* getInHandler() = 0;
+
+ // Non-protocol methods, called locally to initiate some action.
+ void sendDetach();
+ void sendCompletion();
+ void sendAttach(bool force);
+ void sendTimeout(uint32_t t);
+ void sendFlush();
+ void sendCommandPoint();
+
+ /** True if the handler is ready to send and receive */
+ bool ready() const;
+
+ // Protocol methods
+ void attach(const std::string& name, bool force);
+ void attached(const std::string& name);
+ void detach(const std::string& name);
+ void detached(const std::string& name, uint8_t code);
+
+ void requestTimeout(uint32_t t);
+ void timeout(uint32_t t);
+
+ void commandPoint(const framing::SequenceNumber& id, uint64_t offset);
+ void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
+ void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments);
+ void completed(const framing::SequenceSet& commands, bool timelyReply);
+ void knownCompleted(const framing::SequenceSet& commands);
+ void flush(bool expected, bool confirmed, bool completed);
+ void gap(const framing::SequenceSet& commands);
+
+ protected:
+ virtual void invoke(const framing::AMQMethodBody& m);
+
+ virtual void setState(const std::string& sessionName, bool force) = 0;
+ virtual void channelException(uint16_t code, const std::string& msg) = 0;
+ virtual void connectionException(uint16_t code, const std::string& msg) = 0;
+
+
+ // Notification of events
+ virtual void readyToSend() {}
+ virtual void readyToReceive() {}
+ virtual void handleDetach();
+
+ virtual void handleIn(framing::AMQFrame&);
+ virtual void handleOut(framing::AMQFrame&);
+
+ void checkAttached();
+ void checkName(const std::string& name);
+
+ framing::ChannelHandler channel;
+ Peer peer;
+ bool ignoring;
+ bool sendReady, receiveReady;
+ // FIXME aconway 2008-05-07: move handler-related functions from SessionState.
+
+};
+}} // namespace qpid::amqp_0_10
+
+#endif /*!QPID_AMQP_0_10_SESSIONHANDLER_H*/
diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h
index fbf3c0b7ca..b93869be85 100644
--- a/cpp/src/qpid/framing/Handler.h
+++ b/cpp/src/qpid/framing/Handler.h
@@ -82,14 +82,14 @@ struct Handler {
template <class X, void (X::*F)(T)>
class MemFunRef : public Handler<T> {
public:
- MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(x) {}
- void handle(T t) { (target.*F)(t); }
+ MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(&x) {}
+ void handle(T t) { (target->*F)(t); }
/** Allow calling with -> syntax, compatible with Chains */
MemFunRef* operator->() { return this; }
private:
- X& target;
+ X* target;
};
/** Interface for a handler that implements a
diff --git a/cpp/src/qpid/framing/Proxy.cpp b/cpp/src/qpid/framing/Proxy.cpp
index b47060028f..6b37fb368d 100644
--- a/cpp/src/qpid/framing/Proxy.cpp
+++ b/cpp/src/qpid/framing/Proxy.cpp
@@ -22,16 +22,21 @@
namespace qpid {
namespace framing {
+Proxy::Proxy(FrameHandler& h) : out(&h) {}
+
Proxy::~Proxy() {}
void Proxy::send(const AMQBody& b) {
AMQFrame f(b);
- out.handle(f);
+ out->handle(f);
}
-
ProtocolVersion Proxy::getVersion() const {
return ProtocolVersion();
}
+FrameHandler& Proxy::getHandler() { return *out; }
+
+void Proxy::setHandler(FrameHandler& f) { out=&f; }
+
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/Proxy.h b/cpp/src/qpid/framing/Proxy.h
index 86b99a83b0..3dc082097a 100644
--- a/cpp/src/qpid/framing/Proxy.h
+++ b/cpp/src/qpid/framing/Proxy.h
@@ -33,16 +33,18 @@ class AMQBody;
class Proxy
{
public:
- Proxy(FrameHandler& h) : out(h) {}
+ Proxy(FrameHandler& h);
virtual ~Proxy();
void send(const AMQBody&);
ProtocolVersion getVersion() const;
- FrameHandler& getHandler() { return out; }
- protected:
- FrameHandler& out;
+ FrameHandler& getHandler();
+ void setHandler(FrameHandler&);
+
+ private:
+ FrameHandler* out;
};
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/SequenceSet.cpp b/cpp/src/qpid/framing/SequenceSet.cpp
index cdf890b7f8..9ba55b2fa8 100644
--- a/cpp/src/qpid/framing/SequenceSet.cpp
+++ b/cpp/src/qpid/framing/SequenceSet.cpp
@@ -84,7 +84,7 @@ void SequenceSet::remove(const SequenceNumber& s) { *this -= s; }
struct RangePrinter {
std::ostream& out;
RangePrinter(std::ostream& o) : out(o) {}
- void operator()(SequenceNumber i, SequenceNumber j) {
+ void operator()(SequenceNumber i, SequenceNumber j) const {
out << "[" << i.getValue() << "," << j.getValue() << "] ";
}
};
diff --git a/cpp/src/qpid/framing/SequenceSet.h b/cpp/src/qpid/framing/SequenceSet.h
index 029a26818e..99e7cb4b21 100644
--- a/cpp/src/qpid/framing/SequenceSet.h
+++ b/cpp/src/qpid/framing/SequenceSet.h
@@ -34,6 +34,8 @@ class SequenceSet : public RangeSet<SequenceNumber> {
explicit SequenceSet(const RangeSet<SequenceNumber>& r)
: RangeSet<SequenceNumber>(r) {}
explicit SequenceSet(const SequenceNumber& s) { add(s); }
+ SequenceSet(const SequenceNumber& start, const SequenceNumber finish) { add(start,finish); }
+
void encode(Buffer& buffer) const;
void decode(Buffer& buffer);
@@ -41,17 +43,20 @@ class SequenceSet : public RangeSet<SequenceNumber> {
bool contains(const SequenceNumber& s) const;
void add(const SequenceNumber& s);
- void add(const SequenceNumber& start, const SequenceNumber& end);
+ void add(const SequenceNumber& start, const SequenceNumber& finish); // Closed range
void add(const SequenceSet& set);
void remove(const SequenceNumber& s);
- void remove(const SequenceNumber& start, const SequenceNumber& end);
+ void remove(const SequenceNumber& start, const SequenceNumber& finish); // Closed range
void remove(const SequenceSet& set);
- template <class T> T for_each(T& t) const {
- for (RangeIterator i = rangesBegin(); i != rangesEnd(); i++) {
+ template <class T> void for_each(T& t) const {
+ for (RangeIterator i = rangesBegin(); i != rangesEnd(); i++)
t(i->first(), i->last());
}
- return t;
+
+ template <class T> void for_each(const T& t) const {
+ for (RangeIterator i = rangesBegin(); i != rangesEnd(); i++)
+ t(i->first(), i->last());
}
friend std::ostream& operator<<(std::ostream&, const SequenceSet&);
diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h
index 4b3f704dda..1df62b3138 100644
--- a/cpp/src/qpid/framing/SessionState.h
+++ b/cpp/src/qpid/framing/SessionState.h
@@ -70,7 +70,7 @@ class SessionState
SessionState(const framing::Uuid& id=framing::Uuid(true));
const framing::Uuid& getId() const { return id; }
- State getState() const { return state; }
+ State getState() { return state; }
/** Received incoming L3 frame.
* @return SequenceNumber if an ack should be sent, empty otherwise.
diff --git a/cpp/src/tests/.valgrindrc b/cpp/src/tests/.valgrindrc
index 4aba7661de..76bac021d8 100644
--- a/cpp/src/tests/.valgrindrc
+++ b/cpp/src/tests/.valgrindrc
@@ -4,4 +4,5 @@
--suppressions=.valgrind.supp
--num-callers=25
--trace-children=yes
+--error-exitcode=1
diff --git a/cpp/src/tests/SessionState.cpp b/cpp/src/tests/SessionState.cpp
index 752d6d3e75..71b90ea9f1 100644
--- a/cpp/src/tests/SessionState.cpp
+++ b/cpp/src/tests/SessionState.cpp
@@ -18,7 +18,6 @@
#include "unit_test.h"
-#include "qpid/framing/SessionState.h" // FIXME aconway 2008-04-23: preview code to remove.
#include "qpid/SessionState.h"
#include "qpid/Exception.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -85,7 +84,7 @@ AMQFrame contentFrameChar(char content, bool isLast=true) {
}
// Send frame & return size of frame.
-size_t send(qpid::SessionState& s, const AMQFrame& f) { s.send(f); return f.size(); }
+size_t send(qpid::SessionState& s, const AMQFrame& f) { s.sender.record(f); return f.size(); }
// Send transfer command with no content.
size_t transfer0(qpid::SessionState& s) { return send(s, transferFrame(false)); }
// Send transfer frame with single content frame.
@@ -127,13 +126,14 @@ using qpid::SessionPoint;
QPID_AUTO_TEST_CASE(testSendGetReplyList) {
qpid::SessionState s;
+ s.sender.getCommandPoint();
transfer1(s, "abc");
transfers(s, "def");
transferN(s, "xyz");
- BOOST_CHECK_EQUAL(str(s.getReplayList()),"CabcCdCeCfCxyz");
+ BOOST_CHECK_EQUAL(str(s.sender.getReplayList()),"CabcCdCeCfCxyz");
// Ignore controls.
- s.send(AMQFrame(in_place<SessionFlushBody>()));
- BOOST_CHECK_EQUAL(str(s.getReplayList()),"CabcCdCeCfCxyz");
+ s.sender.record(AMQFrame(in_place<SessionFlushBody>()));
+ BOOST_CHECK_EQUAL(str(s.sender.getReplayList()),"CabcCdCeCfCxyz");
}
QPID_AUTO_TEST_CASE(testNeedFlush) {
@@ -141,17 +141,18 @@ QPID_AUTO_TEST_CASE(testNeedFlush) {
// sync after 2 1-byte transfers or equivalent bytes.
c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize());
qpid::SessionState s(SessionId(), c);
+ s.sender.getCommandPoint();
transfers(s, "a");
- BOOST_CHECK(!s.needFlush());
+ BOOST_CHECK(!s.sender.needFlush());
transfers(s, "b");
- BOOST_CHECK(s.needFlush());
- s.sendFlush();
- BOOST_CHECK(!s.needFlush());
+ BOOST_CHECK(s.sender.needFlush());
+ s.sender.recordFlush();
+ BOOST_CHECK(!s.sender.needFlush());
transfers(s, "c");
- BOOST_CHECK(!s.needFlush());
+ BOOST_CHECK(!s.sender.needFlush());
transfers(s, "d");
- BOOST_CHECK(s.needFlush());
- BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCcCd");
+ BOOST_CHECK(s.sender.needFlush());
+ BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCcCd");
}
QPID_AUTO_TEST_CASE(testPeerConfirmed) {
@@ -159,192 +160,103 @@ QPID_AUTO_TEST_CASE(testPeerConfirmed) {
// sync after 2 1-byte transfers or equivalent bytes.
c.replaySyncSize = 2*(transferFrameSize()+contentFrameSize());
qpid::SessionState s(SessionId(), c);
+ s.sender.getCommandPoint();
transfers(s, "ab");
- BOOST_CHECK(s.needFlush());
+ BOOST_CHECK(s.sender.needFlush());
transfers(s, "cd");
- BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCcCd");
- s.peerConfirmed(SessionPoint(3));
- BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cd");
- BOOST_CHECK(!s.needFlush());
+ BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCcCd");
+ s.sender.confirmed(SessionPoint(3));
+ BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cd");
+ BOOST_CHECK(!s.sender.needFlush());
// Never go backwards.
- s.peerConfirmed(SessionPoint(2));
- s.peerConfirmed(SessionPoint(3));
+ s.sender.confirmed(SessionPoint(2));
+ s.sender.confirmed(SessionPoint(3));
// Multi-frame transfer.
transfer1(s, "efg");
transfers(s, "xy");
- BOOST_CHECK_EQUAL(str(s.getReplayList()), "CdCefgCxCy");
- BOOST_CHECK(s.needFlush());
+ BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CdCefgCxCy");
+ BOOST_CHECK(s.sender.needFlush());
- s.peerConfirmed(SessionPoint(4));
- BOOST_CHECK_EQUAL(str(s.getReplayList()), "CefgCxCy");
- BOOST_CHECK(s.needFlush());
+ s.sender.confirmed(SessionPoint(4));
+ BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CefgCxCy");
+ BOOST_CHECK(s.sender.needFlush());
- s.peerConfirmed(SessionPoint(5));
- BOOST_CHECK_EQUAL(str(s.getReplayList()), "CxCy");
- BOOST_CHECK(s.needFlush());
+ s.sender.confirmed(SessionPoint(5));
+ BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CxCy");
+ BOOST_CHECK(s.sender.needFlush());
- s.peerConfirmed(SessionPoint(6));
- BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cy");
- BOOST_CHECK(!s.needFlush());
+ s.sender.confirmed(SessionPoint(6));
+ BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cy");
+ BOOST_CHECK(!s.sender.needFlush());
}
QPID_AUTO_TEST_CASE(testPeerCompleted) {
qpid::SessionState s;
+ s.sender.getCommandPoint();
// Completion implies confirmation
transfers(s, "abc");
- BOOST_CHECK_EQUAL(str(s.getReplayList()), "CaCbCc");
+ BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CaCbCc");
SequenceSet set(SequenceSet() + 0 + 1);
- s.peerCompleted(set);
- BOOST_CHECK_EQUAL(str(s.getReplayList()), "Cc");
+ s.sender.completed(set);
+ BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "Cc");
transfers(s, "def");
// We dont do out-of-order confirmation, so this will only confirm up to 3:
set = SequenceSet(SequenceSet() + 2 + 3 + 5);
- s.peerCompleted(set);
- BOOST_CHECK_EQUAL(str(s.getReplayList()), "CeCf");
+ s.sender.completed(set);
+ BOOST_CHECK_EQUAL(str(s.sender.getReplayList()), "CeCf");
}
QPID_AUTO_TEST_CASE(testReceive) {
- // Advance expecting/received correctly
+ // Advance expected/received correctly
qpid::SessionState s;
- BOOST_CHECK(!s.hasState());
- BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(0));
- BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(0));
+ s.receiver.setCommandPoint(SessionPoint());
+ BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(0));
+ BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(0));
- BOOST_CHECK(s.receive(transferFrame(false)));
- BOOST_CHECK(s.hasState());
- BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(1));
- BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(1));
+ BOOST_CHECK(s.receiver.record(transferFrame(false)));
+ BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(1));
+ BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(1));
- BOOST_CHECK(s.receive(transferFrame(true)));
+ BOOST_CHECK(s.receiver.record(transferFrame(true)));
SessionPoint point = SessionPoint(1, transferFrameSize());
- BOOST_CHECK_EQUAL(s.getExpecting(), point);
- BOOST_CHECK_EQUAL(s.getReceived(), point);
- BOOST_CHECK(s.receive(contentFrame("", false)));
+ BOOST_CHECK_EQUAL(s.receiver.getExpected(), point);
+ BOOST_CHECK_EQUAL(s.receiver.getReceived(), point);
+ BOOST_CHECK(s.receiver.record(contentFrame("", false)));
point.offset += contentFrameSize(0);
- BOOST_CHECK_EQUAL(s.getExpecting(), point);
- BOOST_CHECK_EQUAL(s.getReceived(), point);
- BOOST_CHECK(s.receive(contentFrame("", true)));
- BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(2));
- BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(2));
-
- // Idempotence barrier, rewind expecting & receive some duplicates.
- s.setExpecting(SessionPoint(1));
- BOOST_CHECK(!s.receive(transferFrame(false)));
- BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(2));
- BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(2));
- BOOST_CHECK(s.receive(transferFrame(false)));
- BOOST_CHECK_EQUAL(s.getExpecting(), SessionPoint(3));
- BOOST_CHECK_EQUAL(s.getReceived(), SessionPoint(3));
+ BOOST_CHECK_EQUAL(s.receiver.getExpected(), point);
+ BOOST_CHECK_EQUAL(s.receiver.getReceived(), point);
+ BOOST_CHECK(s.receiver.record(contentFrame("", true)));
+ BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(2));
+ BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(2));
+
+ // Idempotence barrier, rewind expected & receive some duplicates.
+ s.receiver.setCommandPoint(SessionPoint(1));
+ BOOST_CHECK(!s.receiver.record(transferFrame(false)));
+ BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(2));
+ BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(2));
+ BOOST_CHECK(s.receiver.record(transferFrame(false)));
+ BOOST_CHECK_EQUAL(s.receiver.getExpected(), SessionPoint(3));
+ BOOST_CHECK_EQUAL(s.receiver.getReceived(), SessionPoint(3));
}
QPID_AUTO_TEST_CASE(testCompleted) {
// completed & unknownCompleted
qpid::SessionState s;
- s.receive(transferFrame(false));
- s.receive(transferFrame(false));
- s.receive(transferFrame(false));
- s.localCompleted(1);
- BOOST_CHECK_EQUAL(s.getReceivedCompleted(), SequenceSet(SequenceSet()+1));
- s.localCompleted(0);
- BOOST_CHECK_EQUAL(s.getReceivedCompleted(),
+ s.receiver.setCommandPoint(SessionPoint());
+ s.receiver.record(transferFrame(false));
+ s.receiver.record(transferFrame(false));
+ s.receiver.record(transferFrame(false));
+ s.receiver.completed(1);
+ BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(), SequenceSet(SequenceSet()+1));
+ s.receiver.completed(0);
+ BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(),
SequenceSet(SequenceSet() + SequenceSet::Range(0,2)));
- s.peerKnownComplete(SequenceSet(SequenceSet()+1));
- BOOST_CHECK_EQUAL(s.getReceivedCompleted(), SequenceSet(SequenceSet()+2));
-}
-
-// ================================================================
-// FIXME aconway 2008-04-23: Below here is old preview framing::SessionState test, remove with preview code.
-
-using namespace qpid::framing;
-
-// Sent chars as frames
-void sent(SessionState& session, const std::string& frames) {
- for_each(frames.begin(), frames.end(),
- bind(&SessionState::sent, ref(session), bind(frame, _1)));
-}
-
-// Received chars as frames
-void received(SessionState& session, const std::string& frames) {
- for_each(frames.begin(), frames.end(),
- bind(&SessionState::received, ref(session), bind(frame, _1)));
-}
-
-bool operator==(const AMQFrame& a, const AMQFrame& b) {
- const AMQContentBody* ab=dynamic_cast<const AMQContentBody*>(a.getBody());
- const AMQContentBody* bb=dynamic_cast<const AMQContentBody*>(b.getBody());
- return ab && bb && ab->getData() == bb->getData();
-}
-
-QPID_AUTO_TEST_CASE(testSent) {
- // Test that we send solicit-ack at the right interval.
- AMQContentBody f;
- SessionState s1(1);
- BOOST_CHECK(s1.sent(f));
- BOOST_CHECK(s1.sent(f));
- BOOST_CHECK(s1.sent(f));
-
- SessionState s3(3);
- BOOST_CHECK(!s3.sent(f));
- BOOST_CHECK(!s3.sent(f));
- BOOST_CHECK(s3.sent(f));
-
- BOOST_CHECK(!s3.sent(f));
- BOOST_CHECK(!s3.sent(f));
- s3.receivedAck(4);
- BOOST_CHECK(!s3.sent(f));
- BOOST_CHECK(!s3.sent(f));
- BOOST_CHECK(s3.sent(f));
-}
-
-QPID_AUTO_TEST_CASE(testReplay) {
- // Replay of all frames.
- SessionState session(100);
- sent(session, "abc");
- session.suspend(); session.resuming();
- session.receivedAck(-1);
- BOOST_CHECK_EQUAL(str(session.replay()), "abc");
-
- // Replay with acks
- session.receivedAck(0); // ack a.
- session.suspend();
- session.resuming();
- session.receivedAck(1); // ack b.
- BOOST_CHECK_EQUAL(str(session.replay()), "c");
-
- // Replay after further frames.
- sent(session, "def");
- session.suspend();
- session.resuming();
- session.receivedAck(3);
- BOOST_CHECK_EQUAL(str(session.replay()), "ef");
-
- // Bad ack, too high
- try {
- session.receivedAck(6);
- BOOST_FAIL("expected exception");
- } catch(const std::exception&) {}
-
-}
-
-QPID_AUTO_TEST_CASE(testReceived) {
- // Check that we request acks at the right interval.
- AMQContentBody f;
- SessionState s1(1);
- BOOST_CHECK_EQUAL(0u, *s1.received(f));
- BOOST_CHECK_EQUAL(1u, *s1.received(f));
- BOOST_CHECK_EQUAL(2u, *s1.received(f));
-
- SessionState s3(3);
- BOOST_CHECK(!s3.received(f));
- BOOST_CHECK(!s3.received(f));
- BOOST_CHECK_EQUAL(2u, *s3.received(f));
-
- BOOST_CHECK(!s3.received(f));
- BOOST_CHECK(!s3.received(f));
- BOOST_CHECK_EQUAL(5u, *s3.received(f));
+ s.receiver.knownCompleted(SequenceSet(SequenceSet()+1));
+ BOOST_CHECK_EQUAL(s.receiver.getUnknownComplete(), SequenceSet(SequenceSet()+2));
+ // TODO aconway 2008-04-30: missing tests for known-completed.
}
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/SocketProxy.h b/cpp/src/tests/SocketProxy.h
index 3263652fe2..b53387bd57 100644
--- a/cpp/src/tests/SocketProxy.h
+++ b/cpp/src/tests/SocketProxy.h
@@ -42,7 +42,7 @@ class SocketProxy : private qpid::sys::Runnable
* Listen for connection on getPort().
*/
SocketProxy(int connectPort, const std::string host="localhost")
- : closed(false), port(listener.listen())
+ : closed(false), port(listener.listen()), dropClient(), dropServer()
{
client.connect(host, connectPort);
thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
@@ -58,10 +58,17 @@ class SocketProxy : private qpid::sys::Runnable
closed=true;
}
poller.shutdown();
+ if (thread.id() != qpid::sys::Thread::current().id())
thread.join();
client.close();
}
+ /** Simulate lost packets, drop data from client */
+ void dropClientData(bool drop=true) { dropClient=drop; }
+
+ /** Simulate lost packets, drop data from server */
+ void dropServerData(bool drop=true) { dropServer=drop; }
+
bool isClosed() const {
qpid::sys::Mutex::ScopedLock l(lock);
return closed;
@@ -83,8 +90,8 @@ class SocketProxy : private qpid::sys::Runnable
qpid::sys::PollerHandle listenerHandle(listener);
poller.addFd(listenerHandle, qpid::sys::Poller::IN);
qpid::sys::Poller::Event event = poller.wait();
- throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()");
- throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "Accept failed");
+ throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()");
+ throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed");
poller.delFd(listenerHandle);
server.reset(listener.accept(0, 0));
@@ -97,25 +104,32 @@ class SocketProxy : private qpid::sys::Runnable
char buffer[1024];
for (;;) {
qpid::sys::Poller::Event event = poller.wait();
- throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()");
- throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "client/server disconnected");
+ throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()");
+ throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "SocketProxy: client/server disconnected");
if (event.handle == &serverHandle) {
- client.write(buffer, server->read(buffer, sizeof(buffer)));
+ ssize_t n = server->read(buffer, sizeof(buffer));
+ if (!dropServer) client.write(buffer, n);
poller.rearmFd(serverHandle);
} else if (event.handle == &clientHandle) {
- server->write(buffer, client.read(buffer, sizeof(buffer)));
+ ssize_t n = client.read(buffer, sizeof(buffer));
+ if (!dropClient) server->write(buffer, n);
poller.rearmFd(clientHandle);
} else {
- throwIf(true, "No handle ready");
+ throwIf(true, "SocketProxy: No handle ready");
}
}
}
catch (const std::exception& e) {
- QPID_LOG(debug, "SocketProxy::run exiting: " << e.what());
+ QPID_LOG(debug, "SocketProxy::run exception: " << e.what());
}
+ try {
if (server.get()) server->close();
close();
}
+ catch (const std::exception& e) {
+ QPID_LOG(debug, "SocketProxy::run exception in client/server close()" << e.what());
+ }
+ }
mutable qpid::sys::Mutex lock;
bool closed;
@@ -123,6 +137,7 @@ class SocketProxy : private qpid::sys::Runnable
qpid::sys::Socket client, listener;
uint16_t port;
qpid::sys::Thread thread;
+ bool dropClient, dropServer;
};
#endif