summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rwxr-xr-xcpp/rubygen/templates/constants.rb68
-rw-r--r--cpp/src/Makefile.am13
-rw-r--r--cpp/src/qpid/Exception.cpp42
-rw-r--r--cpp/src/qpid/Exception.h125
-rw-r--r--cpp/src/qpid/Msg.h2
-rw-r--r--cpp/src/qpid/QpidError.cpp41
-rw-r--r--cpp/src/qpid/QpidError.h79
-rw-r--r--cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--cpp/src/qpid/broker/Broker.h1
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp21
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h2
-rw-r--r--cpp/src/qpid/broker/Connection.cpp9
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp4
-rw-r--r--cpp/src/qpid/broker/Daemon.cpp2
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp6
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp12
-rw-r--r--cpp/src/qpid/broker/DtxTimeout.h7
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.cpp12
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp6
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp7
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp26
-rw-r--r--cpp/src/qpid/broker/Queue.cpp14
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp4
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp27
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp92
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h14
-rw-r--r--cpp/src/qpid/broker/SessionManager.cpp17
-rw-r--r--cpp/src/qpid/broker/SessionManager.h12
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp37
-rw-r--r--cpp/src/qpid/broker/SessionState.h41
-rw-r--r--cpp/src/qpid/broker/Timer.cpp7
-rw-r--r--cpp/src/qpid/broker/Timer.h2
-rw-r--r--cpp/src/qpid/client/Channel.cpp5
-rw-r--r--cpp/src/qpid/client/Connection.cpp26
-rw-r--r--cpp/src/qpid/client/Connection.h7
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp4
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp58
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h14
-rw-r--r--cpp/src/qpid/client/Connector.cpp2
-rw-r--r--cpp/src/qpid/client/Connector.h1
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp2
-rw-r--r--cpp/src/qpid/client/Future.h2
-rw-r--r--cpp/src/qpid/client/FutureResponse.cpp2
-rw-r--r--cpp/src/qpid/client/FutureResult.cpp2
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp366
-rw-r--r--cpp/src/qpid/client/SessionCore.h124
-rw-r--r--cpp/src/qpid/client/SessionHandler.cpp132
-rw-r--r--cpp/src/qpid/client/SessionHandler.h63
-rw-r--r--cpp/src/qpid/client/StateManager.cpp2
-rw-r--r--cpp/src/qpid/client/StateManager.h4
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp14
-rw-r--r--cpp/src/qpid/framing/BodyHandler.cpp6
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.cpp16
-rw-r--r--cpp/src/qpid/framing/ChannelHandler.h (renamed from cpp/src/qpid/framing/ProtocolVersionException.h)47
-rw-r--r--cpp/src/qpid/framing/FieldTable.cpp5
-rw-r--r--cpp/src/qpid/framing/FieldValue.cpp7
-rw-r--r--cpp/src/qpid/framing/FramingContent.cpp14
-rw-r--r--cpp/src/qpid/framing/Handler.h3
-rw-r--r--cpp/src/qpid/framing/ProtocolVersionException.cpp33
-rw-r--r--cpp/src/qpid/framing/ResumeHandler.cpp56
-rw-r--r--cpp/src/qpid/framing/ResumeHandler.h69
-rw-r--r--cpp/src/qpid/framing/SequenceNumber.h1
-rw-r--r--cpp/src/qpid/framing/SessionState.cpp120
-rw-r--r--cpp/src/qpid/framing/SessionState.h127
-rw-r--r--cpp/src/qpid/framing/TemplateVisitor.h89
-rw-r--r--cpp/src/qpid/framing/TransferContent.cpp12
-rw-r--r--cpp/src/qpid/framing/TransferContent.h7
-rw-r--r--cpp/src/qpid/framing/Uuid.cpp12
-rw-r--r--cpp/src/qpid/framing/Uuid.h3
-rw-r--r--cpp/src/qpid/framing/amqp_framing.h1
-rw-r--r--cpp/src/qpid/framing/amqp_types.h6
-rw-r--r--cpp/src/qpid/framing/variant.h3
-rw-r--r--cpp/src/qpid/log/Logger.cpp6
-rw-r--r--cpp/src/qpid/log/Logger.h5
-rw-r--r--cpp/src/qpid/log/Statement.cpp4
-rw-r--r--cpp/src/qpid/log/Statement.h24
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp3
-rw-r--r--cpp/src/qpid/sys/ConcurrentQueue.h90
-rw-r--r--cpp/src/qpid/sys/ScopedIncrement.h6
-rw-r--r--cpp/src/qpid/sys/Serializer.h4
-rw-r--r--cpp/src/qpid/sys/StateMonitor.h78
-rw-r--r--cpp/src/qpid/sys/Waitable.h73
-rw-r--r--cpp/src/qpid/sys/apr/APRBase.cpp1
-rw-r--r--cpp/src/qpid/sys/apr/APRBase.h8
-rw-r--r--cpp/src/qpid/sys/posix/Shlib.cpp9
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp4
-rw-r--r--cpp/src/qpid/sys/posix/check.cpp39
-rw-r--r--cpp/src/qpid/sys/posix/check.h36
-rw-r--r--cpp/src/tests/ClientChannelTest.cpp2
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp151
-rw-r--r--cpp/src/tests/ConcurrentQueue.cpp6
-rw-r--r--cpp/src/tests/EventChannelTest.cpp8
-rw-r--r--cpp/src/tests/FramingTest.cpp20
-rw-r--r--cpp/src/tests/HeadersExchangeTest.cpp4
-rw-r--r--cpp/src/tests/InProcessBroker.h215
-rw-r--r--cpp/src/tests/Makefile.am14
-rw-r--r--cpp/src/tests/QueueTest.cpp2
-rw-r--r--cpp/src/tests/ResumeHandler.cpp87
-rw-r--r--cpp/src/tests/SessionState.cpp142
-rw-r--r--cpp/src/tests/Shlib.cpp3
-rw-r--r--cpp/src/tests/TimerTest.cpp1
-rw-r--r--cpp/src/tests/TxMocks.h5
-rw-r--r--cpp/src/tests/client_test.cpp1
-rw-r--r--cpp/src/tests/echo_service.cpp1
-rw-r--r--cpp/src/tests/exception_test.cpp1
-rw-r--r--cpp/src/tests/interop_runner.cpp1
-rw-r--r--cpp/src/tests/logging.cpp52
-rw-r--r--cpp/src/tests/perftest.cpp1
-rwxr-xr-xcpp/src/tests/run-unit-tests4
-rw-r--r--cpp/src/tests/topic_listener.cpp1
-rw-r--r--cpp/src/tests/topic_publisher.cpp1
111 files changed, 1844 insertions, 1494 deletions
diff --git a/cpp/rubygen/templates/constants.rb b/cpp/rubygen/templates/constants.rb
index 2ef6502772..5fbbefe218 100755
--- a/cpp/rubygen/templates/constants.rb
+++ b/cpp/rubygen/templates/constants.rb
@@ -10,31 +10,71 @@ class ConstantsGen < CppGen
@dir="qpid/framing"
end
- def generate()
+ def constants_h()
h_file("#{@dir}/constants") {
namespace(@namespace) {
- scope("enum AmqpConstant {","};") {
- genl @amqp.constants.map { |c| "#{c.name.shout}=#{c.value}" }.join(",\n")
- }
- }
- }
-
+ scope("enum AmqpConstant {","};") {
+ l=[]
+ l.concat @amqp.constants.map { |c| "#{c.name.shout}=#{c.value}" }
+ @amqp.classes.each { |c|
+ l << "#{c.name.shout}_CLASS_ID=#{c.index}"
+ l.concat c.methods_.map { |m|
+ "#{c.name.shout}_#{m.name.shout}_METHOD_ID=#{m.index}" }
+ }
+ genl l.join(",\n")
+ }}}
+ end
+
+ def exbase(c)
+ case c.class_
+ when "soft-error" then "ChannelException"
+ when "hard-error" then "ConnectionException"
+ end
+ end
+
+ def reply_exceptions_h()
h_file("#{@dir}/reply_exceptions") {
include "qpid/Exception"
namespace(@namespace) {
@amqp.constants.each { |c|
- if c.class_
- exname=c.name.caps+"Exception"
- base = c.class_=="soft-error" ? "ChannelException" : "ConnectionException"
- text=(c.doc or c.name).tr_s!(" \t\n"," ")
- struct(exname, base) {
- genl "#{exname}(const std::string& msg=\"#{text})\") : #{base}(#{c.value}, msg) {}"
+ base = exbase c
+ if base
+ genl
+ struct(c.name.caps+"Exception", base) {
+ genl "#{c.name.caps}Exception(const std::string& msg=std::string()) : #{base}(#{c.value}, \"#{c.name}: \"+msg) {}"
}
end
}
+ genl
+ genl "void throwReplyException(int code, const std::string& text);"
}
}
-
+ end
+
+ def reply_exceptions_cpp()
+ cpp_file("#{@dir}/reply_exceptions") {
+ include "#{@dir}/reply_exceptions"
+ include "<sstream>"
+ namespace("qpid::framing") {
+ scope("void throwReplyException(int code, const std::string& text) {"){
+ scope("switch (code) {") {
+ genl "case 200: break; // No exception"
+ @amqp.constants.each { |c|
+ if exbase c
+ genl "case #{c.value}: throw #{c.name.caps}Exception(text);"
+ end
+ }
+ scope("default:","") {
+ genl "std::ostringstream msg;"
+ genl 'msg << "Invalid reply code " << code << ": " << text;'
+ genl 'throw InvalidArgumentException(msg.str());'
+ }}}}}
+ end
+
+ def generate()
+ constants_h
+ reply_exceptions_h
+ reply_exceptions_cpp
end
end
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 57b3d27b71..ebb8916d6a 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -52,7 +52,6 @@ qpidd_SOURCES = qpidd.cpp
posix_plat_src = \
qpid/sys/epoll/EpollPoller.cpp \
- qpid/sys/posix/check.cpp \
qpid/sys/posix/Socket.cpp \
qpid/sys/posix/AsynchIO.cpp \
qpid/sys/posix/Time.cpp \
@@ -110,8 +109,7 @@ libqpidcommon_la_SOURCES = \
qpid/framing/InitiationHandler.cpp \
qpid/framing/ProtocolInitiation.cpp \
qpid/framing/ProtocolVersion.cpp \
- qpid/framing/ProtocolVersionException.cpp \
- qpid/framing/ResumeHandler.cpp qpid/framing/ResumeHandler.h \
+ qpid/framing/SessionState.cpp qpid/framing/SessionState.h \
qpid/framing/SendContent.cpp \
qpid/framing/SequenceNumber.cpp \
qpid/framing/SequenceNumberSet.cpp \
@@ -126,7 +124,6 @@ libqpidcommon_la_SOURCES = \
qpid/Exception.cpp \
qpid/Plugin.cpp \
qpid/Url.cpp \
- qpid/QpidError.cpp \
qpid/sys/AsynchIOAcceptor.cpp \
qpid/sys/Dispatcher.cpp \
qpid/sys/Runnable.cpp \
@@ -210,7 +207,6 @@ libqpidclient_la_SOURCES = \
qpid/client/MessageListener.cpp \
qpid/client/Correlator.cpp \
qpid/client/CompletionTracker.cpp \
- qpid/client/SessionHandler.cpp \
qpid/client/ConnectionHandler.cpp \
qpid/client/ExecutionHandler.cpp \
qpid/client/FutureCompletion.cpp \
@@ -221,12 +217,12 @@ libqpidclient_la_SOURCES = \
nobase_include_HEADERS = \
$(platform_hdr) \
+ qpid/assert.h \
qpid/Exception.h \
qpid/ExceptionHolder.h \
qpid/Msg.h \
qpid/Options.h \
qpid/Plugin.h \
- qpid/QpidError.h \
qpid/SharedObject.h \
qpid/Url.h \
qpid/memory.h \
@@ -323,7 +319,6 @@ nobase_include_HEADERS = \
qpid/client/MessageQueue.h \
qpid/client/Response.h \
qpid/client/SessionCore.h \
- qpid/client/SessionHandler.h \
qpid/client/StateManager.h \
qpid/client/TypedResult.h \
qpid/framing/AMQBody.h \
@@ -339,6 +334,7 @@ nobase_include_HEADERS = \
qpid/framing/Blob.h \
qpid/framing/BodyHandler.h \
qpid/framing/Buffer.h \
+ qpid/framing/ChannelHandler.h \
qpid/framing/ChannelAdapter.h \
qpid/framing/FieldTable.h \
qpid/framing/FieldValue.h \
@@ -360,7 +356,6 @@ nobase_include_HEADERS = \
qpid/framing/OutputHandler.h \
qpid/framing/ProtocolInitiation.h \
qpid/framing/ProtocolVersion.h \
- qpid/framing/ProtocolVersionException.h \
qpid/framing/Proxy.h \
qpid/framing/SendContent.h \
qpid/framing/SequenceNumber.h \
@@ -399,6 +394,8 @@ nobase_include_HEADERS = \
qpid/sys/Shlib.h \
qpid/sys/ShutdownHandler.h \
qpid/sys/Socket.h \
+ qpid/sys/StateMonitor.h \
+ qpid/sys/Waitable.h \
qpid/sys/Thread.h \
qpid/sys/Time.h \
qpid/sys/TimeoutHandler.h
diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp
index 11051d1a2e..e0747df49e 100644
--- a/cpp/src/qpid/Exception.cpp
+++ b/cpp/src/qpid/Exception.cpp
@@ -23,6 +23,7 @@
#include "Exception.h"
#include <typeinfo>
#include <errno.h>
+#include <assert.h>
namespace qpid {
@@ -31,45 +32,22 @@ std::string strError(int err) {
return std::string(strerror_r(err, buf, sizeof(buf)));
}
-static void ctorLog(const std::exception* e) {
- QPID_LOG(trace, "Exception: " << e->what());
+Exception::Exception(const std::string& s) throw() : msg(s) {
+ QPID_LOG(warning, "Exception: " << msg);
}
-
-Exception::Exception() throw() { ctorLog(this); }
-
-Exception::Exception(const std::string& str) throw()
- : whatStr(str) { ctorLog(this); }
-
-Exception::Exception(const char* str) throw() : whatStr(str) { ctorLog(this); }
-
-Exception::Exception(const std::exception& e) throw() : whatStr(e.what()) {}
Exception::~Exception() throw() {}
-const char* Exception::what() const throw() { return whatStr.c_str(); }
-
-std::string Exception::toString() const throw() { return whatStr; }
-
-Exception::auto_ptr Exception::clone() const throw() { return Exception::auto_ptr(new Exception(*this)); }
-
-void Exception::throwSelf() const { throw *this; }
-
-ShutdownException::ShutdownException() : Exception("Shut down.") {}
-
-EmptyException::EmptyException() : Exception("Empty.") {}
-
-const char* Exception::defaultMessage = "Unexpected exception";
-
-void Exception::log(const char* what, const char* message) {
- QPID_LOG(error, message << ": " << what);
+std::string Exception::str() const throw() {
+ if (msg.empty())
+ const_cast<std::string&>(msg).assign(typeid(*this).name());
+ return msg;
}
-void Exception::log(const std::exception& e, const char* message) {
- log(e.what(), message);
-}
+const char* Exception::what() const throw() { return str().c_str(); }
-void Exception::logUnknown(const char* message) {
- log("unknown exception.", message);
+std::auto_ptr<Exception> Exception::clone() const throw() {
+ return std::auto_ptr<Exception>(new Exception(*this));
}
} // namespace qpid
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h
index a7ab1fa8aa..bf6c1fb872 100644
--- a/cpp/src/qpid/Exception.h
+++ b/cpp/src/qpid/Exception.h
@@ -24,135 +24,52 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/Msg.h"
-#include <exception>
-#include <string>
+
#include <memory>
-#include <boost/shared_ptr.hpp>
-#include <boost/lexical_cast.hpp>
-#include <boost/function.hpp>
+#include <string>
namespace qpid
{
-/** Get the error message for error number err. */
+/** Get the error message for a system number err, e.g. errno. */
std::string strError(int err);
/**
- * Exception base class for all Qpid exceptions.
+ * Base class for Qpid runtime exceptions.
*/
class Exception : public std::exception
{
- protected:
- std::string whatStr;
-
public:
- typedef boost::shared_ptr<Exception> shared_ptr;
- typedef boost::shared_ptr<const Exception> shared_ptr_const;
- typedef std::auto_ptr<Exception> auto_ptr;
-
- Exception() throw();
- Exception(const std::string& str) throw();
- Exception(const char* str) throw();
- Exception(const std::exception&) throw();
-
- /** Allow any type that has ostream operator<< to act as message */
- template <class T>
- Exception(const T& message)
- : whatStr(boost::lexical_cast<std::string>(message)) {}
-
+ explicit Exception(const std::string& str=std::string()) throw();
virtual ~Exception() throw();
-
- virtual const char* what() const throw();
- virtual std::string toString() const throw();
-
- virtual auto_ptr clone() const throw();
- virtual void throwSelf() const;
-
- /** Default message: "Unknown exception" or something like it. */
- static const char* defaultMessage;
-
- /**
- * Log a message of the form "message: what"
- *@param what Exception's what() message.
- *@param message Prefix message.
- */
- static void log(const char* what, const char* message = defaultMessage);
-
- /**
- * Log an exception.
- *@param e Exception to log.
-
- */
- static void log(
- const std::exception& e, const char* message = defaultMessage);
-
- /**
- * Log an unknown exception - use in catch(...)
- *@param message Prefix message.
- */
- static void logUnknown(const char* message = defaultMessage);
-
- /**
- * Wrapper template function to call another function inside
- * try/catch and log any exception. Use boost::bind to wrap
- * member function calls or functions with arguments.
- *
- *@param f Function to call in try block.
- *@param retrhow If true the exception is rethrown.
- *@param message Prefix message.
- */
- template <class T>
- static T tryCatchLog(boost::function0<T> f, bool rethrow=true,
- const char* message=defaultMessage)
- {
- try {
- return f();
- }
- catch (const std::exception& e) {
- log(e, message);
- if (rethrow)
- throw;
- }
- catch (...) {
- logUnknown(message);
- if (rethrow)
- throw;
- }
- }
+ virtual const char *what() const throw();
+ virtual std::auto_ptr<Exception> clone() const throw();
+ virtual std::string str() const throw();
+ private:
+ std::string msg;
};
struct ChannelException : public Exception {
- framing::ReplyCode code;
- template <class T>
- ChannelException(framing::ReplyCode code_, const T& message)
+ const framing::ReplyCode code;
+ ChannelException(framing::ReplyCode code_, const std::string& message)
: Exception(message), code(code_) {}
- void throwSelf() const { throw *this; }
};
struct ConnectionException : public Exception {
- framing::ReplyCode code;
- template <class T>
- ConnectionException(framing::ReplyCode code_, const T& message)
+ const framing::ReplyCode code;
+ ConnectionException(framing::ReplyCode code_, const std::string& message)
: Exception(message), code(code_) {}
- void throwSelf() const { throw *this; }
};
-/**
- * Exception used to indicate that a thread should shut down.
- * Does not indicate an error that should be signalled to the user.
+/** Clone an exception.
+ * For qpid::Exception this calls the clone member function.
+ * For standard exceptions, uses the copy constructor.
+ * For unknown exception types creates a std::exception
+ * with the same what() string.
*/
-struct ShutdownException : public Exception {
- ShutdownException();
- void throwSelf() const { throw *this; }
-};
-
-/** Exception to indicate empty queue or other empty state */
-struct EmptyException : public Exception {
- EmptyException();
- void throwSelf() const { throw *this; }
-};
+std::auto_ptr<std::exception> clone(const std::exception&);
-}
+} // namespace qpid
#endif /*!_Exception_*/
diff --git a/cpp/src/qpid/Msg.h b/cpp/src/qpid/Msg.h
index c1a6b54d05..7214db611f 100644
--- a/cpp/src/qpid/Msg.h
+++ b/cpp/src/qpid/Msg.h
@@ -54,7 +54,7 @@ inline std::ostream& operator<<(std::ostream& o, const Msg& m) {
}
/** Construct a message using operator << and append (file:line) */
-#define QPID_MSG(message) Msg() << message << " (" << __FILE__ << ":" << __LINE__ << ")"
+#define QPID_MSG(message) ::qpid::Msg() << message << " (" << __FILE__ << ":" << __LINE__ << ")"
} // namespace qpid
diff --git a/cpp/src/qpid/QpidError.cpp b/cpp/src/qpid/QpidError.cpp
deleted file mode 100644
index 740ec24e54..0000000000
--- a/cpp/src/qpid/QpidError.cpp
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <boost/format.hpp>
-
-#include "QpidError.h"
-#include <sstream>
-
-using namespace qpid;
-
-QpidError::QpidError() : code(0) {}
-
-QpidError::~QpidError() throw() {}
-
-Exception::auto_ptr QpidError::clone() const throw() { return Exception::auto_ptr(new QpidError(*this)); }
-
-void QpidError::throwSelf() const { throw *this; }
-
-std::string QpidError::message(int code, const std::string& msg, const char* file, int line) {
- return (boost::format("Error [%d] %s (%s:%d)") % code % msg % file % line).str();
-}
-
-
diff --git a/cpp/src/qpid/QpidError.h b/cpp/src/qpid/QpidError.h
deleted file mode 100644
index 2ff6571365..0000000000
--- a/cpp/src/qpid/QpidError.h
+++ /dev/null
@@ -1,79 +0,0 @@
-#ifndef __QpidError__
-#define __QpidError__
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-#include <memory>
-#include <ostream>
-
-#include "Exception.h"
-
-namespace qpid {
-
-struct SrcLine {
- public:
- SrcLine(const std::string& file_="", int line_=0) :
- file(file_), line(line_) {}
-
- std::string file;
- int line;
-};
-
-class QpidError : public Exception
-{
- public:
- const int code;
- SrcLine loc;
- std::string msg;
-
- QpidError();
-
- template <class T>
- QpidError(int code_, const T& msg_, const SrcLine& loc_) throw()
- : Exception(message(code_, boost::lexical_cast<std::string>(msg_), loc_.file.c_str(), loc_.line)),
- code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_)) {}
-
- ~QpidError() throw();
- Exception::auto_ptr clone() const throw();
- void throwSelf() const;
-
- /** Format message for exception. */
- static std::string message(int code, const std::string& msg, const char* file, int line);
-};
-
-
-} // namespace qpid
-
-#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__)
-
-#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE)
-
-#define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE)
-
-#define THROW_QPID_ERRNO_IF(cond) if (cond) QPID_ERROR(INTERNAL, strError(errno));
-
-const int PROTOCOL_ERROR = 10000;
-const int APR_ERROR = 20000;
-const int FRAMING_ERROR = 30000;
-const int CLIENT_ERROR = 40000;
-const int INTERNAL_ERROR = 50000;
-
-#endif
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index e53774740a..b88f1c6c6a 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -64,7 +64,8 @@ Broker::Options::Options(const std::string& name) :
storeDir("/var"),
storeAsync(false),
enableMgmt(0),
- mgmtPubInterval(10)
+ mgmtPubInterval(10),
+ ack(100)
{
addOptions()
("port,p", optValue(port,"PORT"),
@@ -102,7 +103,8 @@ Broker::Broker(const Broker::Options& conf) :
queues(store.get()),
stagingThreshold(0),
factory(*this),
- dtxManager(store.get())
+ dtxManager(store.get()),
+ sessionManager(conf.ack)
{
if(conf.enableMgmt){
managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval));
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 2018371624..817197a351 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -69,6 +69,7 @@ class Broker : public sys::Runnable, public Plugin::Target
bool storeAsync;
bool enableMgmt;
uint16_t mgmtPubInterval;
+ uint32_t ack;
};
virtual ~Broker();
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 99b585406e..dad40868d6 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -21,6 +21,7 @@
#include "MessageDelivery.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/Exception.h"
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace broker {
@@ -75,8 +76,7 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri
checkAlternate(response.first, alternate);
}
}catch(UnknownExchangeTypeException& e){
- throw ConnectionException(
- 503, "Exchange type not implemented: " + type);
+ throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
}
}
}
@@ -84,24 +84,23 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri
void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type)
{
if (!type.empty() && exchange->getType() != type) {
- throw ConnectionException(530, "Exchange declared to be of type " + exchange->getType() + ", requested " + type);
+ throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type));
}
}
void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
{
- if (alternate && alternate != exchange->getAlternate()) {
- throw ConnectionException(530, "Exchange declared with alternate-exchange "
- + exchange->getAlternate()->getName() + ", requested "
- + alternate->getName());
- }
-
+ if (alternate && alternate != exchange->getAlternate())
+ throw NotAllowedException(
+ QPID_MSG("Exchange declared with alternate-exchange "
+ << exchange->getAlternate()->getName() << ", requested "
+ << alternate->getName()));
}
void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){
//TODO: implement unused
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange.");
+ if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
getBroker().getExchanges().destroy(name);
@@ -292,7 +291,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
Queue::shared_ptr queue = state.getQueue(queueName);
if(!consumerTag.empty() && state.exists(consumerTag)){
- throw ConnectionException(530, "Consumer tags must be unique");
+ throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
}
string newTag = consumerTag;
//need to generate name here, so we have it for the adapter (it is
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 5537dc67f5..706b42c080 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -82,7 +82,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
throw framing::NotImplementedException("Tunnel class not implemented"); }
// Handlers no longer implemented in BrokerAdapter:
-#define BADHANDLER() assert(0); throw framing::InternalErrorException()
+#define BADHANDLER() assert(0); throw framing::NotImplementedException("")
ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
SessionHandler* getSessionHandler() { BADHANDLER(); }
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index ca0ca20849..f981d47ef7 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -28,9 +28,10 @@
#include "BrokerAdapter.h"
#include "SemanticHandler.h"
-#include <boost/utility/in_place_factory.hpp>
#include <boost/bind.hpp>
+#include <algorithm>
+
using namespace boost;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -61,6 +62,7 @@ void Connection::close(
ReplyCode code, const string& text, ClassId classId, MethodId methodId)
{
adapter.close(code, text, classId, methodId);
+ channels.clear();
getOutput().close();
}
@@ -73,8 +75,11 @@ void Connection::idleOut(){}
void Connection::idleIn(){}
-void Connection::closed(){
+void Connection::closed(){ // Physically closed, suspend open sessions.
try {
+ std::for_each(
+ channels.begin(), channels.end(),
+ boost::bind(&SessionHandler::localSuspend, _1));
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index f697986194..dd645b595e 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -46,9 +46,9 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
AMQMethodBody* method=frame.getBody()->getMethod();
try{
if (!invoke(*handler.get(), *method))
- throw ConnectionException(503, "Class can't be accessed over channel 0");
+ throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
}catch(ConnectionException& e){
- handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
}
diff --git a/cpp/src/qpid/broker/Daemon.cpp b/cpp/src/qpid/broker/Daemon.cpp
index 0bb3449289..3fcc487324 100644
--- a/cpp/src/qpid/broker/Daemon.cpp
+++ b/cpp/src/qpid/broker/Daemon.cpp
@@ -17,7 +17,7 @@
*/
#include "Daemon.h"
#include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
+#include "qpid/Exception.h"
#include <boost/iostreams/stream.hpp>
#include <boost/iostreams/device/file_descriptor.hpp>
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 5887d13f85..ec042ff56a 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -44,7 +44,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
if (fail) {
state.endDtx(xid, true);
if (suspend) {
- throw ConnectionException(503, "End and suspend cannot both be set.");
+ throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
} else {
return DtxDemarcationEndResult(XA_RBROLLBACK);
}
@@ -67,7 +67,7 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
bool resume)
{
if (join && resume) {
- throw ConnectionException(503, "Join and resume cannot both be set.");
+ throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set."));
}
try {
if (resume) {
@@ -161,7 +161,7 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
const string& xid)
{
//Currently no heuristic completion is supported, so this should never be used.
- throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid);
+ throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
}
DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index 0d211017de..0597b41f98 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -20,16 +20,20 @@
*/
#include "DtxManager.h"
#include "DtxTimeout.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include <boost/format.hpp>
#include <iostream>
using qpid::sys::Mutex;
using namespace qpid::broker;
+using namespace qpid::framing;
DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {}
-DtxManager::~DtxManager() {}
+DtxManager::~DtxManager() {
+ // timer.stop(); // FIXME aconway 2007-10-23: leaking threads.
+}
void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops)
{
@@ -84,7 +88,7 @@ DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
- throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+ throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid));
}
return i;
}
@@ -94,7 +98,7 @@ void DtxManager::remove(const std::string& xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
- throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+ throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid));
} else {
work.erase(i);
}
@@ -105,7 +109,7 @@ DtxManager::WorkMap::iterator DtxManager::createWork(std::string xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i != work.end()) {
- throw ConnectionException(503, boost::format("Xid %1% is already known (use 'join' to add work to an existing xid)!") % xid);
+ throw CommandInvalidException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)"));
} else {
return work.insert(xid, new DtxWorkRecord(xid, store)).first;
}
diff --git a/cpp/src/qpid/broker/DtxTimeout.h b/cpp/src/qpid/broker/DtxTimeout.h
index 33da62e7f4..7d0b8622d0 100644
--- a/cpp/src/qpid/broker/DtxTimeout.h
+++ b/cpp/src/qpid/broker/DtxTimeout.h
@@ -29,12 +29,7 @@ namespace broker {
class DtxManager;
-
-struct DtxTimeoutException : public Exception
-{
- DtxTimeoutException() {}
-};
-
+struct DtxTimeoutException : public Exception {};
struct DtxTimeout : public TimerTask
{
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp
index f2f118c5e4..fe9e42ca32 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.cpp
+++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp
@@ -19,12 +19,14 @@
*
*/
#include "DtxWorkRecord.h"
+#include "qpid/framing/reply_exceptions.h"
#include <boost/format.hpp>
#include <boost/mem_fn.hpp>
using boost::mem_fn;
using qpid::sys::Mutex;
using namespace qpid::broker;
+using namespace qpid::framing;
DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
@@ -71,8 +73,7 @@ bool DtxWorkRecord::commit(bool onePhase)
if (prepared) {
//already prepared i.e. 2pc
if (onePhase) {
- throw ConnectionException(503,
- boost::format("Branch with xid %1% has been prepared, one-phase option not valid!") % xid);
+ throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!"));
}
store->commit(*txn);
@@ -83,8 +84,7 @@ bool DtxWorkRecord::commit(bool onePhase)
} else {
//1pc commit optimisation, don't need a 2pc transaction context:
if (!onePhase) {
- throw ConnectionException(503,
- boost::format("Branch with xid %1% has not been prepared, one-phase option required!") % xid);
+ throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
}
std::auto_ptr<TransactionContext> localtxn = store->begin();
if (prepare(localtxn.get())) {
@@ -119,7 +119,7 @@ void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
throw DtxTimeoutException();
}
if (completed) {
- throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid);
+ throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!"));
}
work.push_back(ops);
}
@@ -133,7 +133,7 @@ bool DtxWorkRecord::check()
//iterate through all DtxBuffers and ensure they are all ended
for (Work::iterator i = work.begin(); i != work.end(); i++) {
if (!(*i)->isEnded()) {
- throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid);
+ throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " not completed!"));
} else if ((*i)->isRollbackOnly()) {
rolledback = true;
}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index ae1afe5abb..98e3cc7347 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -24,6 +24,7 @@
#include "HeadersExchange.h"
#include "TopicExchange.h"
#include "ManagementExchange.h"
+#include "qpid/framing/reply_exceptions.h"
using namespace qpid::broker;
using namespace qpid::sys;
@@ -75,9 +76,8 @@ void ExchangeRegistry::destroy(const string& name){
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
RWlock::ScopedRlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
- if (i == exchanges.end()) {
- throw ChannelException(404, "Exchange not found: " + name);
- }
+ if (i == exchanges.end())
+ throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name));
return i->second;
}
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 215a002517..dd688cdfcf 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -20,7 +20,7 @@
*/
#include "HeadersExchange.h"
#include "qpid/framing/FieldValue.h"
-#include "qpid/QpidError.h"
+#include "qpid/framing/reply_exceptions.h"
#include <algorithm>
@@ -46,9 +46,8 @@ HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
RWlock::ScopedWlock locker(lock);
FieldTable::ValuePtr what = args->get(x_match);
- if (!what || (*what != all && *what != any)) {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
- }
+ if (!what || (*what != all && *what != any))
+ throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
Binding binding(*args, queue);
Bindings::iterator i =
std::find(bindings.begin(),bindings.end(), binding);
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index b12910893a..834ce0a203 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -16,7 +16,7 @@
*
*/
-#include "qpid/QpidError.h"
+#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
#include "MessageHandlerImpl.h"
#include "qpid/framing/FramingContent.h"
@@ -56,39 +56,39 @@ MessageHandlerImpl::cancel(const string& destination )
void
MessageHandlerImpl::open(const string& /*reference*/)
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/)
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::close(const string& /*reference*/)
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::checkpoint(const string& /*reference*/,
const string& /*identifier*/ )
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::resume(const string& /*reference*/,
const string& /*identifier*/ )
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::offset(uint64_t /*value*/ )
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
@@ -97,19 +97,19 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
const string& /*destination*/,
bool /*noAck*/ )
{
- throw ConnectionException(540, "get no longer supported");
+ throw NotImplementedException("get no longer supported");
}
void
MessageHandlerImpl::empty()
{
- throw ConnectionException(540, "empty no longer supported");
+ throw NotImplementedException("empty no longer supported");
}
void
MessageHandlerImpl::ok()
{
- throw ConnectionException(540, "Message.Ok no longer supported");
+ throw NotImplementedException("Message.Ok no longer supported");
}
void
@@ -134,7 +134,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
{
Queue::shared_ptr queue = state.getQueue(queueName);
if(!destination.empty() && state.exists(destination))
- throw ConnectionException(530, "Consumer tags must be unique");
+ throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
string tag = destination;
state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
@@ -165,7 +165,7 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i
state.addByteCredit(destination, value);
} else {
//unknown
- throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
+ throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit));
}
}
@@ -179,7 +179,7 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
//window
state.setWindowMode(destination);
} else{
- throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);
+ throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode));
}
}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 116e8d9431..18c1ab1056 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -19,9 +19,8 @@
*
*/
-#include <boost/format.hpp>
-
#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
#include "Broker.h"
#include "Queue.h"
#include "Exchange.h"
@@ -37,7 +36,6 @@
using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
-using boost::format;
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
@@ -269,17 +267,15 @@ bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) {
void Queue::consume(Consumer::ptr c, bool requestExclusive){
RWlock::ScopedWlock locker(consumerLock);
if(exclusive) {
- throw ChannelException(
- 403, format("Queue '%s' has an exclusive consumer."
- " No more consumers allowed.") % getName());
+ throw AccessRefusedException(
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
}
if(requestExclusive) {
if(acquirers.empty() && browsers.empty()) {
exclusive = c;
} else {
- throw ChannelException(
- 403, format("Queue '%s' already has consumers."
- "Exclusive access denied.") % getName());
+ throw AccessRefusedException(
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
}
}
if (c->preAcquires()) {
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 8535dc6a60..e1a8ae470d 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -125,7 +125,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
incoming.complete(id);
if (!invoker.wasHandled()) {
- throw ConnectionException(540, "Not implemented");
+ throw NotImplementedException("Not implemented");
} else if (invoker.hasResult()) {
session.getProxy().getExecution().result(id.getValue(), invoker.getResult());
}
@@ -139,7 +139,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
void SemanticHandler::handleL3(framing::AMQMethodBody* method)
{
if (!invoke(*this, *method))
- throw ConnectionException(540, "Not implemented");
+ throw NotImplementedException("Not implemented");
}
void SemanticHandler::handleContent(AMQFrame& frame)
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 1f7436da94..e0e4315d03 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -31,7 +31,6 @@
#include "SessionHandler.h"
#include "TxAck.h"
#include "TxPublish.h"
-#include "qpid/QpidError.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
@@ -116,7 +115,8 @@ void SemanticState::startTx()
void SemanticState::commit(MessageStore* const store)
{
- if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+ if (!txBuffer) throw
+ CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
txBuffer->enlist(txAck);
@@ -127,7 +127,8 @@ void SemanticState::commit(MessageStore* const store)
void SemanticState::rollback()
{
- if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+ if (!txBuffer)
+ throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
txBuffer->rollback();
accumulatedAck.clear();
@@ -141,7 +142,7 @@ void SemanticState::selectDtx()
void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
{
if (!dtxSelected) {
- throw ConnectionException(503, "Session has not been selected for use with dtx");
+ throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
}
dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
@@ -155,11 +156,12 @@ void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
void SemanticState::endDtx(const std::string& xid, bool fail)
{
if (!dtxBuffer) {
- throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid);
+ throw CommandInvalidException(QPID_MSG("xid " << xid << " not associated with this session"));
}
if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end")
- % dtxBuffer->getXid() % xid);
+ throw CommandInvalidException(
+ QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on end"));
+
}
txBuffer.reset();//ops on this session no longer transactional
@@ -176,8 +178,8 @@ void SemanticState::endDtx(const std::string& xid, bool fail)
void SemanticState::suspendDtx(const std::string& xid)
{
if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
- % dtxBuffer->getXid() % xid);
+ throw CommandInvalidException(
+ QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on suspend"));
}
txBuffer.reset();//ops on this session no longer transactional
@@ -188,11 +190,12 @@ void SemanticState::suspendDtx(const std::string& xid)
void SemanticState::resumeDtx(const std::string& xid)
{
if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
- % dtxBuffer->getXid() % xid);
+ throw CommandInvalidException(
+ QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on resume"));
+
}
if (!dtxBuffer->isSuspended()) {
- throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
+ throw CommandInvalidException(QPID_MSG("xid " << xid << " not suspended"));
}
checkDtxTimeout();
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index ed092d6a05..9b065be8af 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -26,6 +26,8 @@
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
+
namespace qpid {
namespace broker {
using namespace framing;
@@ -33,7 +35,9 @@ using namespace std;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: InOutHandler(0, &c.getOutput()),
- connection(c), channel(ch), proxy(out),
+ connection(c), channel(ch, &c.getOutput()),
+ proxy(out), // Via my own handleOut() for L2 data.
+ peerSession(channel), // Direct to channel for L2 commands.
ignoring(false) {}
SessionHandler::~SessionHandler() {}
@@ -54,15 +58,19 @@ void SessionHandler::handleIn(AMQFrame& f) {
try {
if (m && invoke(*this, *m))
return;
- else if (session.get())
- session->in(f);
+ else if (session.get()) {
+ boost::optional<SequenceNumber> ack=session->received(f);
+ session->in.handle(f);
+ if (ack)
+ peerSession.ack(*ack, SequenceNumberSet());
+ }
else if (!ignoring)
throw ChannelErrorException(
- QPID_MSG("Channel " << channel << " is not open"));
+ QPID_MSG("Channel " << channel.get() << " is not open"));
} catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
session.reset();
- getProxy().getSession().closed(e.code, e.toString());
+ peerSession.closed(e.code, e.what());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -72,21 +80,22 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
void SessionHandler::handleOut(AMQFrame& f) {
- f.setChannel(getChannel());
- out.next->handle(f);
+ channel.handle(f); // Send it.
+ if (session->sent(f))
+ peerSession.solicitAck();
}
-void SessionHandler::assertOpen(const char* method) {
- if (!session.get())
+void SessionHandler::assertAttached(const char* method) const {
+ if (!session.get())
throw ChannelErrorException(
QPID_MSG(method << " failed: No session for channel "
<< getChannel()));
}
-void SessionHandler::assertClosed(const char* method) {
+void SessionHandler::assertClosed(const char* method) const {
if (session.get())
throw ChannelBusyException(
- QPID_MSG(method << " failed: channel " << channel
+ QPID_MSG(method << " failed: channel " << channel.get()
<< " is already open."));
}
@@ -95,32 +104,38 @@ void SessionHandler::open(uint32_t detachedLifetime) {
std::auto_ptr<SessionState> state(
connection.broker.getSessionManager().open(*this, detachedLifetime));
session.reset(state.release());
- getProxy().getSession().attached(session->getId(), session->getTimeout());
+ peerSession.attached(session->getId(), session->getTimeout());
}
void SessionHandler::resume(const Uuid& id) {
assertClosed("resume");
- session = connection.broker.getSessionManager().resume(*this, id);
- getProxy().getSession().attached(session->getId(), session->getTimeout());
+ session = connection.broker.getSessionManager().resume(id);
+ session->attach(*this);
+ SequenceNumber seq = session->resuming();
+ peerSession.attached(session->getId(), session->getTimeout());
+ proxy.getSession().ack(seq, SequenceNumberSet());
}
void SessionHandler::flow(bool /*active*/) {
+ assertAttached("flow");
// FIXME aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException();
+ assert(0); throw NotImplementedException("session.flow");
}
void SessionHandler::flowOk(bool /*active*/) {
+ assertAttached("flowOk");
// FIXME aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException();
+ assert(0); throw NotImplementedException("session.flowOk");
}
void SessionHandler::close() {
+ assertAttached("close");
QPID_LOG(info, "Received session.close");
ignoring=false;
session.reset();
- getProxy().getSession().closed(REPLY_SUCCESS, "ok");
- assert(&connection.getChannel(channel) == this);
- connection.closeChannel(channel);
+ peerSession.closed(REPLY_SUCCESS, "ok");
+ assert(&connection.getChannel(channel.get()) == this);
+ connection.closeChannel(channel.get());
}
void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
@@ -129,26 +144,43 @@ void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
session.reset();
}
+void SessionHandler::localSuspend() {
+ if (session.get() && session->getState() == SessionState::ATTACHED) {
+ session->detach();
+ connection.broker.getSessionManager().suspend(session);
+ }
+}
+
void SessionHandler::suspend() {
- assertOpen("suspend");
- connection.broker.getSessionManager().suspend(session);
- assert(!session.get());
- getProxy().getSession().detached();
- assert(&connection.getChannel(channel) == this);
- connection.closeChannel(channel);
+ assertAttached("suspend");
+ localSuspend();
+ peerSession.detached();
+ assert(&connection.getChannel(channel.get()) == this);
+ connection.closeChannel(channel.get());
}
-void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/,
- const SequenceNumberSet& /*seenFrameSet*/) {
- assert(0); throw NotImplementedException();
+void SessionHandler::ack(uint32_t cumulativeSeenMark,
+ const SequenceNumberSet& /*seenFrameSet*/)
+{
+ assertAttached("ack");
+ if (session->getState() == SessionState::RESUMING) {
+ session->receivedAck(cumulativeSeenMark);
+ framing::SessionState::Replay replay=session->replay();
+ std::for_each(replay.begin(), replay.end(),
+ boost::bind(&SessionHandler::handleOut, this, _1));
+ }
+ else
+ session->receivedAck(cumulativeSeenMark);
}
void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
- assert(0); throw NotImplementedException();
+ // FIXME aconway 2007-10-02: may be removed from spec.
+ assert(0); throw NotImplementedException("session.high-water-mark");
}
void SessionHandler::solicitAck() {
- assert(0); throw NotImplementedException();
+ assertAttached("solicit-ack");
+ peerSession.ack(session->sendingAck(), SequenceNumberSet());
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 51a65e3092..9a68ddb46f 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -26,6 +26,7 @@
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/ChannelHandler.h"
#include <boost/noncopyable.hpp>
@@ -52,7 +53,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
SessionState* getSession() { return session.get(); }
const SessionState* getSession() const { return session.get(); }
- framing::ChannelId getChannel() const { return channel; }
+ framing::ChannelId getChannel() const { return channel.get(); }
Connection& getConnection() { return connection; }
const Connection& getConnection() const { return connection; }
@@ -60,6 +61,9 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
+ // Called by closing connection.
+ void localSuspend();
+
protected:
void handleIn(framing::AMQFrame&);
void handleOut(framing::AMQFrame&);
@@ -79,12 +83,14 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
void solicitAck();
- void assertOpen(const char* method);
- void assertClosed(const char* method);
+ void assertAttached(const char* method) const;
+ void assertActive(const char* method) const;
+ void assertClosed(const char* method) const;
Connection& connection;
- const framing::ChannelId channel;
+ framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
+ framing::AMQP_ClientProxy::Session peerSession;
bool ignoring;
std::auto_ptr<SessionState> session;
};
diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp
index 303687c788..f12ebc6db1 100644
--- a/cpp/src/qpid/broker/SessionManager.cpp
+++ b/cpp/src/qpid/broker/SessionManager.cpp
@@ -39,7 +39,7 @@ namespace broker {
using namespace sys;
using namespace framing;
-SessionManager::SessionManager() {}
+SessionManager::SessionManager(uint32_t a) : ack(a) {}
SessionManager::~SessionManager() {}
@@ -47,7 +47,8 @@ std::auto_ptr<SessionState> SessionManager::open(
SessionHandler& h, uint32_t timeout_)
{
Mutex::ScopedLock l(lock);
- std::auto_ptr<SessionState> session(new SessionState(*this, h, timeout_));
+ std::auto_ptr<SessionState> session(
+ new SessionState(*this, h, timeout_, ack));
active.insert(session->getId());
return session;
}
@@ -55,14 +56,13 @@ std::auto_ptr<SessionState> SessionManager::open(
void SessionManager::suspend(std::auto_ptr<SessionState> session) {
Mutex::ScopedLock l(lock);
active.erase(session->getId());
+ session->suspend();
session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
- session->handler = 0;
suspended.push_back(session.release()); // In expiry order
eraseExpired();
}
-std::auto_ptr<SessionState> SessionManager::resume(
- SessionHandler& sh, const Uuid& id)
+std::auto_ptr<SessionState> SessionManager::resume(const Uuid& id)
{
Mutex::ScopedLock l(lock);
eraseExpired();
@@ -78,7 +78,6 @@ std::auto_ptr<SessionState> SessionManager::resume(
QPID_MSG("No suspended session with id=" << id));
active.insert(id);
std::auto_ptr<SessionState> state(suspended.release(i).release());
- state->handler = &sh;
return state;
}
@@ -94,8 +93,10 @@ void SessionManager::eraseExpired() {
Suspended::iterator keep = std::lower_bound(
suspended.begin(), suspended.end(), now(),
boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2));
- QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
- suspended.erase(suspended.begin(), keep);
+ if (suspended.begin() != keep) {
+ QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
+ suspended.erase(suspended.begin(), keep);
+ }
}
}
diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h
index 58a7b3f01f..fa7262252d 100644
--- a/cpp/src/qpid/broker/SessionManager.h
+++ b/cpp/src/qpid/broker/SessionManager.h
@@ -44,7 +44,7 @@ class SessionHandler;
*/
class SessionManager : private boost::noncopyable {
public:
- SessionManager();
+ SessionManager(uint32_t ack);
~SessionManager();
/** Open a new active session, caller takes ownership */
std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_);
@@ -57,18 +57,20 @@ class SessionManager : private boost::noncopyable {
/** Resume a suspended session.
*@throw Exception if timed out or non-existant.
*/
- std::auto_ptr<SessionState> resume(SessionHandler&, const framing::Uuid&);
+ std::auto_ptr<SessionState> resume(const framing::Uuid&);
private:
typedef boost::ptr_vector<SessionState> Suspended;
typedef std::set<framing::Uuid> Active;
+ void erase(const framing::Uuid&);
+ void eraseExpired();
+
sys::Mutex lock;
Suspended suspended;
Active active;
-
- void erase(const framing::Uuid&);
- void eraseExpired();
+ uint32_t ack;
+
friend class SessionState; // removes deleted sessions from active set.
};
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 17537e11be..45d78c9307 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -31,22 +31,25 @@ namespace broker {
using namespace framing;
-SessionState::SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_)
- : factory(f), handler(&h), id(true), timeout(timeout_),
- broker(h.getConnection().broker),
- version(h.getConnection().getVersion())
-{
- // FIXME aconway 2007-09-21: Break dependnecy - broker updates session.
- chain.push_back(new SemanticHandler(*this));
- in = &chain[0]; // Incoming frame to handler chain.
- out = &handler->out; // Outgoing frames to SessionHandler
+void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
- // FIXME aconway 2007-09-20: use broker to add plugin
- // handlers to the chain.
- // FIXME aconway 2007-08-31: Shouldn't be passing channel ID.
- broker.update(handler->getChannel(), *this);
+void SessionState::handleOut(AMQFrame& f) {
+ assert(handler);
+ handler->out.handle(f);
}
+SessionState::SessionState(
+ SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack)
+ : framing::SessionState(ack),
+ factory(f), handler(&h), id(true), timeout(timeout_),
+ broker(h.getConnection().broker),
+ version(h.getConnection().getVersion()),
+ semanticHandler(new SemanticHandler(*this))
+{
+ // FIXME aconway 2007-09-20: SessionManager may add plugin
+ // handlers to the chain.
+ }
+
SessionState::~SessionState() {
// Remove ID from active session list.
factory.erase(getId());
@@ -65,4 +68,12 @@ Connection& SessionState::getConnection() {
return getHandler().getConnection();
}
+void SessionState::detach() {
+ handler = 0;
+}
+
+void SessionState::attach(SessionHandler& h) {
+ handler = &h;
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index d152937692..eed088af31 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -24,11 +24,12 @@
#include "qpid/framing/Uuid.h"
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/SessionState.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/sys/Time.h"
-#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/noncopyable.hpp>
+#include <boost/scoped_ptr.hpp>
#include <set>
#include <vector>
@@ -42,31 +43,26 @@ class AMQP_ClientProxy;
namespace broker {
+class SemanticHandler;
class SessionHandler;
class SessionManager;
class Broker;
class Connection;
/**
- * State of a session.
- *
- * An attached session has a SessionHandler which is attached to a
- * connection. A suspended session has no handler.
- *
- * A SessionState is always associated with an open session (attached or
- * suspended) it is destroyed when the session is closed.
- *
- * The SessionState includes the sessions handler chains, which may
- * themselves have state. The handlers will be preserved as long as
- * the session is alive.
+ * Broker-side session state includes sessions handler chains, which may
+ * themselves have state.
*/
-class SessionState : public framing::FrameHandler::Chains,
- private boost::noncopyable
+class SessionState : public framing::SessionState,
+ public framing::FrameHandler::InOutHandler
{
public:
~SessionState();
bool isAttached() { return handler; }
+ void detach();
+ void attach(SessionHandler& handler);
+
/** @pre isAttached() */
SessionHandler& getHandler();
@@ -76,23 +72,30 @@ class SessionState : public framing::FrameHandler::Chains,
/** @pre isAttached() */
Connection& getConnection();
- const framing::Uuid& getId() const { return id; }
uint32_t getTimeout() const { return timeout; }
Broker& getBroker() { return broker; }
framing::ProtocolVersion getVersion() const { return version; }
+
+ protected:
+ void handleIn(framing::AMQFrame&);
+ void handleOut(framing::AMQFrame&);
private:
- /** Only SessionManager can open sessions */
- SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_);
-
+ // SessionManager creates sessions.
+ SessionState(SessionManager&,
+ SessionHandler& out,
+ uint32_t timeout,
+ uint32_t ackInterval);
+
SessionManager& factory;
SessionHandler* handler;
framing::Uuid id;
uint32_t timeout;
sys::AbsTime expiry; // Used by SessionManager.
Broker& broker;
- boost::ptr_vector<framing::FrameHandler> chain;
framing::ProtocolVersion version;
+
+ boost::scoped_ptr<SemanticHandler> semanticHandler;
friend class SessionManager;
};
diff --git a/cpp/src/qpid/broker/Timer.cpp b/cpp/src/qpid/broker/Timer.cpp
index be75346578..14727b3b35 100644
--- a/cpp/src/qpid/broker/Timer.cpp
+++ b/cpp/src/qpid/broker/Timer.cpp
@@ -73,17 +73,14 @@ void Timer::start()
Monitor::ScopedLock l(monitor);
if (!active) {
active = true;
- runner = std::auto_ptr<Thread>(new Thread(this));
+ runner = Thread(this);
}
}
void Timer::stop()
{
signalStop();
- if (runner.get()) {
- runner->join();
- runner.reset();
- }
+ runner.join();
}
void Timer::signalStop()
{
diff --git a/cpp/src/qpid/broker/Timer.h b/cpp/src/qpid/broker/Timer.h
index c70ffeaedc..e89ae499b7 100644
--- a/cpp/src/qpid/broker/Timer.h
+++ b/cpp/src/qpid/broker/Timer.h
@@ -53,7 +53,7 @@ class Timer : private qpid::sys::Runnable
{
qpid::sys::Monitor monitor;
std::priority_queue<TimerTask::shared_ptr, std::vector<TimerTask::shared_ptr>, Later> tasks;
- std::auto_ptr<qpid::sys::Thread> runner;
+ qpid::sys::Thread runner;
bool active;
void run();
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index cef34630db..16e0428a56 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -24,7 +24,6 @@
#include "Channel.h"
#include "qpid/sys/Monitor.h"
#include "Message.h"
-#include "qpid/QpidError.h"
#include "Connection.h"
#include "Demux.h"
#include "FutureResponse.h"
@@ -71,7 +70,7 @@ void Channel::open(const Session& s)
{
Mutex::ScopedLock l(stopLock);
if (isOpen())
- THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
+ throw ChannelBusyException();
active = true;
session = s;
if(isTransactional()) {
@@ -142,7 +141,7 @@ void Channel::consume(
Mutex::ScopedLock l(lock);
ConsumerMap::iterator i = consumers.find(tag);
if (i != consumers.end())
- throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag);
+ throw NotAllowedException(QPID_MSG("Consumer already exists with tag " << tag ));
Consumer& c = consumers[tag];
c.listener = listener;
c.ackMode = ackMode;
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index 0a6a88ae90..932fab8881 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -29,7 +29,7 @@
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
+#include "qpid/shared_ptr.h"
#include <iostream>
#include <sstream>
#include <functional>
@@ -44,23 +44,26 @@ namespace client {
Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) :
channelIdCounter(0), version(_version),
max_frame_size(_max_frame_size),
- impl(new ConnectionImpl(boost::shared_ptr<Connector>(new Connector(_version, _debug)))),
- isOpen(false) {}
+ isOpen(false),
+ impl(new ConnectionImpl(
+ shared_ptr<Connector>(new Connector(_version, _debug))))
+{}
-Connection::Connection(boost::shared_ptr<Connector> c) :
+Connection::Connection(shared_ptr<Connector> c) :
channelIdCounter(0), version(framing::highestProtocolVersion),
max_frame_size(65536),
- impl(new ConnectionImpl(c)),
- isOpen(false) {}
+ isOpen(false),
+ impl(new ConnectionImpl(c))
+{}
-Connection::~Connection(){}
+Connection::~Connection(){ }
void Connection::open(
const std::string& host, int port,
const std::string& uid, const std::string& pwd, const std::string& vhost)
{
if (isOpen)
- THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
+ throw Exception(QPID_MSG("Channel object is already open"));
impl->open(host, port, uid, pwd, vhost);
isOpen = true;
@@ -79,10 +82,9 @@ Session Connection::newSession(uint32_t detachedLifetime) {
}
void Connection::resume(Session& session) {
- shared_ptr<SessionCore> core=session.impl;
- core->setChannel(++channelIdCounter);
- impl->addSession(core);
- core->resume(impl);
+ session.impl->setChannel(++channelIdCounter);
+ impl->addSession(session.impl);
+ session.impl->resume(impl);
}
void Connection::close() {
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 2e5059f135..d2612ca754 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -23,7 +23,6 @@
*/
#include <map>
#include <string>
-#include "qpid/QpidError.h"
#include "Channel.h"
#include "ConnectionImpl.h"
#include "qpid/client/Session.h"
@@ -57,10 +56,12 @@ class Connection
framing::ChannelId channelIdCounter;
framing::ProtocolVersion version;
const uint32_t max_frame_size;
- shared_ptr<ConnectionImpl> impl;
bool isOpen;
bool debug;
-
+
+ protected:
+ boost::shared_ptr<ConnectionImpl> impl;
+
public:
/**
* Creates a connection object, but does not open the
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index 4058bfb33f..a8f10c32a9 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -68,7 +68,7 @@ void ConnectionHandler::incoming(AMQFrame& frame)
try {
in(frame);
}catch(ConnectionException& e){
- error(e.code, e.toString(), body);
+ error(e.code, e.what(), body);
}catch(std::exception& e){
error(541/*internal error*/, e.what(), body);
}
@@ -124,6 +124,8 @@ void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_
void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body)
{
+ if (onError)
+ onError(code, message);
AMQMethodBody* method = body->getMethod();
if (method)
error(code, message, method->amqpClassId(), method->amqpMethodId());
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index fae93e8294..f9273bc165 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/framing/constants.h"
#include "qpid/framing/reply_exceptions.h"
#include "ConnectionImpl.h"
@@ -35,8 +36,9 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
{
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
handler.out = boost::bind(&Connector::send, connector, _1);
- handler.onClose = boost::bind(&ConnectionImpl::closed, this);
- handler.onError = boost::bind(&ConnectionImpl::closedByPeer, this, _1, _2);
+ handler.onClose = boost::bind(&ConnectionImpl::closed, this,
+ REPLY_SUCCESS, std::string());
+ handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
connector->setInputHandler(&handler);
connector->setTimeoutHandler(this);
connector->setShutdownHandler(this);
@@ -64,7 +66,7 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame)
s = sessions[frame.getChannel()].lock();
}
if (!s)
- throw ChannelErrorException();
+ throw ChannelErrorException(QPID_MSG("Invalid channel: " << frame.getChannel()));
s->in(frame);
}
@@ -84,19 +86,8 @@ void ConnectionImpl::open(const std::string& host, int port,
void ConnectionImpl::close()
{
- assertNotClosed();
- handler.close();
-}
-
-void ConnectionImpl::closed()
-{
- closedByPeer(200, "OK");
-}
-
-void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text)
-{
- signalClose(code, text);
- connector->close();
+ if (!isClosed)
+ handler.close();
}
void ConnectionImpl::idleIn()
@@ -110,26 +101,39 @@ void ConnectionImpl::idleOut()
connector->send(frame);
}
+template <class F>
+void ConnectionImpl::forChannels(F functor) {
+ for (SessionMap::iterator i = sessions.begin();
+ i != sessions.end(); ++i) {
+ try {
+ boost::shared_ptr<SessionCore> s = i->second.lock();
+ if (s) functor(*s);
+ } catch (...) { assert(0); }
+ }
+}
+
void ConnectionImpl::shutdown()
{
- //this indicates that the socket to the server has closed
- signalClose(0, "Unexpected socket closure.");
+ Mutex::ScopedLock l(lock);
+ if (isClosed) return;
+ forChannels(boost::bind(&SessionCore::connectionBroke, _1,
+ INTERNAL_ERROR, "Unexpected socket closure."));
+ sessions.clear();
+ isClosed = true;
}
-void ConnectionImpl::signalClose(uint16_t code, const std::string& text)
+void ConnectionImpl::closed(uint16_t code, const std::string& text)
{
Mutex::ScopedLock l(lock);
- for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
- boost::shared_ptr<SessionCore> s = i->second.lock();
- if (s)
- s->closed(code, text);
- }
+ if (isClosed) return;
+ forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text));
sessions.clear();
isClosed = true;
+ connector->close();
}
-void ConnectionImpl::assertNotClosed()
-{
+void ConnectionImpl::erase(uint16_t ch) {
Mutex::ScopedLock l(lock);
- if (isClosed) throw Exception("Connection has been closed");
+ sessions.erase(ch);
}
+
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index f20534f1aa..46bd5b685d 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -51,14 +51,14 @@ class ConnectionImpl : public framing::FrameHandler,
bool isClosed;
void incoming(framing::AMQFrame& frame);
- void closed();
- void closedByPeer(uint16_t, const std::string&);
+ void closed(uint16_t, const std::string&);
void idleOut();
void idleIn();
void shutdown();
- void signalClose(uint16_t, const std::string&);
- void assertNotClosed();
-public:
+
+ template <class F> void forChannels(F functor);
+
+ public:
typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
ConnectionImpl(boost::shared_ptr<Connector> c);
@@ -69,7 +69,9 @@ public:
const std::string& pwd = "guest",
const std::string& virtualhost = "/");
void close();
- void handle(framing::AMQFrame& frame);
+ void handle(framing::AMQFrame& frame);
+ void erase(uint16_t channel);
+ boost::shared_ptr<Connector> getConnector() { return connector; }
};
}}
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index b1ec580605..ba11ea5569 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -20,7 +20,6 @@
*/
#include <iostream>
#include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
#include "Connector.h"
@@ -36,7 +35,6 @@ namespace client {
using namespace qpid::sys;
using namespace qpid::framing;
-using qpid::QpidError;
Connector::Connector(
ProtocolVersion ver, bool _debug, uint32_t buffer_size
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index 8aaaea247a..af6badd6e0 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -98,6 +98,7 @@ class Connector : public framing::OutputHandler,
virtual void setInputHandler(framing::InputHandler* handler);
virtual void setTimeoutHandler(sys::TimeoutHandler* handler);
virtual void setShutdownHandler(sys::ShutdownHandler* handler);
+ virtual sys::ShutdownHandler* getShutdownHandler() { return shutdownHandler; }
virtual framing::OutputHandler* getOutputHandler();
virtual void send(framing::AMQFrame& frame);
virtual void setReadTimeout(uint16_t timeout);
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index e4edece414..c70b0fc455 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -73,7 +73,7 @@ void ExecutionHandler::handle(AMQFrame& frame)
void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
{
if (range.size() % 2) { //must be even number
- throw ConnectionException(530, "Received odd number of elements in ranged mark");
+ throw NotAllowedException(QPID_MSG("Received odd number of elements in ranged mark"));
} else {
SequenceNumber mark(cumulative);
{
diff --git a/cpp/src/qpid/client/Future.h b/cpp/src/qpid/client/Future.h
index 667a19e942..d07f9f149c 100644
--- a/cpp/src/qpid/client/Future.h
+++ b/cpp/src/qpid/client/Future.h
@@ -63,7 +63,7 @@ public:
boost::bind(&FutureCompletion::completed, &callback)
);
callback.waitForCompletion();
- session.checkClosed();
+ session.assertOpen();
complete = true;
}
}
diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp
index 73b7c3a7a6..5d36a1d873 100644
--- a/cpp/src/qpid/client/FutureResponse.cpp
+++ b/cpp/src/qpid/client/FutureResponse.cpp
@@ -31,7 +31,7 @@ using namespace qpid::sys;
AMQMethodBody* FutureResponse::getResponse(SessionCore& session)
{
waitForCompletion();
- session.checkClosed();
+ session.assertOpen();
return response.get();
}
diff --git a/cpp/src/qpid/client/FutureResult.cpp b/cpp/src/qpid/client/FutureResult.cpp
index a523129206..681202edea 100644
--- a/cpp/src/qpid/client/FutureResult.cpp
+++ b/cpp/src/qpid/client/FutureResult.cpp
@@ -30,7 +30,7 @@ using namespace qpid::sys;
const std::string& FutureResult::getResult(SessionCore& session) const
{
waitForCompletion();
- session.checkClosed();
+ session.assertOpen();
return result;
}
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 966d07eaef..27440465fe 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -24,105 +24,301 @@
#include "FutureResponse.h"
#include "FutureResult.h"
#include "ConnectionImpl.h"
-
+#include "qpid/framing/FrameSet.h"
#include "qpid/framing/constants.h"
+#include "qpid/framing/ClientInvoker.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
-using namespace qpid::client;
+namespace qpid {
+namespace client {
+
using namespace qpid::framing;
-SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, uint16_t ch, uint64_t maxFrameSize)
- : connection(conn), channel(ch), l2(*this), l3(maxFrameSize),
- uuid(false), sync(false)
+namespace { const std::string OK="ok"; }
+
+typedef sys::Monitor::ScopedLock Lock;
+typedef sys::Monitor::ScopedUnlock UnLock;
+
+inline void SessionCore::invariant() const {
+ switch (state.get()) {
+ case OPENING:
+ assert(!session);
+ assert(code==REPLY_SUCCESS);
+ assert(connection);
+ assert(channel.get());
+ assert(channel.next == connection.get());
+ break;
+ case RESUMING:
+ assert(session);
+ assert(session->getState() == SessionState::RESUMING);
+ assert(code==REPLY_SUCCESS);
+ assert(connection);
+ assert(channel.get());
+ assert(channel.next == connection.get());
+ break;
+ case OPEN:
+ case CLOSING:
+ case SUSPENDING:
+ assert(session);
+ assert(code==REPLY_SUCCESS);
+ assert(connection);
+ assert(channel.get());
+ assert(channel.next == connection.get());
+ break;
+ case SUSPENDED:
+ assert(code==REPLY_SUCCESS);
+ assert(session);
+ assert(!connection);
+ break;
+ case CLOSED:
+ assert(!session);
+ assert(!connection);
+ break;
+ }
+}
+
+inline void SessionCore::setState(State s) {
+ state = s;
+ invariant();
+}
+
+inline void SessionCore::waitFor(State s) {
+ invariant();
+ // We can be CLOSED or SUSPENDED by error at any time.
+ state.waitFor(States(s, CLOSED, SUSPENDED));
+ check();
+ assert(state==s);
+ invariant();
+}
+
+SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn,
+ uint16_t ch, uint64_t maxFrameSize)
+ : l3(maxFrameSize),
+ sync(false),
+ channel(ch),
+ proxy(channel),
+ state(OPENING)
{
- l2.next = &l3;
l3.out = &out;
- out.next = connection.get();
+ attaching(conn);
}
-SessionCore::~SessionCore() {}
+void SessionCore::attaching(shared_ptr<ConnectionImpl> c) {
+ assert(c);
+ assert(channel.get());
+ connection = c;
+ channel.next = connection.get();
+ code = REPLY_SUCCESS;
+ text = OK;
+ state = session ? RESUMING : OPENING;
+ invariant();
+}
-ExecutionHandler& SessionCore::getExecution()
-{
- checkClosed();
- return l3;
+SessionCore::~SessionCore() {
+ Lock l(state);
+ invariant();
+ detach(COMMAND_INVALID, "Session deleted");
+ state.waitAll();
}
-FrameSet::shared_ptr SessionCore::get()
-{
- checkClosed();
- return l3.getDemux().getDefault().pop();
+void SessionCore::detach(int c, const std::string& t) {
+ connection.reset();
+ channel.next = 0;
+ code=c;
+ text=t;
}
-void SessionCore::setSync(bool s)
-{
+void SessionCore::doClose(int code, const std::string& text) {
+ if (state != CLOSED) {
+ session.reset();
+ l3.getDemux().close();
+ l3.getCompletionTracker().close();
+ detach(code, text);
+ setState(CLOSED);
+ }
+ invariant();
+}
+
+void SessionCore::doSuspend(int code, const std::string& text) {
+ if (state != CLOSED) {
+ invariant();
+ detach(code, text);
+ session->suspend();
+ setState(SUSPENDED);
+ }
+}
+
+ExecutionHandler& SessionCore::getExecution() { // user thread
+ return l3;
+}
+
+void SessionCore::setSync(bool s) { // user thread
sync = s;
}
-bool SessionCore::isSync()
-{
+bool SessionCore::isSync() { // user thread
return sync;
}
-namespace {
-struct ClosedOnExit {
- SessionCore& core;
- int code;
- std::string text;
- ClosedOnExit(SessionCore& s, int c, const std::string& t)
- : core(s), code(c), text(t) {}
- ~ClosedOnExit() { core.closed(code, text); }
-};
+FrameSet::shared_ptr SessionCore::get() { // user thread
+ // No lock here: pop does a blocking wait.
+ return l3.getDemux().getDefault().pop();
+}
+
+void SessionCore::open(uint32_t detachedLifetime) { // user thread
+ Lock l(state);
+ check(state==OPENING && !session,
+ COMMAND_INVALID, QPID_MSG("Cannot re-open a session."));
+ proxy.open(detachedLifetime);
+ waitFor(OPEN);
+}
+
+void SessionCore::close() { // user thread
+ Lock l(state);
+ check();
+ if (state==OPEN) {
+ setState(CLOSING);
+ proxy.close();
+ waitFor(CLOSED);
+ }
+ else
+ doClose(REPLY_SUCCESS, OK);
+}
+
+void SessionCore::suspend() { // user thread
+ Lock l(state);
+ checkOpen();
+ setState(SUSPENDING);
+ proxy.suspend();
+ waitFor(SUSPENDED);
}
-void SessionCore::close()
+void SessionCore::setChannel(uint16_t ch) { channel=ch; }
+
+void SessionCore::resume(shared_ptr<ConnectionImpl> c) {
+ // user thread
+ {
+ Lock l(state);
+ if (state==OPEN)
+ doSuspend(REPLY_SUCCESS, OK);
+ check(state==SUSPENDED, COMMAND_INVALID, QPID_MSG("Session cannot be resumed."));
+ SequenceNumber sendAck=session->resuming();
+ attaching(c);
+ proxy.resume(getId());
+ waitFor(OPEN);
+ proxy.ack(sendAck, SequenceNumberSet());
+ // FIXME aconway 2007-10-23: Replay inside the lock might be a prolem
+ // for large replay sets.
+ SessionState::Replay replay=session->replay();
+ for (SessionState::Replay::iterator i = replay.begin();
+ i != replay.end(); ++i)
+ {
+ invariant();
+ channel.handle(*i); // Direct to channel.
+ check();
+ }
+ }
+}
+
+void SessionCore::assertOpen() const {
+ Lock l(state);
+ checkOpen();
+}
+
+// network thread
+void SessionCore::attached(const Uuid& sessionId,
+ uint32_t /*detachedLifetime*/)
{
- checkClosed();
- ClosedOnExit closer(*this, CHANNEL_ERROR, "Session closed by user.");
- l2.close();
+ Lock l(state);
+ invariant();
+ check(state == OPENING || state == RESUMING,
+ COMMAND_INVALID, QPID_MSG("Received unexpected session.attached"));
+ if (state==OPENING) { // New session
+ // FIXME aconway 2007-10-17: arbitrary ack value of 100 for
+ // client, allow configuration.
+ session=in_place<SessionState>(100, sessionId);
+ setState(OPEN);
+ }
+ else { // RESUMING
+ check(sessionId == session->getId(),
+ INVALID_ARGUMENT, QPID_MSG("session.resumed has invalid ID."));
+ // Don't setState yet, wait for first incoming ack.
+ }
}
-void SessionCore::suspend() {
- checkClosed();
- ClosedOnExit closer(*this, CHANNEL_ERROR, "Client session is suspended");
- l2.suspend();
+void SessionCore::detached() { // network thread
+ Lock l(state);
+ check(state == SUSPENDING,
+ COMMAND_INVALID, QPID_MSG("Received unexpected session.detached."));
+ connection->erase(channel);
+ doSuspend(REPLY_SUCCESS, OK);
+}
+
+void SessionCore::ack(uint32_t ack, const SequenceNumberSet&) {
+ Lock l(state);
+ invariant();
+ check(state==OPEN || state==RESUMING,
+ COMMAND_INVALID, QPID_MSG("Received unexpected session.ack"));
+ session->receivedAck(ack);
+ if (state==RESUMING) {
+ setState(OPEN);
+ }
+ invariant();
}
void SessionCore::closed(uint16_t code, const std::string& text)
-{
- out.next = 0;
- reason.code = code;
- reason.text = text;
- l2.closed();
- l3.getDemux().close();
- l3.getCompletionTracker().close();
+{ // network thread
+ Lock l(state);
+ invariant();
+ doClose(code, text);
}
-void SessionCore::checkClosed() const
-{
- // TODO: could have been a connection exception
- if(out.next == 0)
- throw ChannelException(reason.code, reason.text);
+// closed by connection
+void SessionCore::connectionClosed(uint16_t code, const std::string& text) {
+ Lock l(state);
+ try {
+ doClose(code, text);
+ } catch(...) { assert (0); }
}
-void SessionCore::open(uint32_t detachedLifetime) {
- assert(out.next);
- l2.open(detachedLifetime);
+void SessionCore::connectionBroke(uint16_t code, const std::string& text) {
+ Lock l(state);
+ try {
+ doSuspend(code, text);
+ } catch (...) { assert(0); }
}
-void SessionCore::resume(shared_ptr<ConnectionImpl> conn) {
- connection = conn;
- out.next = connection.get();
- l2.resume();
+void SessionCore::check() const { // Called with lock held.
+ invariant();
+ if (code != REPLY_SUCCESS)
+ throwReplyException(code, text);
+}
+
+void SessionCore::check(bool cond, int newCode, const std::string& msg) const {
+ check();
+ if (!cond) {
+ const_cast<SessionCore*>(this)->doClose(newCode, msg);
+ throwReplyException(code, text);
+ }
}
-Future SessionCore::send(const AMQBody& command)
-{
- checkClosed();
+void SessionCore::checkOpen() const {
+ if (state==SUSPENDED) {
+ std::string cause;
+ if (code != REPLY_SUCCESS)
+ cause=" by :"+text;
+ throw CommandInvalidException(QPID_MSG("Session is suspended" << cause));
+ }
+ check(state==OPEN, COMMAND_INVALID, QPID_MSG("Session is not open"));
+}
+Future SessionCore::send(const AMQBody& command)
+{
+ Lock l(state);
+ checkOpen();
command.getMethod()->setSync(sync);
-
Future f;
//any result/response listeners must be set before the command is sent
if (command.getMethod()->resultExpected()) {
@@ -145,21 +341,61 @@ Future SessionCore::send(const AMQBody& command)
Future SessionCore::send(const AMQBody& command, const MethodContent& content)
{
- checkClosed();
+ Lock l(state);
+ checkOpen();
//content bearing methods don't currently have responses or
//results, if that changes should follow procedure for the other
//send method impl:
return Future(l3.send(command, content));
}
+// Network thread.
void SessionCore::handleIn(AMQFrame& frame) {
- l2.handle(frame);
+ try {
+ // Cast to expose private SessionHandler functions.
+ if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+ session->received(frame);
+ l3.handle(frame);
+ }
+ } catch (const ChannelException& e) {
+ QPID_LOG(error, "Channel exception:" << e.what());
+ doClose(e.code, e.what());
+ }
}
void SessionCore::handleOut(AMQFrame& frame)
{
- checkClosed();
- frame.setChannel(channel);
- out.next->handle(frame);
+ Lock l(state);
+ if (state==OPEN) {
+ if (session->sent(frame))
+ proxy.solicitAck();
+ channel.handle(frame);
+ }
+}
+
+void SessionCore::solicitAck( ) {
+ Lock l(state);
+ checkOpen();
+ proxy.ack(session->sendingAck(), SequenceNumberSet());
+}
+
+void SessionCore::flow(bool) {
+ assert(0); throw NotImplementedException("session.flow");
+}
+
+void SessionCore::flowOk(bool /*active*/) {
+ assert(0); throw NotImplementedException("session.flow");
+}
+
+void SessionCore::highWaterMark(uint32_t /*lastSentMark*/) {
+ // FIXME aconway 2007-10-02: may be removed from spec.
+ assert(0); throw NotImplementedException("session.highWaterMark");
+}
+
+const Uuid SessionCore::getId() const {
+ if (session)
+ return session->getId();
+ throw Exception(QPID_MSG("Closed session, no ID."));
}
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index ac109e1f5c..38c72359a3 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -22,17 +22,25 @@
#ifndef _SessionCore_
#define _SessionCore_
-#include <boost/function.hpp>
-#include <boost/shared_ptr.hpp>
-#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/shared_ptr.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/MethodContent.h"
-#include "qpid/framing/Uuid.h"
-#include "SessionHandler.h"
+#include "qpid/framing/ChannelHandler.h"
+#include "qpid/framing/SessionState.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/sys/StateMonitor.h"
#include "ExecutionHandler.h"
+#include <boost/optional.hpp>
+
namespace qpid {
+namespace framing {
+class FrameSet;
+class MethodContent;
+class SequenceNumberSet;
+}
+
namespace client {
class Future;
@@ -43,60 +51,90 @@ class ConnectionImpl;
* Attaches to a SessionHandler when active, detaches
* when closed.
*/
-class SessionCore : public framing::FrameHandler::InOutHandler
+class SessionCore : public framing::FrameHandler::InOutHandler,
+ private framing::AMQP_ClientOperations::SessionHandler
{
- struct Reason
- {
- uint16_t code;
- std::string text;
- };
-
- shared_ptr<ConnectionImpl> connection;
- uint16_t channel;
- SessionHandler l2;
- ExecutionHandler l3;
- framing::Uuid uuid;
- volatile bool sync;
- Reason reason;
-
- protected:
- void handleIn(framing::AMQFrame& frame);
- void handleOut(framing::AMQFrame& frame);
-
public:
SessionCore(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
~SessionCore();
framing::FrameSet::shared_ptr get();
+ const framing::Uuid getId() const;
+ uint16_t getChannel() const { return channel; }
+ void assertOpen() const;
- framing::Uuid getId() const { return uuid; }
- void setId(const framing::Uuid& id) { uuid= id; }
-
- uint16_t getChannel() const { assert(channel); return channel; }
- void setChannel(uint16_t ch) { assert(ch); channel=ch; }
-
+ // NOTE: Public functions called in user thread.
void open(uint32_t detachedLifetime);
-
- /** Closed by client code */
void close();
-
- /** Closed by peer */
- void closed(uint16_t code, const std::string& text);
-
void resume(shared_ptr<ConnectionImpl>);
void suspend();
+ void setChannel(uint16_t channel);
- void setSync(bool);
+ void setSync(bool s);
bool isSync();
ExecutionHandler& getExecution();
- void checkClosed() const;
Future send(const framing::AMQBody& command);
+
Future send(const framing::AMQBody& command, const framing::MethodContent& content);
-};
-}
-}
+ void connectionClosed(uint16_t code, const std::string& text);
+ void connectionBroke(uint16_t code, const std::string& text);
+
+ private:
+ enum State {
+ OPENING,
+ RESUMING,
+ OPEN,
+ CLOSING,
+ SUSPENDING,
+ SUSPENDED,
+ CLOSED
+ };
+ typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
+ typedef sys::StateMonitor<State, CLOSED> StateMonitor;
+ typedef StateMonitor::Set States;
+
+ inline void invariant() const;
+ inline void setState(State s);
+ inline void waitFor(State);
+ void doClose(int code, const std::string& text);
+ void doSuspend(int code, const std::string& text);
+
+ /** If there is an error, throw the exception */
+ void check(bool condition, int code, const std::string& text) const;
+ /** Throw if *error */
+ void check() const;
+
+ void handleIn(framing::AMQFrame& frame);
+ void handleOut(framing::AMQFrame& frame);
+
+ // Private functions are called by broker in network thread.
+ void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
+ void flow(bool active);
+ void flowOk(bool active);
+ void detached();
+ void ack(uint32_t cumulativeSeenMark,
+ const framing::SequenceNumberSet& seenFrameSet);
+ void highWaterMark(uint32_t lastSentMark);
+ void solicitAck();
+ void closed(uint16_t code, const std::string& text);
+
+ void attaching(shared_ptr<ConnectionImpl>);
+ void detach(int code, const std::string& text);
+ void checkOpen() const;
+
+ int code; // Error code
+ std::string text; // Error text
+ boost::optional<framing::SessionState> session;
+ shared_ptr<ConnectionImpl> connection;
+ ExecutionHandler l3;
+ volatile bool sync;
+ framing::ChannelHandler channel;
+ framing::AMQP_ServerProxy::Session proxy;
+ mutable StateMonitor state;
+};
+}} // namespace qpid::client
#endif
diff --git a/cpp/src/qpid/client/SessionHandler.cpp b/cpp/src/qpid/client/SessionHandler.cpp
deleted file mode 100644
index d3b04e5356..0000000000
--- a/cpp/src/qpid/client/SessionHandler.cpp
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "SessionHandler.h"
-#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/all_method_bodies.h"
-#include "qpid/client/SessionCore.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/log/Statement.h"
-
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace boost;
-
-namespace {
-// TODO aconway 2007-09-28: hack till we have multi-version support.
-ProtocolVersion version;
-}
-
-SessionHandler::SessionHandler(SessionCore& parent)
- : StateManager(CLOSED), core(parent) {}
-
-SessionHandler::~SessionHandler() {}
-
-void SessionHandler::handle(AMQFrame& frame)
-{
- AMQBody* body = frame.getBody();
- if (getState() == OPEN) {
- core.checkClosed();
- SessionClosedBody* closedBody=
- dynamic_cast<SessionClosedBody*>(body->getMethod());
- if (closedBody) {
- closed();
- core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
- } else {
- try {
- next->handle(frame);
- }
- catch(const ChannelException& e){
- QPID_LOG(error, "Channel exception:" << e.what());
- closed();
- AMQFrame f(0, SessionClosedBody(version, e.code, e.toString()));
- core.out(f);
- core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
- }
- }
- } else {
- if (body->getMethod())
- handleMethod(body->getMethod());
- else
- throw ConnectionException(504, "Channel not open for content.");
- }
-}
-
-void SessionHandler::attach(const AMQMethodBody& command)
-{
- setState(OPENING);
- AMQFrame f(0, command);
- core.out(f);
- std::set<int> states;
- states.insert(OPEN);
- states.insert(CLOSED);
- waitFor(states);
- if (getState() != OPEN)
- throw Exception(QPID_MSG("Failed to attach session to channel "<<core.getChannel()));
-}
-
-void SessionHandler::open(uint32_t detachedLifetime) {
- attach(SessionOpenBody(version, detachedLifetime));
-}
-
-void SessionHandler::resume() {
- attach(SessionResumeBody(version, core.getId()));
-}
-
-void SessionHandler::detach(const AMQMethodBody& command)
-{
- setState(CLOSING);
- AMQFrame f(0, command);
- core.out(f);
- waitFor(CLOSED);
-}
-
-void SessionHandler::close() { detach(SessionCloseBody(version)); }
-void SessionHandler::suspend() { detach(SessionSuspendBody(version)); }
-void SessionHandler::closed() { setState(CLOSED); }
-
-void SessionHandler::handleMethod(AMQMethodBody* method)
-{
- switch (getState()) {
- case OPENING: {
- SessionAttachedBody* attached = dynamic_cast<SessionAttachedBody*>(method);
- if (attached) {
- core.setId(attached->getSessionId());
- setState(OPEN);
- } else
- throw ChannelErrorException();
- break;
- }
- case CLOSING:
- if (method->isA<SessionClosedBody>() ||
- method->isA<SessionDetachedBody>())
- closed();
- break;
-
- case CLOSED:
- throw ChannelErrorException();
-
- default:
- assert(0);
- throw InternalErrorException(QPID_MSG("Internal Error."));
- }
-}
-
diff --git a/cpp/src/qpid/client/SessionHandler.h b/cpp/src/qpid/client/SessionHandler.h
deleted file mode 100644
index 994b8402de..0000000000
--- a/cpp/src/qpid/client/SessionHandler.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _SessionHandler_
-#define _SessionHandler_
-
-#include "StateManager.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/Uuid.h"
-#include "qpid/shared_ptr.h"
-
-namespace qpid {
-namespace client {
-class SessionCore;
-
-/**
- * Handles incoming session (L2) commands.
- */
-class SessionHandler : public framing::FrameHandler,
- private StateManager
-{
- enum STATES {OPENING, OPEN, CLOSING, CLOSED};
- SessionCore& core;
-
- void handleMethod(framing::AMQMethodBody* method);
- void attach(const framing::AMQMethodBody&);
- void detach(const framing::AMQMethodBody&);
-
- public:
- SessionHandler(SessionCore& parent);
- ~SessionHandler();
-
- /** Incoming from broker */
- void handle(framing::AMQFrame&);
-
- void open(uint32_t detachedLifetime);
- void resume();
- void close();
- void closed();
- void suspend();
-};
-
-}}
-
-#endif
diff --git a/cpp/src/qpid/client/StateManager.cpp b/cpp/src/qpid/client/StateManager.cpp
index b72967c098..0cb3c6b9d4 100644
--- a/cpp/src/qpid/client/StateManager.cpp
+++ b/cpp/src/qpid/client/StateManager.cpp
@@ -60,7 +60,7 @@ void StateManager::setState(int s)
stateLock.notifyAll();
}
-int StateManager::getState()
+int StateManager::getState() const
{
Monitor::ScopedLock l(stateLock);
return state;
diff --git a/cpp/src/qpid/client/StateManager.h b/cpp/src/qpid/client/StateManager.h
index fd0c1b7f86..2f8ecb772c 100644
--- a/cpp/src/qpid/client/StateManager.h
+++ b/cpp/src/qpid/client/StateManager.h
@@ -30,12 +30,12 @@ namespace client {
class StateManager
{
int state;
- sys::Monitor stateLock;
+ mutable sys::Monitor stateLock;
public:
StateManager(int initial);
void setState(int state);
- int getState();
+ int getState() const ;
void waitForStateChange(int current);
void waitFor(std::set<int> states);
void waitFor(int state);
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index abd33c4158..423af06173 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -20,9 +20,9 @@
*/
#include "AMQFrame.h"
-#include "qpid/QpidError.h"
#include "qpid/framing/variant.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include <boost/format.hpp>
@@ -103,7 +103,7 @@ bool AMQFrame::decode(Buffer& buffer)
uint8_t flags = buffer.getOctet();
uint8_t framing_version = (flags & 0xc0) >> 6;
if (framing_version != 0)
- THROW_QPID_ERROR(FRAMING_ERROR, "Framing version unsupported");
+ throw SyntaxErrorException(QPID_MSG("Framing version unsupported"));
bof = flags & 0x08;
eof = flags & 0x04;
bos = flags & 0x02;
@@ -111,7 +111,7 @@ bool AMQFrame::decode(Buffer& buffer)
uint8_t type = buffer.getOctet();
uint16_t frame_size = buffer.getShort();
if (frame_size < frameOverhead()-1)
- THROW_QPID_ERROR(FRAMING_ERROR, "Frame size too small");
+ throw SyntaxErrorException(QPID_MSG("Frame size too small"));
uint8_t reserved1 = buffer.getOctet();
uint8_t field1 = buffer.getOctet();
subchannel = field1 & 0x0f;
@@ -121,7 +121,7 @@ bool AMQFrame::decode(Buffer& buffer)
// Verify that the protocol header meets current spec
// TODO: should we check reserved2 against zero as well? - the spec isn't clear
if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0)
- THROW_QPID_ERROR(FRAMING_ERROR, "Reserved bits not zero");
+ throw SyntaxErrorException(QPID_MSG("Reserved bits not zero"));
// TODO: should no longer care about body size and only pass up B,E,b,e flags
uint16_t body_size = frame_size + 1 - frameOverhead();
@@ -133,7 +133,7 @@ bool AMQFrame::decode(Buffer& buffer)
uint8_t end = buffer.getOctet();
if (end != 0xCE)
- THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
+ throw SyntaxErrorException(QPID_MSG("Frame end not found"));
return true;
}
@@ -147,9 +147,7 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type)
case HEARTBEAT_BODY: body = AMQHeartbeatBody(); break;
default:
- THROW_QPID_ERROR(
- FRAMING_ERROR,
- boost::format("Unknown frame type %d") % type);
+ throw SyntaxErrorException(QPID_MSG("Invalid frame type " << type));
}
boost::apply_visitor(DecodeVisitor(buffer,size), body);
}
diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp
index 53a01141c1..fb84be7cd6 100644
--- a/cpp/src/qpid/framing/BodyHandler.cpp
+++ b/cpp/src/qpid/framing/BodyHandler.cpp
@@ -18,14 +18,13 @@
* under the License.
*
*/
-#include "qpid/QpidError.h"
#include "BodyHandler.h"
#include "AMQMethodBody.h"
#include "AMQHeaderBody.h"
#include "AMQContentBody.h"
#include "AMQHeartbeatBody.h"
-
#include <boost/cast.hpp>
+#include "qpid/framing/reply_exceptions.h"
using namespace qpid::framing;
using namespace boost;
@@ -49,7 +48,8 @@ void BodyHandler::handleBody(AMQBody* body) {
handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body));
break;
default:
- QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type());
+ throw SyntaxErrorException(
+ QPID_MSG("Invalid frame type " << body->type()));
}
}
diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp
index 6a466fdfab..8c1a4e1e9e 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.cpp
+++ b/cpp/src/qpid/framing/ChannelAdapter.cpp
@@ -15,8 +15,6 @@
* limitations under the License.
*
*/
-#include <boost/format.hpp>
-
#include "ChannelAdapter.h"
#include "OutputHandler.h"
#include "AMQFrame.h"
@@ -26,8 +24,6 @@
#include "AMQMethodBody.h"
#include "qpid/framing/ConnectionOpenBody.h"
-using boost::format;
-
namespace qpid {
namespace framing {
@@ -53,20 +49,20 @@ void ChannelAdapter::send(const AMQBody& body)
void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const {
if (getId() != 0 && method.amqpClassId() == ConnectionOpenBody::CLASS_ID)
- throw ConnectionException(
- 504, format("Connection method on non-0 channel %d.")%getId());
+ throw ChannelErrorException(
+ QPID_MSG("Connection method on non-0 channel " << getId()));
}
void ChannelAdapter::assertChannelOpen() const {
if (getId() != 0 && !isOpen())
- throw ConnectionException(
- 504, format("Channel %d is not open.")%getId());
+ throw ChannelErrorException(
+ QPID_MSG("Channel " << getId() << " is not open."));
}
void ChannelAdapter::assertChannelNotOpen() const {
if (getId() != 0 && isOpen())
- throw ConnectionException(
- 504, format("Channel %d is already open.") % getId());
+ throw ChannelErrorException(
+ QPID_MSG("Channel " << getId() << " is already open."));
}
void ChannelAdapter::handle(AMQFrame& f) { handleBody(f.getBody()); }
diff --git a/cpp/src/qpid/framing/ProtocolVersionException.h b/cpp/src/qpid/framing/ChannelHandler.h
index bd16804470..69aaeac492 100644
--- a/cpp/src/qpid/framing/ProtocolVersionException.h
+++ b/cpp/src/qpid/framing/ChannelHandler.h
@@ -1,3 +1,6 @@
+#ifndef QPID_FRAMING_CHANNELHANDLER_H
+#define QPID_FRAMING_CHANNELHANDLER_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,39 +21,33 @@
* under the License.
*
*/
-
-#ifndef _ProtocolVersionException_
-#define _ProtocolVersionException_
-
-#include "qpid/Exception.h"
-#include "ProtocolVersion.h"
-#include <string>
-#include <vector>
+#include "FrameHandler.h"
+#include "AMQFrame.h"
namespace qpid {
namespace framing {
-class ProtocolVersionException : public qpid::Exception
+/**
+ * Sets the channel number on outgoing frames.
+ */
+class ChannelHandler : public FrameHandler
{
-protected:
- ProtocolVersion versionFound;
-
-public:
- ~ProtocolVersionException() throw() {}
-
- template <class T>
- ProtocolVersionException(
- ProtocolVersion ver, const T& msg) throw () : versionFound(ver)
- { init(boost::lexical_cast<std::string>(msg)); }
-
- template <class T>
- ProtocolVersionException(const T& msg) throw ()
- { init(boost::lexical_cast<std::string>(msg)); }
+ public:
+ ChannelHandler(uint16_t channelId=0, FrameHandler* next=0)
+ : FrameHandler(next), channel(channelId) {}
+ void handle(AMQFrame& frame) {
+ frame.setChannel(channel);
+ next->handle(frame);
+ }
+ uint16_t get() const { return channel; }
+ ChannelHandler& set(uint16_t ch) { channel=ch; return *this; }
+ operator uint16_t() const { return get(); }
+ ChannelHandler& operator=(uint16_t ch) { return set(ch); }
private:
- void init(const std::string& msg);
+ uint16_t channel;
};
}} // namespace qpid::framing
-#endif //ifndef _ProtocolVersionException_
+#endif /*!QPID_FRAMING_CHANNELHANDLER_H*/
diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp
index 3c0284f2c8..089bc5d4a5 100644
--- a/cpp/src/qpid/framing/FieldTable.cpp
+++ b/cpp/src/qpid/framing/FieldTable.cpp
@@ -19,9 +19,10 @@
*
*/
#include "FieldTable.h"
-#include "qpid/QpidError.h"
#include "Buffer.h"
#include "FieldValue.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/reply_exceptions.h"
#include <assert.h>
namespace qpid {
@@ -132,7 +133,7 @@ void FieldTable::decode(Buffer& buffer){
uint32_t len = buffer.getLong();
uint32_t available = buffer.available();
if (available < len)
- THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for field table.");
+ throw SyntaxErrorException(QPID_MSG("Not enough data for field table."));
uint32_t leftover = available - len;
while(buffer.available() > leftover){
std::string name;
diff --git a/cpp/src/qpid/framing/FieldValue.cpp b/cpp/src/qpid/framing/FieldValue.cpp
index a7535ae4b9..5526c9cb72 100644
--- a/cpp/src/qpid/framing/FieldValue.cpp
+++ b/cpp/src/qpid/framing/FieldValue.cpp
@@ -20,8 +20,7 @@
*/
#include "FieldValue.h"
#include "Buffer.h"
-#include "qpid/QpidError.h"
-
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace framing {
@@ -75,9 +74,7 @@ void FieldValue::decode(Buffer& buffer)
data.reset(new FixedWidthValue<0>());
break;
default:
- std::stringstream out;
- out << "Unknown field table value type: " << typeOctet;
- THROW_QPID_ERROR(FRAMING_ERROR, out.str());
+ throw SyntaxErrorException(QPID_MSG("Unknown field table value type: " << (int)typeOctet));
}
data->decode(buffer);
}
diff --git a/cpp/src/qpid/framing/FramingContent.cpp b/cpp/src/qpid/framing/FramingContent.cpp
index 813e6fb49b..cd134b0e89 100644
--- a/cpp/src/qpid/framing/FramingContent.cpp
+++ b/cpp/src/qpid/framing/FramingContent.cpp
@@ -18,12 +18,10 @@
* under the License.
*
*/
-#include <assert.h>
-
#include "Buffer.h"
#include "FramingContent.h"
-#include "qpid/QpidError.h"
-#include <sstream>
+#include "qpid/Exception.h"
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace framing {
@@ -37,12 +35,12 @@ Content::Content(uint8_t _discriminator, const string& _value): discriminator(_d
void Content::validate() {
if (discriminator == REFERENCE) {
if(value.empty()) {
- THROW_QPID_ERROR(FRAMING_ERROR, "Reference cannot be empty");
+ throw InvalidArgumentException(
+ QPID_MSG("Reference cannot be empty"));
}
}else if (discriminator != INLINE) {
- std::stringstream out;
- out << "Invalid discriminator: " << (int) discriminator;
- THROW_QPID_ERROR(FRAMING_ERROR, out.str());
+ throw SyntaxErrorException(
+ QPID_MSG("Invalid discriminator: " << discriminator));
}
}
diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h
index 3e55dff1bd..fbf3c0b7ca 100644
--- a/cpp/src/qpid/framing/Handler.h
+++ b/cpp/src/qpid/framing/Handler.h
@@ -33,6 +33,7 @@ template <class T>
struct Handler {
typedef T HandledType;
typedef void handleFptr(T);
+ typedef void result_type; // Compatible with std/boost functors.
Handler(Handler<T>* next_=0) : next(next_) {}
virtual ~Handler() {}
@@ -51,7 +52,7 @@ struct Handler {
struct Chain : public Handler<T> {
Chain(Handler<T>* first=0) : Handler(first) {}
void operator=(Handler<T>* h) { next = h; }
- void handle(T t) { (*next)(t); }
+ void handle(T t) { next->handle(t); }
// TODO aconway 2007-08-29: chain modifier ops here.
};
diff --git a/cpp/src/qpid/framing/ProtocolVersionException.cpp b/cpp/src/qpid/framing/ProtocolVersionException.cpp
deleted file mode 100644
index b68b3af1f9..0000000000
--- a/cpp/src/qpid/framing/ProtocolVersionException.cpp
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <boost/format.hpp>
-#include "ProtocolVersionException.h"
-
-
-using namespace qpid::framing;
-
-void ProtocolVersionException::init(const std::string& msg)
-{
- whatStr = boost::str(
- boost::format("ProtocolVersionException: %s found: %s")
- % versionFound.toString() % msg);
-}
-
diff --git a/cpp/src/qpid/framing/ResumeHandler.cpp b/cpp/src/qpid/framing/ResumeHandler.cpp
deleted file mode 100644
index 9d2c971459..0000000000
--- a/cpp/src/qpid/framing/ResumeHandler.cpp
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIE4bS OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "ResumeHandler.h"
-#include "qpid/framing/reply_exceptions.h"
-
-#include <boost/bind.hpp>
-
-#include <algorithm>
-
-namespace qpid {
-namespace framing {
-
-void ResumeHandler::ackReceived(SequenceNumber acked) {
- if (lastSent < acked)
- throw InvalidArgumentException("Invalid sequence number in ack");
- size_t keep = lastSent - acked;
- if (keep < unacked.size())
- unacked.erase(unacked.begin(), unacked.end()-keep);
-}
-
-void ResumeHandler::resend() {
- std::for_each(unacked.begin(), unacked.end(),
- boost::bind(&FrameHandler::handle,out->next, _1));
-}
-
-void ResumeHandler::handleIn(AMQFrame& f) {
- ++lastReceived;
- in.next->handle(f);
-}
-
-void ResumeHandler::handleOut(AMQFrame& f) {
- ++lastSent;
- unacked.push_back(f);
- out.next->handle(f);
-}
-
-
-}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/ResumeHandler.h b/cpp/src/qpid/framing/ResumeHandler.h
deleted file mode 100644
index c86a60b9cb..0000000000
--- a/cpp/src/qpid/framing/ResumeHandler.h
+++ /dev/null
@@ -1,69 +0,0 @@
-#ifndef QPID_FRAMING_RESUMEHANDLER_H
-#define QPID_FRAMING_RESUMEHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/SequenceNumber.h"
-
-#include <deque>
-
-namespace qpid {
-namespace framing {
-
-/**
- * In/out handler pair for managing exactly-once session delivery.
- * The same handler is used by client and broker.
- * This handler only deals with TCP style SequenceNumber acks,
- * not with fragmented SequenceNumberSet.
- *
- * THREAD UNSAFE. Expected to be used in a serialized context.
- */
-class ResumeHandler : public FrameHandler::InOutHandler
-{
- public:
- /** Received acknowledgement for sent frames up to and including sentOk */
- void ackReceived(SequenceNumber sentOk);
-
- /** What was the last sequence number we received. */
- SequenceNumber getLastReceived() { return lastReceived; }
-
- /** Resend the unacked frames to the output handler */
- void resend();
-
- protected:
- void handleIn(AMQFrame&);
- void handleOut(AMQFrame&);
-
- private:
- typedef std::deque<AMQFrame> Frames;
- Frames unacked;
- SequenceNumber lastReceived;
- SequenceNumber lastSent;
-};
-
-
-}} // namespace qpid::common
-
-
-#endif /*!QPID_FRAMING_RESUMEHANDLER_H*/
diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h
index 9b8f0659b2..3aee04a4ce 100644
--- a/cpp/src/qpid/framing/SequenceNumber.h
+++ b/cpp/src/qpid/framing/SequenceNumber.h
@@ -47,6 +47,7 @@ class SequenceNumber
bool operator<=(const SequenceNumber& other) const;
bool operator>=(const SequenceNumber& other) const;
uint32_t getValue() const { return (uint32_t) value; }
+ operator uint32_t() const { return (uint32_t) value; }
friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
};
diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp
new file mode 100644
index 0000000000..045a0ae115
--- /dev/null
+++ b/cpp/src/qpid/framing/SessionState.cpp
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIE4bS OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionState.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/constants.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/log/Statement.h"
+
+#include <algorithm>
+
+#include <boost/bind.hpp>
+#include <boost/none.hpp>
+
+namespace qpid {
+namespace framing {
+
+SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
+ state(ATTACHED),
+ id(uuid),
+ lastReceived(-1),
+ lastSent(-1),
+ ackInterval(ack),
+ sendAckAt(lastReceived+ackInterval),
+ solicitAckAt(lastSent+ackInterval),
+ ackSolicited(false)
+{
+ assert(ackInterval > 0);
+}
+
+namespace {
+bool isSessionCommand(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->amqpClassId() == SESSION_CLASS_ID;
+}
+}
+
+boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) {
+ if (isSessionCommand(f))
+ return boost::none;
+ if (state==RESUMING)
+ throw CommandInvalidException(
+ QPID_MSG("Invalid frame: Resuming session, expected session-ack"));
+ assert(state = ATTACHED);
+ assert(lastReceived<sendAckAt);
+ ++lastReceived;
+ QPID_LOG(trace, "Recv # "<< lastReceived << " " << id);
+ if (lastReceived == sendAckAt)
+ return sendingAck();
+ else
+ return boost::none;
+}
+
+bool SessionState::sent(const AMQFrame& f) {
+ if (isSessionCommand(f))
+ return false;
+ unackedOut.push_back(f);
+ ++lastSent;
+ QPID_LOG(trace, "Sent # "<< lastSent << " " << id);
+ return (state!=RESUMING) &&
+ (lastSent == solicitAckAt) &&
+ sendingSolicit();
+}
+
+SessionState::Replay SessionState::replay() {
+ Replay r(unackedOut.size());
+ std::copy(unackedOut.begin(), unackedOut.end(), r.begin());
+ return r;
+}
+
+void SessionState::receivedAck(SequenceNumber acked) {
+ if (state==RESUMING) state=ATTACHED;
+ assert(state==ATTACHED);
+ if (lastSent < acked)
+ throw InvalidArgumentException("Invalid sequence number in ack");
+ size_t keep = lastSent - acked;
+ if (keep < unackedOut.size())
+ unackedOut.erase(unackedOut.begin(), unackedOut.end()-keep);
+ solicitAckAt = std::max(solicitAckAt, SequenceNumber(acked+ackInterval));
+}
+
+SequenceNumber SessionState::sendingAck() {
+ sendAckAt = lastReceived+ackInterval;
+ return lastReceived;
+}
+
+bool SessionState::sendingSolicit() {
+ assert(state == ATTACHED);
+ if (ackSolicited)
+ return false;
+ solicitAckAt = lastSent + ackInterval;
+ return true;
+}
+
+SequenceNumber SessionState::resuming() {
+ state = RESUMING;
+ return sendingAck();
+}
+
+void SessionState::suspend() {
+ state = SUSPENDED;
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h
new file mode 100644
index 0000000000..66fc083d3f
--- /dev/null
+++ b/cpp/src/qpid/framing/SessionState.h
@@ -0,0 +1,127 @@
+#ifndef QPID_FRAMING_SESSIONSTATE_H
+#define QPID_FRAMING_SESSIONSTATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/AMQFrame.h"
+
+#include <boost/optional.hpp>
+
+#include <deque>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Session state common to client and broker.
+ * Implements session ack/resume protcools.
+ *
+ * A SessionState is always associated with an _open_ session (attached or
+ * suspended) it is destroyed when the session is closed.
+ *
+ * A template to make it protocol independent and easy to test.
+ */
+class SessionState
+{
+ public:
+ typedef std::vector<AMQFrame> Replay;
+
+ /** States of a session. */
+ enum State {
+ SUSPENDED, ///< Suspended, detached from any channel.
+ RESUMING, ///< Resuming: waiting for initial ack from peer.
+ ATTACHED ///< Attached to channel and operating normally.
+ };
+
+ /**
+ *Create a newly opened active session.
+ *@param ackInterval send/solicit an ack whenever N unacked frames
+ * have been received/sent.
+ *@pre ackInterval > 0
+ */
+ SessionState(uint32_t ackInterval=1, const framing::Uuid& id=framing::Uuid(true));
+
+ const framing::Uuid& getId() const { return id; }
+ State getState() const { return state; }
+
+ /** Received incoming L3 frame.
+ * @return SequenceNumber if an ack should be sent, empty otherwise.
+ * SessionState assumes that acks are sent whenever it returns
+ * a seq. number.
+ */
+ boost::optional<SequenceNumber> received(const AMQFrame&);
+
+ /** Sent outgoing L3 frame.
+ *@return true if solicit-ack should be sent. Note the SessionState
+ *assumes that a solicit-ack is sent every time it returns true.
+ */
+ bool sent(const AMQFrame&);
+
+ /** Received normal incoming ack. */
+ void receivedAck(SequenceNumber);
+
+ /** Frames to replay
+ *@pre getState()==ATTACHED
+ */
+ Replay replay();
+
+ /** Suspend the session. */
+ void suspend();
+
+ /** Start resume protocol for the session.
+ *@returns sequence number to ack immediately. */
+ SequenceNumber resuming();
+
+ /** About to send an unscheduled ack, e.g. to respond to a solicit-ack.
+ *
+ * Note: when received() returns a sequence number this function
+ * should not be called. SessionState assumes that the ack is sent
+ * every time received() returns a sequence number.
+ */
+ SequenceNumber sendingAck();
+
+ SequenceNumber getLastSent() const { return lastSent; }
+ SequenceNumber getLastReceived() const { return lastReceived; }
+ private:
+ typedef std::deque<AMQFrame> Unacked;
+
+ bool sendingSolicit();
+
+ State state;
+ framing::Uuid id;
+ Unacked unackedOut;
+ SequenceNumber lastReceived;
+ SequenceNumber lastSent;
+ uint32_t ackInterval;
+ SequenceNumber sendAckAt;
+ SequenceNumber solicitAckAt;
+ bool ackSolicited;
+ bool suspending;
+};
+
+
+}} // namespace qpid::common
+
+
+#endif /*!QPID_FRAMING_SESSIONSTATE_H*/
diff --git a/cpp/src/qpid/framing/TemplateVisitor.h b/cpp/src/qpid/framing/TemplateVisitor.h
new file mode 100644
index 0000000000..8c719e5110
--- /dev/null
+++ b/cpp/src/qpid/framing/TemplateVisitor.h
@@ -0,0 +1,89 @@
+#ifndef QPID_FRAMING_TEMPLATEVISITOR_H
+#define QPID_FRAMING_TEMPLATEVISITOR_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/mpl/fold.hpp>
+#include <boost/utility/value_init.hpp>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Metafunction to generate a visitor class derived from Base with a
+ * visit for each type in TypeList calling functor F. TypeList may be
+ * any boost::mpl type collection e.g. mpl::list.
+ *
+ * Generated class is: TemplateVisitor<Base, F, TypeList>::type
+ *
+ * @see make_visitor
+ */
+template <class VisitTemplate, class TypeList, class F>
+class TemplateVisitor
+{
+ struct Base : public VisitorBase {
+ F action;
+ Base(F f) : action(f) {}
+ using VisitorBase::visit;
+ };
+
+ template <class B, class T> struct Visit : public B {
+ Visit(F action) : B(action) {}
+ using B::visit;
+ void visit(const T& body) { action(body); }
+ };
+
+ typedef typename boost::mpl::fold<
+ TypeList, Base, Visit<boost::mpl::placeholders::_1,
+ boost::mpl::placeholders::_2>
+ >::type type;
+};
+
+/**
+ * Construct a TemplateVisitor to perform the given action,
+ * for example:
+ * @code
+ */
+template <class VisitorBase, class TypeList, class F>
+TemplateVisitor<VisitorBase,TypeList,F>::type make_visitor(F action) {
+ return TemplateVisitor<VisitorBase,TypeList,F>::type(action);
+};
+
+/**
+ * For method body classes in TypeList, invoke the corresponding function
+ * on Target and return true. For other body types return false.
+ */
+template <class TypeList, class Target>
+bool invoke(const AMQBody& body, Target& target) {
+ typename InvokeVisitor<TypeList, Target>::type v(target);
+ body.accept(v);
+ return v.target;
+}
+
+}} // namespace qpid::framing
+
+
+#endif /*!QPID_FRAMING_INVOKEVISITOR_H*/
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!QPID_FRAMING_TEMPLATEVISITOR_H*/
diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp
index e0372b2f68..1bb69fbca9 100644
--- a/cpp/src/qpid/framing/TransferContent.cpp
+++ b/cpp/src/qpid/framing/TransferContent.cpp
@@ -24,9 +24,13 @@
namespace qpid {
namespace framing {
-TransferContent::TransferContent(const std::string& _data)
+TransferContent::TransferContent(const std::string& data,
+ const std::string& routingKey,
+ const std::string& exchange)
{
- setData(_data);
+ setData(data);
+ getDeliveryProperties().setRoutingKey(routingKey);
+ getDeliveryProperties().setExchange(exchange);
}
AMQHeaderBody TransferContent::getHeader() const
@@ -73,14 +77,14 @@ void TransferContent::populate(const FrameSet& frameset)
const MessageProperties& TransferContent::getMessageProperties() const
{
const MessageProperties* props = header.get<MessageProperties>();
- if (!props) throw NoSuchPropertiesException();
+ if (!props) throw Exception("No message properties.");
return *props;
}
const DeliveryProperties& TransferContent::getDeliveryProperties() const
{
const DeliveryProperties* props = header.get<DeliveryProperties>();
- if (!props) throw NoSuchPropertiesException();
+ if (!props) throw Exception("No message properties.");
return *props;
}
diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h
index 6fd96f3587..88f45b7e0a 100644
--- a/cpp/src/qpid/framing/TransferContent.h
+++ b/cpp/src/qpid/framing/TransferContent.h
@@ -30,14 +30,15 @@
namespace qpid {
namespace framing {
-struct NoSuchPropertiesException : public Exception {};
-
class TransferContent : public MethodContent
{
AMQHeaderBody header;
std::string data;
public:
- TransferContent(const std::string& data = "");
+ TransferContent(const std::string& data = std::string(),
+ const std::string& routingKey = std::string(),
+ const std::string& exchange = std::string());
+
AMQHeaderBody getHeader() const;
void setData(const std::string&);
void appendData(const std::string&);
diff --git a/cpp/src/qpid/framing/Uuid.cpp b/cpp/src/qpid/framing/Uuid.cpp
index 3a83430d56..2918c48ce3 100644
--- a/cpp/src/qpid/framing/Uuid.cpp
+++ b/cpp/src/qpid/framing/Uuid.cpp
@@ -17,9 +17,9 @@
*/
#include "Uuid.h"
-
-#include "qpid/QpidError.h"
+#include "qpid/Exception.h"
#include "qpid/framing/Buffer.h"
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace framing {
@@ -34,7 +34,7 @@ void Uuid::encode(Buffer& buf) const {
void Uuid::decode(Buffer& buf) {
if (buf.available() < size())
- THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for UUID.");
+ throw SyntaxErrorException(QPID_MSG("Not enough data for UUID."));
buf.getRawData(c_array(), size());
}
@@ -52,4 +52,10 @@ istream& operator>>(istream& in, Uuid& uuid) {
return in;
}
+std::string Uuid::str() const {
+ std::ostringstream os;
+ os << *this;
+ return os.str();
+}
+
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/Uuid.h b/cpp/src/qpid/framing/Uuid.h
index 19ae79db6a..9bde67ad8e 100644
--- a/cpp/src/qpid/framing/Uuid.h
+++ b/cpp/src/qpid/framing/Uuid.h
@@ -62,6 +62,9 @@ struct Uuid : public boost::array<uint8_t, 16> {
void encode(framing::Buffer& buf) const;
void decode(framing::Buffer& buf);
+
+ /** String value in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb */
+ std::string str() const;
};
/** Print in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb */
diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h
index eec28333bc..69b5942ba0 100644
--- a/cpp/src/qpid/framing/amqp_framing.h
+++ b/cpp/src/qpid/framing/amqp_framing.h
@@ -32,4 +32,3 @@
#include "ProtocolInitiation.h"
#include "BasicHeaderProperties.h"
#include "ProtocolVersion.h"
-#include "ProtocolVersionException.h"
diff --git a/cpp/src/qpid/framing/amqp_types.h b/cpp/src/qpid/framing/amqp_types.h
index a788fe36e4..94442aa357 100644
--- a/cpp/src/qpid/framing/amqp_types.h
+++ b/cpp/src/qpid/framing/amqp_types.h
@@ -61,5 +61,11 @@ class Uuid;
const ChannelId CHANNEL_MAX=(ChannelId(~1))>>1;
const ChannelId CHANNEL_HIGH_BIT= ChannelId(~CHANNEL_MAX);
+// Forward declare class types
+class FramingContent;
+class FieldTable;
+class SequenceNumberSet;
+class Uuid;
+
}} // namespace qpid::framing
#endif
diff --git a/cpp/src/qpid/framing/variant.h b/cpp/src/qpid/framing/variant.h
index 3cb8aece5d..1fe81f8f67 100644
--- a/cpp/src/qpid/framing/variant.h
+++ b/cpp/src/qpid/framing/variant.h
@@ -23,7 +23,6 @@
/**@file Tools for using boost::variant */
-#include "qpid/QpidError.h"
#include <boost/variant.hpp>
@@ -39,7 +38,7 @@ template <class R=void>
struct NoBlankVisitor : public boost::static_visitor<R> {
R foundBlank() const {
assert(0);
- THROW_QPID_ERROR(INTERNAL_ERROR, "Invalid variant value.");
+ throw Exception(QPID_MSG("Invalid variant value."));
}
R operator()(const boost::blank&) const { return foundBlank(); }
R operator()(boost::blank&) const { return foundBlank(); }
diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp
index 6e8f3a59cc..c483ed2379 100644
--- a/cpp/src/qpid/log/Logger.cpp
+++ b/cpp/src/qpid/log/Logger.cpp
@@ -190,12 +190,6 @@ void Logger::add(Statement& s) {
statements.insert(&s);
}
-void Logger::remove(Statement& s) {
- ScopedLock l(lock);
- s.enabled = false;
- statements.erase(&s);
-}
-
void Logger::configure(const Options& opts, const std::string& prog)
{
clear();
diff --git a/cpp/src/qpid/log/Logger.h b/cpp/src/qpid/log/Logger.h
index a2103f5ec6..7851c65406 100644
--- a/cpp/src/qpid/log/Logger.h
+++ b/cpp/src/qpid/log/Logger.h
@@ -67,9 +67,6 @@ class Logger : private boost::noncopyable {
/** Add a statement. */
void add(Statement& s);
- /** Remove a statement */
- void remove(Statement& s);
-
/** Log a message. */
void log(const Statement&, const std::string&);
@@ -93,7 +90,7 @@ class Logger : private boost::noncopyable {
/** Add an output destination for messages */
void output(std::auto_ptr<Output> out);
- /** Reset the logger to it's original state */
+ /** Reset the logger to it's original state. */
void clear();
private:
diff --git a/cpp/src/qpid/log/Statement.cpp b/cpp/src/qpid/log/Statement.cpp
index 9ab314b81c..de130bc455 100644
--- a/cpp/src/qpid/log/Statement.cpp
+++ b/cpp/src/qpid/log/Statement.cpp
@@ -32,10 +32,6 @@ Statement::Initializer::Initializer(Statement& s) : statement(s) {
Logger::instance().add(s);
}
-Statement::Initializer::~Initializer() {
- Logger::instance().remove(statement);
-}
-
namespace {
const char* names[LevelTraits::COUNT] = {
"trace", "debug", "info", "notice", "warning", "error", "critical"
diff --git a/cpp/src/qpid/log/Statement.h b/cpp/src/qpid/log/Statement.h
index 4eb4d1e7d8..18162971b0 100644
--- a/cpp/src/qpid/log/Statement.h
+++ b/cpp/src/qpid/log/Statement.h
@@ -19,8 +19,9 @@
*
*/
+#include "qpid/Msg.h"
+
#include <boost/current_function.hpp>
-#include <sstream>
namespace qpid {
namespace log {
@@ -69,23 +70,14 @@ struct Statement {
struct Initializer {
Initializer(Statement& s);
- ~Initializer();
Statement& statement;
};
};
-///@internal trickery to make QPID_LOG_STRINGSTREAM work.
-inline std::ostream& noop(std::ostream& s) { return s; }
-
///@internal static initializer for a Statement.
#define QPID_LOG_STATEMENT_INIT(level) \
{ 0, __FILE__, __LINE__, BOOST_CURRENT_FUNCTION, (::qpid::log::level) }
-///@internal Stream streamable message and return a string.
-#define QPID_LOG_STRINGSTREAM(message) \
- static_cast<std::ostringstream&>( \
- std::ostringstream() << qpid::log::noop << message).str()
-
/**
* Macro for log statements. Example of use:
* @code
@@ -110,19 +102,9 @@ inline std::ostream& noop(std::ostream& s) { return s; }
static ::qpid::log::Statement stmt_= QPID_LOG_STATEMENT_INIT(level); \
static ::qpid::log::Statement::Initializer init_(stmt_); \
if (stmt_.enabled) \
- stmt_.log(QPID_LOG_STRINGSTREAM(message)); \
+ stmt_.log(::qpid::Msg() << message); \
} while(0)
-/**
- * Macro for complicated logging logic that can't fit in a simple QPID_LOG
- * statement. For example:
- * @code
- * QPID_IF_LOG(debug) {
- * message = do_complicated_stuff;
- * QPID_LOG(debug, message);
- * }
- */
-#define QPID_IF_LOG(level)
}} // namespace qpid::log
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 733a892cff..eccfb1465e 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -29,6 +29,7 @@
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
@@ -301,7 +302,7 @@ void AsynchIOHandler::idle(AsynchIO&){
}
// If frame was egregiously large complain
if (frameSize > buff->byteCount)
- THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+ throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
buff->dataCount = buffUsed;
aio->queueWrite(buff);
diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h
index 917afc5704..cf8199954e 100644
--- a/cpp/src/qpid/sys/ConcurrentQueue.h
+++ b/cpp/src/qpid/sys/ConcurrentQueue.h
@@ -22,7 +22,7 @@
*
*/
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Waitable.h"
#include "qpid/sys/ScopedIncrement.h"
#include <boost/bind.hpp>
@@ -39,73 +39,73 @@ namespace sys {
*
* Also allows consuming threads to wait until an item is available.
*/
-template <class T> class ConcurrentQueue {
+template <class T> class ConcurrentQueue : public Waitable {
public:
- ConcurrentQueue() : waiters(0), shutdown(false) {}
+ struct ShutdownException {};
+
+ ConcurrentQueue() : shutdownFlag(false) {}
- /** Threads in wait() are woken with ShutdownException before
- * destroying the queue.
- */
- ~ConcurrentQueue() {
- Mutex::ScopedLock l(lock);
- shutdown = true;
- lock.notifyAll();
- while (waiters > 0)
- lock.wait();
+ /** Waiting threads are notified by ~Waitable */
+ ~ConcurrentQueue() { shutdown(); }
+
+ bool shutdown(bool wait=true) {
+ ScopedLock l(lock);
+ if (!shutdownFlag) {
+ shutdownFlag=true;
+ lock.notifyAll();
+ if (wait) lock.waitAll();
+ shutdownFlag=true;
+ return true;
+ }
+ return false;
}
-
+
/** Push a data item onto the back of the queue */
void push(const T& data) {
Mutex::ScopedLock l(lock);
queue.push_back(data);
+ lock.notify();
}
/** If the queue is non-empty, pop the front item into data and
* return true. If the queue is empty, return false
*/
- bool pop(T& data) {
+ bool tryPop(T& data) {
Mutex::ScopedLock l(lock);
- return popInternal(data);
+ if (shutdownFlag || queue.empty())
+ return false;
+ data = queue.front();
+ queue.pop_front();
+ return true;
}
- /** Wait up to deadline for a data item to be available.
- *@return true if data was available, false if timed out.
+ /** Wait up to a timeout for a data item to be available.
+ *@return true if data was available, false if timed out or shut down.
*@throws ShutdownException if the queue is destroyed.
*/
- bool waitPop(T& data, Duration timeout) {
- Mutex::ScopedLock l(lock);
- ScopedIncrement<size_t> w(
- waiters, boost::bind(&ConcurrentQueue::noWaiters, this));
+ bool waitPop(T& data, Duration timeout=TIME_INFINITE) {
+ ScopedLock l(lock);
AbsTime deadline(now(), timeout);
- while (queue.empty() && lock.wait(deadline))
- ;
- return popInternal(data);
- }
-
- private:
-
- bool popInternal(T& data) {
- if (shutdown)
- throw ShutdownException();
+ {
+ ScopedWait(*this);
+ while (!shutdownFlag && queue.empty())
+ if (!lock.wait(deadline))
+ return false;
+ }
if (queue.empty())
return false;
- else {
- data = queue.front();
- queue.pop_front();
- return true;
- }
+ data = queue.front();
+ queue.pop_front();
+ return true;
}
+
+ bool isShutdown() { ScopedLock l(lock); return shutdownFlag; }
- void noWaiters() {
- assert(waiters == 0);
- if (shutdown)
- lock.notify(); // Notify dtor thread.
- }
-
- Monitor lock;
+ protected:
+ Waitable lock;
+ private:
std::deque<T> queue;
- size_t waiters;
- bool shutdown;
+ bool shutdownFlag;
};
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/ScopedIncrement.h b/cpp/src/qpid/sys/ScopedIncrement.h
index ba9e89ba5f..8645ab2484 100644
--- a/cpp/src/qpid/sys/ScopedIncrement.h
+++ b/cpp/src/qpid/sys/ScopedIncrement.h
@@ -30,17 +30,17 @@ namespace sys {
* Optionally call a function if the decremented counter value is 0.
* Note the function must not throw, it is called in the destructor.
*/
-template <class T>
+template <class T, class F=boost::function<void()> >
class ScopedIncrement : boost::noncopyable
{
public:
- ScopedIncrement(T& c, boost::function0<void> f=0)
+ ScopedIncrement(T& c, F f=0)
: count(c), callback(f) { ++count; }
~ScopedIncrement() { if (--count == 0 && callback) callback(); }
private:
T& count;
- boost::function0<void> callback;
+ F callback;
};
diff --git a/cpp/src/qpid/sys/Serializer.h b/cpp/src/qpid/sys/Serializer.h
index 085d51d7e2..7bb3b07ae0 100644
--- a/cpp/src/qpid/sys/Serializer.h
+++ b/cpp/src/qpid/sys/Serializer.h
@@ -23,7 +23,7 @@
*
*/
-
+#include "qpid/Exception.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
@@ -41,6 +41,8 @@ class SerializerBase : private boost::noncopyable, private Runnable
{
public:
typedef boost::function<void()> VoidFn0;
+ struct ShutdownException : public Exception {};
+
/** @see Serializer::Serializer */
SerializerBase(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0());
diff --git a/cpp/src/qpid/sys/StateMonitor.h b/cpp/src/qpid/sys/StateMonitor.h
new file mode 100644
index 0000000000..5a92756f3a
--- /dev/null
+++ b/cpp/src/qpid/sys/StateMonitor.h
@@ -0,0 +1,78 @@
+#ifndef QPID_SYS_STATEMONITOR_H
+#define QPID_SYS_STATEMONITOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/Waitable.h"
+
+#include <bitset>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A monitor with an enum state value.
+ *
+ *@param Enum: enum type to use for states.
+ *@param EnumMax: Highest enum value.
+ */
+template <class Enum, size_t MaxEnum>
+class StateMonitor : public Waitable
+{
+ public:
+ struct Set : public std::bitset<MaxEnum + 1> {
+ Set() {}
+ Set(Enum s) { set(s); }
+ Set(Enum s, Enum t) { set(s).set(t); }
+ Set(Enum s, Enum t, Enum u) { set(s).set(t).set(u); }
+ Set(Enum s, Enum t, Enum u, Enum v) { set(s).set(t).set(u).set(v); }
+ };
+
+
+ StateMonitor(Enum initial) { state=initial; }
+
+ /** @pre Caller holds a ScopedLock. */
+ void set(Enum s) { state=s; notifyAll(); }
+ /** @pre Caller holds a ScopedLock. */
+ StateMonitor& operator=(Enum s) { set(s); return *this; }
+
+ /** @pre Caller holds a ScopedLock. */
+ Enum get() const { return state; }
+ /** @pre Caller holds a ScopedLock. */
+ operator Enum() const { return state; }
+
+ /** @pre Caller holds a ScopedLock */
+ void waitFor(Enum s) { ScopedWait(*this); while (s != state) wait(); }
+ /** @pre Caller holds a ScopedLock */
+ void waitFor(Set s) { ScopedWait(*this); while (!s.test(state)) wait(); }
+ /** @pre Caller holds a ScopedLock */
+ void waitNot(Enum s) { ScopedWait(*this); while (s == state) wait(); }
+ /** @pre Caller holds a ScopedLock */
+ void waitNot(Set s) { ScopedWait(*this); while (s.test(state)) wait(); }
+
+ private:
+ Enum state;
+};
+
+}}
+
+
+#endif /*!QPID_SYS_STATEMONITOR_H*/
diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h
new file mode 100644
index 0000000000..eb71a1d742
--- /dev/null
+++ b/cpp/src/qpid/sys/Waitable.h
@@ -0,0 +1,73 @@
+#ifndef QPID_SYS_WAITABLE_H
+#define QPID_SYS_WAITABLE_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Monitor.h"
+
+#include <assert.h>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A monitor that keeps track of waiting threads.
+ * Threads that use a WaitLock are counted as waiters, threads that
+ * use a normal ScopedLock are not considered waiters.
+ */
+class Waitable : public Monitor {
+ public:
+ Waitable() : waiters(0) {}
+
+ /** Use this inside a scoped lock around the
+ * call to Monitor::wait to be counted as a waiter
+ */
+ struct ScopedWait {
+ Waitable& w;
+ ScopedWait(Waitable& w_) : w(w_) { ++w.waiters; }
+ ~ScopedWait() { --w.waiters; w.notifyAll(); }
+ };
+
+ /** Block till all waiters have finished waiting.
+ * The calling thread does not count as a waiter.
+ *@pre Must be called inside a ScopedLock but NOT a ScopedWait.
+ */
+ bool waitAll(Duration timeout=TIME_INFINITE) {
+ AbsTime deadline(now(), timeout);
+ while (waiters > 0) {
+ if (!wait(deadline)) {
+ assert(timeout != TIME_INFINITE);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private:
+ friend struct ScopedWait;
+ size_t waiters;
+};
+
+}} // namespace qpid::sys
+
+
+
+#endif /*!QPID_SYS_WAITABLE_H*/
diff --git a/cpp/src/qpid/sys/apr/APRBase.cpp b/cpp/src/qpid/sys/apr/APRBase.cpp
index f527e0d0b2..724c489303 100644
--- a/cpp/src/qpid/sys/apr/APRBase.cpp
+++ b/cpp/src/qpid/sys/apr/APRBase.cpp
@@ -20,7 +20,6 @@
*/
#include <iostream>
#include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
#include "APRBase.h"
using namespace qpid::sys;
diff --git a/cpp/src/qpid/sys/apr/APRBase.h b/cpp/src/qpid/sys/apr/APRBase.h
index c6b1854fb1..7b5644a129 100644
--- a/cpp/src/qpid/sys/apr/APRBase.h
+++ b/cpp/src/qpid/sys/apr/APRBase.h
@@ -24,7 +24,6 @@
#include <string>
#include <apr_thread_mutex.h>
#include <apr_errno.h>
-#include "qpid/QpidError.h"
namespace qpid {
namespace sys {
@@ -64,11 +63,8 @@ namespace sys {
// Inlined as it is called *a lot*
void inline qpid::sys::check(apr_status_t status, const char* file, const int line){
if (status != APR_SUCCESS){
- const int size = 50;
- char tmp[size];
- std::string msg(apr_strerror(status, tmp, size));
- throw qpid::QpidError(APR_ERROR + ((int) status), msg,
- qpid::SrcLine(file, line));
+ char tmp[256];
+ throw Exception(QPID_MSG(apr_strerror(status, tmp, size)))
}
}
diff --git a/cpp/src/qpid/sys/posix/Shlib.cpp b/cpp/src/qpid/sys/posix/Shlib.cpp
index 2630337408..1552aa06b5 100644
--- a/cpp/src/qpid/sys/posix/Shlib.cpp
+++ b/cpp/src/qpid/sys/posix/Shlib.cpp
@@ -19,8 +19,7 @@
*/
#include "qpid/sys/Shlib.h"
-
-#include <qpid/QpidError.h>
+#include "qpid/Exception.h"
#include <dlfcn.h>
@@ -32,7 +31,7 @@ void Shlib::load(const char* name) {
handle = ::dlopen(name, RTLD_NOW);
const char* error = ::dlerror();
if (error) {
- THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ throw Exception(QPID_MSG(error));
}
}
@@ -42,7 +41,7 @@ void Shlib::unload() {
::dlclose(handle);
const char* error = ::dlerror();
if (error) {
- THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ throw Exception(QPID_MSG(error));
}
handle = 0;
}
@@ -53,7 +52,7 @@ void* Shlib::getSymbol(const char* name) {
void* sym = ::dlsym(handle, name);
const char* error = ::dlerror();
if (error)
- THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ throw Exception(QPID_MSG(error));
return sym;
}
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index c7a83df581..f0cc8cd5a5 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -21,7 +21,6 @@
#include "qpid/sys/Socket.h"
-#include "qpid/QpidError.h"
#include "check.h"
#include "PrivatePosix.h"
@@ -60,8 +59,7 @@ std::string SocketPrivate::getName(bool local, bool includeService) const
result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
}
- if (result < 0)
- throw QPID_POSIX_ERROR(errno);
+ QPID_POSIX_CHECK(result);
char servName[NI_MAXSERV];
char dispName[NI_MAXHOST];
diff --git a/cpp/src/qpid/sys/posix/check.cpp b/cpp/src/qpid/sys/posix/check.cpp
deleted file mode 100644
index 408679caa8..0000000000
--- a/cpp/src/qpid/sys/posix/check.cpp
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <cerrno>
-#include "check.h"
-
-namespace qpid {
-namespace sys {
-
-std::string
-PosixError::getMessage(int errNo)
-{
- char buf[512];
- return std::string(strerror_r(errNo, buf, sizeof(buf)));
-}
-
-PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw()
- : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc)
-{ }
-
-}}
diff --git a/cpp/src/qpid/sys/posix/check.h b/cpp/src/qpid/sys/posix/check.h
index 7fa7b69d3b..f864bf8762 100644
--- a/cpp/src/qpid/sys/posix/check.h
+++ b/cpp/src/qpid/sys/posix/check.h
@@ -22,41 +22,13 @@
*
*/
-#include <cerrno>
-#include <string>
-#include "qpid/QpidError.h"
-
-namespace qpid {
-namespace sys {
-
-/**
- * Exception with message from errno.
- */
-class PosixError : public qpid::QpidError
-{
- public:
- static std::string getMessage(int errNo);
-
- PosixError(int errNo, const qpid::SrcLine& location) throw();
-
- ~PosixError() throw() {}
-
- int getErrNo() { return errNo; }
+#include "qpid/Exception.h"
- Exception::auto_ptr clone() const throw() { return Exception::auto_ptr(new PosixError(*this)); }
-
- void throwSelf() const { throw *this; }
-
- private:
- int errNo;
-};
-
-}}
+#include <cerrno>
-/** Create a PosixError for the current file/line and errno. */
-#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE)
+#define QPID_POSIX_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::strError(ERRNO)) << " " << ERRNO)
-/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */
+/** THROW QPID_POSIX_ERROR(errno) if RESULT is less than zero */
#define QPID_POSIX_CHECK(RESULT) \
if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno))
diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp
index 9a982508d1..454b9ca56d 100644
--- a/cpp/src/tests/ClientChannelTest.cpp
+++ b/cpp/src/tests/ClientChannelTest.cpp
@@ -56,7 +56,7 @@ class ChannelTestBase : public CppUnit::TestCase
}
};
- InProcessBrokerClient connection; // client::connection + local broker
+ qpid::InProcessBrokerClient connection;
const std::string qname;
const std::string data;
Queue queue;
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 2495a06fa4..db2cd62b0a 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -18,15 +18,21 @@
* under the License.
*
*/
-#include <list>
#include "qpid_test_plugin.h"
#include "InProcessBroker.h"
#include "qpid/client/Dispatcher.h"
#include "qpid/client/Session.h"
#include "qpid/framing/TransferContent.h"
+#include "qpid/framing/reply_exceptions.h"
+
+#include <boost/optional.hpp>
+
+#include <list>
using namespace qpid::client;
using namespace qpid::framing;
+using namespace qpid;
+using namespace boost;
struct DummyListener : public MessageListener
{
@@ -60,58 +66,77 @@ class ClientSessionTest : public CppUnit::TestCase
CPPUNIT_TEST(testQueueQuery);
CPPUNIT_TEST(testTransfer);
CPPUNIT_TEST(testDispatcher);
+ CPPUNIT_TEST(testResumeExpiredError);
+ CPPUNIT_TEST(testUseSuspendedError);
CPPUNIT_TEST(testSuspendResume);
- CPPUNIT_TEST(testSuspendResumeErrors);
+ CPPUNIT_TEST(testDisconnectResume);
+ CPPUNIT_TEST(testAutoDelete);
CPPUNIT_TEST_SUITE_END();
- boost::shared_ptr<Connector> broker;
- Connection connection;
+ shared_ptr<broker::Broker> broker;
Session session;
+ // Defer construction & thread creation to setUp
+ boost::optional<InProcessConnection> c;
+ boost::optional<InProcessConnection> c2;
public:
- ClientSessionTest() : broker(new qpid::broker::InProcessBroker()), connection(broker)
+ void setUp() {
+ broker = broker::Broker::create();
+ c=boost::in_place<InProcessConnection>(broker);
+ c2=boost::in_place<InProcessConnection>(broker);
+ }
+
+ void tearDown() {
+ c2.reset();
+ c.reset();
+ broker.reset();
+ }
+
+ void declareSubscribe(const std::string& q="my-queue",
+ const std::string& dest="my-dest")
{
- connection.open("");
- session = connection.newSession();
+ // FIXME aconway 2007-10-18: autoDelete queues are destroyed on channel close, not session.
+ // Fix & make all test queues exclusive, autoDelete
+ session.queueDeclare_(queue=q); // FIXME aconway 2007-10-01: exclusive=true, autoDelete=true);
+ session.messageSubscribe_(queue=q, destination=dest, acquireMode=1);
+ session.messageFlow_(destination=dest, unit=0, value=0xFFFFFFFF);//messages
+ session.messageFlow_(destination=dest, unit=1, value=0xFFFFFFFF);//bytes
}
+ bool queueExists(const std::string& q) {
+ TypedResult<QueueQueryResult> result = session.queueQuery_(q);
+ return result.get().getQueue() == q;
+ }
+
void testQueueQuery()
{
- std::string name("my-queue");
- std::string alternate("amq.fanout");
- session.queueDeclare((queue=name, alternateExchange=alternate, exclusive=true, autoDelete=true));
- TypedResult<QueueQueryResult> result = session.queueQuery(name);
+ session = c->newSession();
+ session.queueDeclare_(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true);
+ TypedResult<QueueQueryResult> result = session.queueQuery_(std::string("my-queue"));
CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable());
CPPUNIT_ASSERT_EQUAL(true, result.get().getExclusive());
- CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange());
+ CPPUNIT_ASSERT_EQUAL(std::string("amq.fanout"),
+ result.get().getAlternateExchange());
}
void testTransfer()
{
- std::string queueName("my-queue");
- std::string dest("my-dest");
- std::string data("my message");
- session.queueDeclare_(queue=queueName, exclusive=true, autoDelete=true);
- //subcribe to the queue with confirm_mode = 1:
- session.messageSubscribe_(queue=queueName, destination=dest, acquireMode=1);
- session.messageFlow((destination=dest, unit=0, value=1));//messages
- session.messageFlow((destination=dest, unit=1, value=0xFFFFFFFF));//bytes
- //publish a message:
- TransferContent _content(data);
- _content.getDeliveryProperties().setRoutingKey("my-queue");
- session.messageTransfer_(content=_content);
+ session = c->newSession();
+ declareSubscribe();
+ session.messageTransfer_(content=TransferContent("my-message", "my-queue"));
//get & test the message:
FrameSet::shared_ptr msg = session.get();
CPPUNIT_ASSERT(msg->isA<MessageTransferBody>());
- CPPUNIT_ASSERT_EQUAL(data, msg->getContent());
+ CPPUNIT_ASSERT_EQUAL(std::string("my-message"), msg->getContent());
//confirm receipt:
session.execution().completed(msg->getId(), true, true);
}
void testDispatcher()
{
- session.queueDeclare_(queue="my-queue", exclusive=true, autoDelete=true);
+ session = c->newSession();
+ declareSubscribe();
TransferContent msg1("One");
msg1.getDeliveryProperties().setRoutingKey("my-queue");
@@ -125,9 +150,6 @@ public:
msg3.getDeliveryProperties().setRoutingKey("my-queue");
session.messageTransfer_(content=msg3);
- session.messageSubscribe_(queue="my-queue", destination="my-dest", acquireMode=1);
- session.messageFlow((destination="my-dest", unit=0, value=1));//messages
- session.messageFlow((destination="my-dest", unit=1, value=0xFFFFFFFF));//bytes
DummyListener listener(session, "my-dest", 3);
listener.listen();
CPPUNIT_ASSERT_EQUAL((size_t) 3, listener.messages.size());
@@ -140,29 +162,66 @@ public:
}
- void testSuspendResume() {
- session = connection.newSession(60);
+ void testResumeExpiredError() {
+ session = c->newSession(0);
+ session.suspend(); // session has 0 timeout.
+ try {
+ c->resume(session);
+ CPPUNIT_FAIL("Expected InvalidArgumentException.");
+ } catch(const InvalidArgumentException&) {}
+ }
+
+ void testUseSuspendedError() {
+ session = c->newSession(60);
session.suspend();
try {
session.exchangeQuery_(name="amq.fanout");
CPPUNIT_FAIL("Expected session suspended exception");
- } catch(...) {}
- connection.resume(session);
- session.exchangeQuery_(name="amq.fanout");
- // FIXME aconway 2007-09-25: build up session state and confirm
- //it survives the resume
+ } catch(const CommandInvalidException&) {}
}
- void testSuspendResumeErrors() {
- session.suspend(); // session has 0 timeout.
- try {
- session.exchangeQuery_(name="amq.fanout");
- CPPUNIT_FAIL("Expected suspended session exception");
- } catch(...) {}
- try {
- connection.resume(session);
- CPPUNIT_FAIL("Expected no such session exception.");
- } catch(...) {}
+ void testSuspendResume() {
+ session = c->newSession(60);
+ declareSubscribe();
+ session.suspend();
+ // Make sure we are still subscribed after resume.
+ c->resume(session);
+ session.messageTransfer_(content=TransferContent("my-message", "my-queue"));
+ FrameSet::shared_ptr msg = session.get();
+ CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent());
+ }
+
+ void testDisconnectResume() {
+ session = c->newSession(60);
+ session.queueDeclare_(queue="before");
+ CPPUNIT_ASSERT(queueExists("before"));
+ // Simulate lost frames.
+ c->discard();
+ session.queueDeclare_(queue=string("after"));
+ c->disconnect(); // Simulate disconnect, resume on a new connection.
+ c2->resume(session);
+ CPPUNIT_ASSERT(queueExists("after"));
+ }
+
+ void testAutoDelete() {
+ // Verify that autoDelete queues survive suspend/resume.
+ session = c->newSession(60);
+ session.queueDeclare_(queue="my-queue", exclusive=true, autoDelete=true);
+ CPPUNIT_ASSERT(queueExists("my-queue"));
+ session.suspend();
+ c->resume(session);
+ CPPUNIT_ASSERT(queueExists("my-queue"));
+
+ // Verify they survive disconnect/resume on new Connection
+ c->disconnect();
+ c2->resume(session);
+
+ try {
+ // FIXME aconway 2007-10-23: Negative test, need to
+ // fix auto-delete queues to clean up with session, not channel.
+ CPPUNIT_ASSERT(queueExists("my-queue"));
+ CPPUNIT_FAIL("Negative test passed unexpectedly");
+ } catch(const ChannelException&) {}
}
};
diff --git a/cpp/src/tests/ConcurrentQueue.cpp b/cpp/src/tests/ConcurrentQueue.cpp
index e1adcce0f9..39155b4ff2 100644
--- a/cpp/src/tests/ConcurrentQueue.cpp
+++ b/cpp/src/tests/ConcurrentQueue.cpp
@@ -61,7 +61,7 @@ template <class T> class DualVectorDualLockQueue {
/** If the queue is non-empty, pop the front item into data and
* return true. If the queue is empty, return false
*/
- bool pop(T& data) {
+ bool tryPop(T& data) {
Mutex::ScopedLock l(popLock);
if (popIter == popVec.end()) {
popVec.clear();
@@ -109,7 +109,7 @@ void nspin(const Duration& delay) {
struct NullQueue {
NullQueue(int items=0) : npush(items), npop(items) {}
void push(int) { --npush; }
- bool pop(int& n) {
+ bool tryPop(int& n) {
if (npop == 0)
return false;
else {
@@ -144,7 +144,7 @@ struct Popper : public Runnable {
void run() {
for (int i=items; i > 0; i--) {
int n;
- if (queue.pop(n))
+ if (queue.tryPop(n))
BOOST_REQUIRE_EQUAL(i,n);
npause();
}
diff --git a/cpp/src/tests/EventChannelTest.cpp b/cpp/src/tests/EventChannelTest.cpp
index 3ba54def86..6d8d64e165 100644
--- a/cpp/src/tests/EventChannelTest.cpp
+++ b/cpp/src/tests/EventChannelTest.cpp
@@ -117,18 +117,18 @@ class EventChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT(re.hasError());
try {
re.throwIfError();
- CPPUNIT_FAIL("Expected QpidError.");
+ CPPUNIT_FAIL("Expected Exception.");
}
- catch (const qpid::QpidError&) { }
+ catch (const qpid::Exception&) { }
// Bad file descriptor. Note in this case we fail
// in postEvent and throw immediately.
try {
ReadEvent bad;
ec->postEvent(bad);
- CPPUNIT_FAIL("Expected QpidError.");
+ CPPUNIT_FAIL("Expected Exception.");
}
- catch (const qpid::QpidError&) { }
+ catch (const qpid::Exception&) { }
}
void testWrite() {
diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp
index 5ca4e6c216..9e82447ffa 100644
--- a/cpp/src/tests/FramingTest.cpp
+++ b/cpp/src/tests/FramingTest.cpp
@@ -18,8 +18,6 @@
* under the License.
*
*/
-#include "InProcessBroker.h"
-#include "qpid/QpidError.h"
#include "qpid/client/Exchange.h"
#include "qpid/client/Queue.h"
#include "qpid/client/Connection.h"
@@ -30,6 +28,7 @@
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid_test_plugin.h"
#include <boost/bind.hpp>
@@ -200,18 +199,12 @@ class FramingTest : public CppUnit::TestCase
try {
Content content(REFERENCE, "");
CPPUNIT_ASSERT(false);//fail, expected exception
- } catch (QpidError& e) {
- CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code);
- CPPUNIT_ASSERT_EQUAL(string("Reference cannot be empty"), e.msg);
- }
+ } catch (const InvalidArgumentException& e) {}
try {
Content content(2, "Blah");
CPPUNIT_ASSERT(false);//fail, expected exception
- } catch (QpidError& e) {
- CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code);
- CPPUNIT_ASSERT_EQUAL(string("Invalid discriminator: 2"), e.msg);
- }
+ } catch (const SyntaxErrorException& e) {}
try {
Buffer wbuff(buffer, sizeof(buffer));
@@ -221,11 +214,8 @@ class FramingTest : public CppUnit::TestCase
Buffer rbuff(buffer, sizeof(buffer));
Content content;
content.decode(rbuff);
- CPPUNIT_ASSERT(false);//fail, expected exception
- } catch (QpidError& e) {
- CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code);
- CPPUNIT_ASSERT_EQUAL(string("Invalid discriminator: 2"), e.msg);
- }
+ CPPUNIT_FAIL("Expected exception");
+ } catch (Exception& e) {}
}
diff --git a/cpp/src/tests/HeadersExchangeTest.cpp b/cpp/src/tests/HeadersExchangeTest.cpp
index c47266caa6..f07f238ee4 100644
--- a/cpp/src/tests/HeadersExchangeTest.cpp
+++ b/cpp/src/tests/HeadersExchangeTest.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "qpid/QpidError.h"
+#include "qpid/Exception.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
@@ -118,7 +118,7 @@ class HeadersExchangeTest : public CppUnit::TestCase
try {
//just checking this doesn't cause assertion etc
exchange.bind(queue, key, &args);
- } catch(qpid::QpidError&) {
+ } catch(qpid::Exception&) {
//expected
}
}
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h
index 2a9f12771b..c5860568db 100644
--- a/cpp/src/tests/InProcessBroker.h
+++ b/cpp/src/tests/InProcessBroker.h
@@ -25,6 +25,9 @@
#include "qpid/client/Connector.h"
#include "qpid/client/Connection.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/ConcurrentQueue.h"
+#include "qpid/shared_ptr.h"
#include <vector>
#include <iostream>
@@ -32,112 +35,176 @@
namespace qpid {
-namespace broker {
+
/**
- * A broker that implements client::Connector allowing direct
- * in-process connection of client to broker. Used to write round-trip
- * tests without requiring an external broker process.
- *
+ * A client::Connector that connects directly to an in-process broker.
* Also allows you to "snoop" on frames exchanged between client & broker.
*
* see FramingTest::testRequestResponseRoundtrip() for example of use.
*/
-class InProcessBroker : public client::Connector {
+class InProcessConnector :
+ public client::Connector
+{
public:
+ typedef sys::Mutex Mutex;
+ typedef Mutex::ScopedLock Lock;
+ typedef framing::FrameHandler FrameHandler;
+ typedef framing::AMQFrame AMQFrame;
+
enum Sender {CLIENT,BROKER};
- /** A frame tagged with the sender */
- struct TaggedFrame {
- TaggedFrame(Sender e, framing::AMQFrame& f) : frame(f), sender(e) {}
- bool fromBroker() const { return sender == BROKER; }
- bool fromClient() const { return sender == CLIENT; }
+ /** Simulate the network thread of a peer with a queue and a thread.
+ * With setInputHandler(0) drops frames simulating network packet loss.
+ */
+ class NetworkQueue : public sys::Runnable
+ {
+ public:
+ NetworkQueue(const char* r) : inputHandler(0), receiver(r) {
+ thread=sys::Thread(this);
+ }
- template <class MethodType>
- MethodType* asMethod() {
- return dynamic_cast<MethodType*>(frame.getBody());
+ ~NetworkQueue() {
+ queue.shutdown();
+ thread.join();
}
- framing::AMQFrame frame;
- Sender sender;
+
+ void push(AMQFrame& f) { queue.push(f); }
+
+ void run() {
+ AMQFrame f;
+ while (queue.waitPop(f)) {
+ Lock l(lock);
+ if (inputHandler) {
+ QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f));
+ inputHandler->handle(f);
+ }
+ else {
+ QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f));
+ }
+ }
+ }
+
+ void setInputHandler(FrameHandler* h) {
+ Lock l(lock);
+ inputHandler = h;
+ }
+
+ private:
+ sys::Mutex lock;
+ sys::ConcurrentQueue<AMQFrame> queue;
+ sys::Thread thread;
+ FrameHandler* inputHandler;
+ const char* const receiver;
};
-
- typedef std::vector<TaggedFrame> Conversation;
-
- InProcessBroker(framing::ProtocolVersion ver=
- framing::highestProtocolVersion
- ) :
- Connector(ver),
- protocolInit(ver),
- broker(broker::Broker::create()),
- brokerOut(BROKER, conversation),
+
+ struct InProcessHandler : public sys::ConnectionOutputHandler {
+ Sender from;
+ NetworkQueue queue;
+ const char* const sender;
+
+ InProcessHandler(Sender s)
+ : from(s),
+ queue(from==CLIENT? "BROKER" : "CLIENT"),
+ sender(from==BROKER? "BROKER" : "CLIENT")
+ {}
+
+ ~InProcessHandler() { }
+
+ void send(AMQFrame& f) {
+ QPID_LOG(debug, QPID_MSG(sender << " SENT: " << f));
+ queue.push(f);
+ }
+
+ void close() {
+ // Do not shut down the queue here, we may be in
+ // the queue's dispatch thread.
+ }
+ };
+
+ InProcessConnector(shared_ptr<broker::Broker> b,
+ framing::ProtocolVersion v=framing::ProtocolVersion()) :
+ Connector(v),
+ protocolInit(v),
+ broker(b),
+ brokerOut(BROKER),
brokerConnection(&brokerOut, *broker),
- clientOut(CLIENT, conversation, &brokerConnection)
- {}
+ clientOut(CLIENT),
+ isClosed(false)
+ {
+ clientOut.queue.setInputHandler(&brokerConnection);
+ }
- ~InProcessBroker() { broker->shutdown(); }
+ ~InProcessConnector() {
+ close();
+
+ }
void connect(const std::string& /*host*/, int /*port*/) {}
+
void init() { brokerConnection.initiated(protocolInit); }
- void close() {}
+
+ void close() {
+ if (!isClosed) {
+ isClosed = true;
+ brokerOut.close();
+ clientOut.close();
+ brokerConnection.closed();
+ }
+ }
/** Client's input handler. */
void setInputHandler(framing::InputHandler* handler) {
- brokerOut.in = handler;
+ brokerOut.queue.setInputHandler(handler);
}
/** Called by client to send a frame */
void send(framing::AMQFrame& frame) {
- clientOut.send(frame);
+ clientOut.handle(frame);
}
- /** Entire client-broker conversation is recorded here */
- Conversation conversation;
+ /** Sliently discard frames sent by either party, lost network traffic. */
+ void discard() {
+ brokerOut.queue.setInputHandler(0);
+ clientOut.queue.setInputHandler(0);
+ }
private:
- /** OutputHandler that forwards data to an InputHandler */
- struct OutputToInputHandler : public sys::ConnectionOutputHandler {
- OutputToInputHandler(
- Sender sender_, Conversation& conversation_,
- framing::InputHandler* ih=0
- ) : sender(sender_), conversation(conversation_), in(ih) {}
-
- void send(framing::AMQFrame& frame) {
- QPID_LOG(debug,
- (sender==CLIENT ? "CLIENT: " : "BROKER: ") << frame);
- conversation.push_back(TaggedFrame(sender, frame));
- in->received(frame);
- }
-
- void close() {}
-
- Sender sender;
- Conversation& conversation;
- framing::InputHandler* in;
- };
-
+ sys::Mutex lock;
framing::ProtocolInitiation protocolInit;
- shared_ptr<Broker> broker;
- OutputToInputHandler brokerOut;
+ shared_ptr<broker::Broker> broker;
+ InProcessHandler brokerOut;
broker::Connection brokerConnection;
- OutputToInputHandler clientOut;
+ InProcessHandler clientOut;
+ bool isClosed;
};
-std::ostream& operator<<(
- std::ostream& out, const InProcessBroker::TaggedFrame& tf)
-{
- return out << (tf.fromBroker()? "BROKER: ":"CLIENT: ") << tf.frame;
-}
-
-std::ostream& operator<<(
- std::ostream& out, const InProcessBroker::Conversation& conv)
-{
- copy(conv.begin(), conv.end(),
- std::ostream_iterator<InProcessBroker::TaggedFrame>(out, "\n"));
- return out;
-}
-
-} // namespace broker
-} // namespace qpid
+struct InProcessConnection : public client::Connection {
+ InProcessConnection(shared_ptr<broker::Broker> b)
+ : client::Connection(
+ shared_ptr<client::Connector>(
+ new InProcessConnector(b)))
+ {
+ open("");
+ }
+
+ ~InProcessConnection() { }
+
+ /** Simulate disconnected network connection. */
+ void disconnect() { impl->getConnector()->close(); }
+
+ /** Sliently discard frames sent by either party, lost network traffic. */
+ void discard() {
+ dynamic_pointer_cast<InProcessConnector>(
+ impl->getConnector())->discard();
+ }
+};
+/** A connector with its own broker */
+struct InProcessBroker : public InProcessConnector {
+ InProcessBroker() : InProcessConnector(broker::Broker::create()) {}
+};
+
+} // namespace qpid
#endif // _tests_InProcessBroker_h
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 233614367d..b954ae88f4 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -20,16 +20,10 @@ CLEANFILES=
# Unit test programs.
#
-# FIXME aconway 2007-08-29: enable when session is reinstated.
-# TESTS+=Session
-# check_PROGRAMS+=Session
-# Session_SOURCES=Session.cpp
-# Session_LDADD=-lboost_unit_test_framework $(lib_broker)
-
-TESTS+=ResumeHandler
-check_PROGRAMS+=ResumeHandler
-ResumeHandler_SOURCES=ResumeHandler.cpp
-ResumeHandler_LDADD=-lboost_unit_test_framework $(lib_common)
+TESTS+=SessionState
+check_PROGRAMS+=SessionState
+SessionState_SOURCES=SessionState.cpp
+SessionState_LDADD=-lboost_unit_test_framework $(lib_common)
TESTS+=Blob
check_PROGRAMS+=Blob
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 114e0045f5..3235fe2418 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -54,7 +54,7 @@ class FailOnDeliver : public Deliverable
public:
void deliverTo(Queue::shared_ptr& queue)
{
- throw Exception(boost::format("Invalid delivery to %1%") % queue->getName());
+ throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
}
};
diff --git a/cpp/src/tests/ResumeHandler.cpp b/cpp/src/tests/ResumeHandler.cpp
deleted file mode 100644
index 1073e42a3c..0000000000
--- a/cpp/src/tests/ResumeHandler.cpp
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/framing/ResumeHandler.h"
-
-#define BOOST_AUTO_TEST_MAIN
-#include <boost/test/auto_unit_test.hpp>
-
-#include <vector>
-
-using namespace std;
-using namespace qpid::framing;
-
-AMQFrame& frame(const char* s) {
- static AMQFrame frame;
- frame.setBody(AMQContentBody(s));
- return frame;
-}
-
-struct Collector : public FrameHandler, public vector<AMQFrame> {
- void handle(AMQFrame& f) { push_back(f); }
-};
-
-
-namespace qpid {
-namespace framing {
-
-bool operator==(const AMQFrame& a, const AMQFrame& b) {
- const AMQContentBody* ab=dynamic_cast<const AMQContentBody*>(a.getBody());
- const AMQContentBody* bb=dynamic_cast<const AMQContentBody*>(b.getBody());
- return ab && bb && ab->getData() == bb->getData();
-}
-
-}} // namespace qpid::framing
-
-
-BOOST_AUTO_TEST_CASE(testSend) {
- AMQFrame f;
- ResumeHandler sender;
- Collector collect;
- sender.out.next = &collect;
- sender.out(frame("a"));
- BOOST_CHECK_EQUAL(1u, collect.size());
- BOOST_CHECK_EQUAL(frame("a"), collect[0]);
- sender.out(frame("b"));
- sender.out(frame("c"));
- sender.ackReceived(1); // ack a,b.
- sender.out(frame("d"));
- BOOST_CHECK_EQUAL(4u, collect.size());
- BOOST_CHECK_EQUAL(frame("d"), collect.back());
- // Now try a resend.
- collect.clear();
- sender.resend();
- BOOST_REQUIRE_EQUAL(collect.size(), 2u);
- BOOST_CHECK_EQUAL(frame("c"), collect[0]);
- BOOST_CHECK_EQUAL(frame("d"), collect[1]);
-}
-
-
-BOOST_AUTO_TEST_CASE(testReceive) {
- ResumeHandler receiver;
- Collector collect;
- receiver.in.next = &collect;
- receiver.in(frame("a"));
- receiver.in(frame("b"));
- BOOST_CHECK_EQUAL(receiver.getLastReceived().getValue(), 1u);
- receiver.in(frame("c"));
- BOOST_CHECK_EQUAL(receiver.getLastReceived().getValue(), 2u);
- BOOST_CHECK_EQUAL(3u, collect.size());
- BOOST_CHECK_EQUAL(frame("a"), collect[0]);
- BOOST_CHECK_EQUAL(frame("c"), collect[2]);
-}
diff --git a/cpp/src/tests/SessionState.cpp b/cpp/src/tests/SessionState.cpp
new file mode 100644
index 0000000000..c8d912801e
--- /dev/null
+++ b/cpp/src/tests/SessionState.cpp
@@ -0,0 +1,142 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/framing/SessionState.h"
+
+#define BOOST_AUTO_TEST_MAIN
+#include <boost/test/auto_unit_test.hpp>
+#include <boost/bind.hpp>
+
+using namespace std;
+using namespace qpid::framing;
+using namespace boost;
+
+// Create a frame with a one-char string.
+AMQFrame& frame(char s) {
+ static AMQFrame frame;
+ frame.setBody(AMQContentBody(string(&s, 1)));
+ return frame;
+}
+
+// Extract the one-char string from a frame.
+char charFromFrame(const AMQFrame& f) {
+ const AMQContentBody* b=dynamic_cast<const AMQContentBody*>(f.getBody());
+ BOOST_REQUIRE(b && b->getData().size() > 0);
+ return b->getData()[0];
+}
+
+// Sent chars as frames
+void sent(SessionState& session, const std::string& frames) {
+ for_each(frames.begin(), frames.end(),
+ bind(&SessionState::sent, ref(session), bind(frame, _1)));
+}
+
+// Received chars as frames
+void received(SessionState& session, const std::string& frames) {
+ for_each(frames.begin(), frames.end(),
+ bind(&SessionState::received, session, bind(frame, _1)));
+}
+
+// Make a string from a ReplayRange.
+std::string replayChars(const SessionState::Replay& frames) {
+ string result(frames.size(), ' ');
+ transform(frames.begin(), frames.end(), result.begin(),
+ bind(&charFromFrame, _1));
+ return result;
+}
+
+namespace qpid {
+namespace framing {
+
+bool operator==(const AMQFrame& a, const AMQFrame& b) {
+ const AMQContentBody* ab=dynamic_cast<const AMQContentBody*>(a.getBody());
+ const AMQContentBody* bb=dynamic_cast<const AMQContentBody*>(b.getBody());
+ return ab && bb && ab->getData() == bb->getData();
+}
+
+}} // namespace qpid::framing
+
+
+BOOST_AUTO_TEST_CASE(testSent) {
+ // Test that we send solicit-ack at the right interval.
+ AMQContentBody f;
+ SessionState s1(1);
+ BOOST_CHECK(s1.sent(f));
+ BOOST_CHECK(s1.sent(f));
+ BOOST_CHECK(s1.sent(f));
+
+ SessionState s3(3);
+ BOOST_CHECK(!s3.sent(f));
+ BOOST_CHECK(!s3.sent(f));
+ BOOST_CHECK(s3.sent(f));
+
+ BOOST_CHECK(!s3.sent(f));
+ BOOST_CHECK(!s3.sent(f));
+ s3.receivedAck(4);
+ BOOST_CHECK(!s3.sent(f));
+ BOOST_CHECK(!s3.sent(f));
+ BOOST_CHECK(s3.sent(f));
+}
+
+BOOST_AUTO_TEST_CASE(testReplay) {
+ // Replay of all frames.
+ SessionState session(100);
+ sent(session, "abc");
+ session.suspend(); session.resuming();
+ session.receivedAck(-1);
+ BOOST_CHECK_EQUAL(replayChars(session.replay()), "abc");
+
+ // Replay with acks
+ session.receivedAck(0); // ack a.
+ session.suspend();
+ session.resuming();
+ session.receivedAck(1); // ack b.
+ BOOST_CHECK_EQUAL(replayChars(session.replay()), "c");
+
+ // Replay after further frames.
+ sent(session, "def");
+ session.suspend();
+ session.resuming();
+ session.receivedAck(3);
+ BOOST_CHECK_EQUAL(replayChars(session.replay()), "ef");
+
+ // Bad ack, too high
+ try {
+ session.receivedAck(6);
+ BOOST_FAIL("expected exception");
+ } catch(const qpid::Exception&) {}
+
+}
+
+BOOST_AUTO_TEST_CASE(testReceived) {
+ // Check that we request acks at the right interval.
+ AMQContentBody f;
+ SessionState s1(1);
+ BOOST_CHECK_EQUAL(0u, *s1.received(f));
+ BOOST_CHECK_EQUAL(1u, *s1.received(f));
+ BOOST_CHECK_EQUAL(2u, *s1.received(f));
+
+ SessionState s3(3);
+ BOOST_CHECK(!s3.received(f));
+ BOOST_CHECK(!s3.received(f));
+ BOOST_CHECK_EQUAL(2u, *s3.received(f));
+
+ BOOST_CHECK(!s3.received(f));
+ BOOST_CHECK(!s3.received(f));
+ BOOST_CHECK_EQUAL(5u, *s3.received(f));
+}
diff --git a/cpp/src/tests/Shlib.cpp b/cpp/src/tests/Shlib.cpp
index 87136425ab..6420af915e 100644
--- a/cpp/src/tests/Shlib.cpp
+++ b/cpp/src/tests/Shlib.cpp
@@ -20,6 +20,7 @@
#include "test_tools.h"
#include "qpid/sys/Shlib.h"
+#include "qpid/Exception.h"
#define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*>
#include <boost/test/auto_unit_test.hpp>
@@ -40,7 +41,7 @@ BOOST_AUTO_TEST_CASE(testShlib) {
sh.getSymbol("callMe");
BOOST_FAIL("Expected exception");
}
- catch (...) {}
+ catch (const qpid::Exception&) {}
}
BOOST_AUTO_TEST_CASE(testAutoShlib) {
diff --git a/cpp/src/tests/TimerTest.cpp b/cpp/src/tests/TimerTest.cpp
index 682699dbd3..3f2a1c57ec 100644
--- a/cpp/src/tests/TimerTest.cpp
+++ b/cpp/src/tests/TimerTest.cpp
@@ -26,6 +26,7 @@
#include <iostream>
#include <memory>
#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
using namespace qpid::broker;
using namespace qpid::sys;
diff --git a/cpp/src/tests/TxMocks.h b/cpp/src/tests/TxMocks.h
index e4e74ee535..127a27c005 100644
--- a/cpp/src/tests/TxMocks.h
+++ b/cpp/src/tests/TxMocks.h
@@ -25,7 +25,6 @@
#include "qpid/Exception.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/broker/TxOp.h"
-#include <boost/format.hpp>
#include <iostream>
#include <vector>
@@ -40,9 +39,9 @@ template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<
i++;
}
if (i < expected.size()) {
- throw qpid::Exception(boost::format("Missing %1%") % expected[i]);
+ throw qpid::Exception(QPID_MSG("Missing " << expected[i]));
} else if (i < actual.size()) {
- throw qpid::Exception(boost::format("Extra %1%") % actual[i]);
+ throw qpid::Exception(QPID_MSG("Extra " << actual[i]));
}
CPPUNIT_ASSERT_EQUAL(expected.size(), actual.size());
}
diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp
index 8cf43ce069..e4fd57824c 100644
--- a/cpp/src/tests/client_test.cpp
+++ b/cpp/src/tests/client_test.cpp
@@ -29,7 +29,6 @@
#include <iostream>
#include "TestOptions.h"
-#include "qpid/QpidError.h"
#include "qpid/client/Channel.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
diff --git a/cpp/src/tests/echo_service.cpp b/cpp/src/tests/echo_service.cpp
index 7989ec8543..c3569d5fd4 100644
--- a/cpp/src/tests/echo_service.cpp
+++ b/cpp/src/tests/echo_service.cpp
@@ -27,7 +27,6 @@
* sender-specified private queue.
*/
-#include "qpid/QpidError.h"
#include "qpid/client/Channel.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Exchange.h"
diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp
index 3feef7e876..3783ae6901 100644
--- a/cpp/src/tests/exception_test.cpp
+++ b/cpp/src/tests/exception_test.cpp
@@ -22,7 +22,6 @@
#include <iostream>
#include "TestOptions.h"
-#include "qpid/QpidError.h"
#include "qpid/client/Channel.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
diff --git a/cpp/src/tests/interop_runner.cpp b/cpp/src/tests/interop_runner.cpp
index 5bfe88662a..56f9cbf3d2 100644
--- a/cpp/src/tests/interop_runner.cpp
+++ b/cpp/src/tests/interop_runner.cpp
@@ -21,7 +21,6 @@
#include "qpid/Options.h"
#include "qpid/Exception.h"
-#include "qpid/QpidError.h"
#include "qpid/client/Channel.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Exchange.h"
diff --git a/cpp/src/tests/logging.cpp b/cpp/src/tests/logging.cpp
index f5402aaad7..1042e60077 100644
--- a/cpp/src/tests/logging.cpp
+++ b/cpp/src/tests/logging.cpp
@@ -71,22 +71,18 @@ BOOST_AUTO_TEST_CASE(testSelector_enable) {
BOOST_CHECK(s.isEnabled(critical, "oops"));
}
-Logger& clearLogger() {
- Logger::instance().clear();
- return Logger::instance();
-}
-
BOOST_AUTO_TEST_CASE(testStatementEnabled) {
- // Verify that the logger enables and disables log statements.
- Logger& l=clearLogger();
+ // Verify that the singleton enables and disables static
+ // log statements.
+ Logger& l = Logger::instance();
l.select(Selector(debug));
- Statement s=QPID_LOG_STATEMENT_INIT(debug);
+ static Statement s=QPID_LOG_STATEMENT_INIT(debug);
BOOST_CHECK(!s.enabled);
- Statement::Initializer init(s);
+ static Statement::Initializer init(s);
BOOST_CHECK(s.enabled);
- Statement s2=QPID_LOG_STATEMENT_INIT(warning);
- Statement::Initializer init2(s2);
+ static Statement s2=QPID_LOG_STATEMENT_INIT(warning);
+ static Statement::Initializer init2(s2);
BOOST_CHECK(!s2.enabled);
l.select(Selector(warning));
@@ -98,9 +94,10 @@ struct TestOutput : public Logger::Output {
vector<string> msg;
vector<Statement> stmt;
- TestOutput() {
- Logger::instance().output(qpid::make_auto_ptr<Logger::Output>(this));
+ TestOutput(Logger& l) {
+ l.output(std::auto_ptr<Logger::Output>(this));
}
+
void log(const Statement& s, const string& m) {
msg.push_back(m);
stmt.push_back(s);
@@ -111,10 +108,12 @@ struct TestOutput : public Logger::Output {
using boost::assign::list_of;
BOOST_AUTO_TEST_CASE(testLoggerOutput) {
- Logger& l=clearLogger();
+ Logger l;
+ l.clear();
l.select(Selector(debug));
Statement s=QPID_LOG_STATEMENT_INIT(debug);
- TestOutput* out=new TestOutput();
+
+ TestOutput* out=new TestOutput(l);
// Verify message is output.
l.log(s, "foo");
@@ -122,7 +121,7 @@ BOOST_AUTO_TEST_CASE(testLoggerOutput) {
BOOST_CHECK_EQUAL(expect, out->msg);
// Verify multiple outputs
- TestOutput* out2=new TestOutput();
+ TestOutput* out2=new TestOutput(l);
l.log(Statement(), "baz");
expect.push_back("baz\n");
BOOST_CHECK_EQUAL(expect, out->msg);
@@ -131,9 +130,10 @@ BOOST_AUTO_TEST_CASE(testLoggerOutput) {
}
BOOST_AUTO_TEST_CASE(testMacro) {
- Logger& l = clearLogger();
+ Logger& l=Logger::instance();
+ l.clear();
l.select(Selector(info));
- TestOutput* out=new TestOutput();
+ TestOutput* out=new TestOutput(l);
QPID_LOG(info, "foo");
vector<string> expect=list_of("foo\n");
BOOST_CHECK_EQUAL(expect, out->msg);
@@ -150,9 +150,9 @@ BOOST_AUTO_TEST_CASE(testMacro) {
}
BOOST_AUTO_TEST_CASE(testLoggerFormat) {
- Logger& l=clearLogger();
+ Logger& l = Logger::instance();
l.select(Selector(critical));
- TestOutput* out=new TestOutput();
+ TestOutput* out=new TestOutput(l);
// Time format is YYY-Month-dd hh:mm:ss
l.format(Logger::TIME);
@@ -183,7 +183,8 @@ BOOST_AUTO_TEST_CASE(testLoggerFormat) {
}
BOOST_AUTO_TEST_CASE(testOstreamOutput) {
- Logger& l=clearLogger();
+ Logger& l=Logger::instance();
+ l.clear();
l.select(Selector(error));
ostringstream os;
l.output(os);
@@ -191,12 +192,12 @@ BOOST_AUTO_TEST_CASE(testOstreamOutput) {
QPID_LOG(error, "bar");
QPID_LOG(error, "baz");
BOOST_CHECK_EQUAL("foo\nbar\nbaz\n", os.str());
- l.clear();
}
#if 0 // This test requires manual intervention. Normally disabled.
BOOST_AUTO_TEST_CASE(testSyslogOutput) {
- Logger& l = clearLogger();
+ Logger& l=Logger::instance();
+ l.clear();
l.select(Selector(info));
l.syslog("qpid_test");
QPID_LOG(info, "Testing QPID");
@@ -312,7 +313,7 @@ BOOST_AUTO_TEST_CASE(testSelectorFromOptions) {
}
BOOST_AUTO_TEST_CASE(testOptionsFormat) {
- Logger& l = clearLogger();
+ Logger l;
{
Options opts;
BOOST_CHECK_EQUAL(Logger::TIME|Logger::LEVEL, l.format(opts));
@@ -344,7 +345,8 @@ BOOST_AUTO_TEST_CASE(testOptionsFormat) {
}
BOOST_AUTO_TEST_CASE(testLoggerConfigure) {
- Logger& l = clearLogger();
+ Logger& l=Logger::instance();
+ l.clear();
Options opts;
char* argv[]={
0,
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index d16ebd43de..bc816f6597 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -27,7 +27,6 @@
#include "qpid/client/Connection.h"
#include "qpid/client/MessageListener.h"
#include "qpid/client/Message.h"
-#include "qpid/QpidError.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
diff --git a/cpp/src/tests/run-unit-tests b/cpp/src/tests/run-unit-tests
index ce8f488b29..464ce131f5 100755
--- a/cpp/src/tests/run-unit-tests
+++ b/cpp/src/tests/run-unit-tests
@@ -24,5 +24,5 @@ test -z "$TEST_ARGS" && TEST_ARGS=".libs/*Test.so"
test -z "$srcdir" && srcdir=.
# libdlclose_noop prevents unloading symbols needed for valgrind output.
-LD_PRELOAD=.libs/libdlclose_noop.so exec $srcdir/test_env DllPlugInTester -c -b $TEST_ARGS
-
+export LD_PRELOAD=.libs/libdlclose_noop.so
+source $srcdir/run_test DllPlugInTester -c -b $TEST_ARGS
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index 9369b591a6..5aef16354e 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -32,7 +32,6 @@
* listening).
*/
-#include "qpid/QpidError.h"
#include "TestOptions.h"
#include "qpid/client/Channel.h"
#include "qpid/client/Connection.h"
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index 74fcf8b057..1c5b51309b 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -35,7 +35,6 @@
*/
#include "TestOptions.h"
-#include "qpid/QpidError.h"
#include "qpid/client/Channel.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Exchange.h"