diff options
Diffstat (limited to 'cpp')
111 files changed, 1844 insertions, 1494 deletions
diff --git a/cpp/rubygen/templates/constants.rb b/cpp/rubygen/templates/constants.rb index 2ef6502772..5fbbefe218 100755 --- a/cpp/rubygen/templates/constants.rb +++ b/cpp/rubygen/templates/constants.rb @@ -10,31 +10,71 @@ class ConstantsGen < CppGen @dir="qpid/framing" end - def generate() + def constants_h() h_file("#{@dir}/constants") { namespace(@namespace) { - scope("enum AmqpConstant {","};") { - genl @amqp.constants.map { |c| "#{c.name.shout}=#{c.value}" }.join(",\n") - } - } - } - + scope("enum AmqpConstant {","};") { + l=[] + l.concat @amqp.constants.map { |c| "#{c.name.shout}=#{c.value}" } + @amqp.classes.each { |c| + l << "#{c.name.shout}_CLASS_ID=#{c.index}" + l.concat c.methods_.map { |m| + "#{c.name.shout}_#{m.name.shout}_METHOD_ID=#{m.index}" } + } + genl l.join(",\n") + }}} + end + + def exbase(c) + case c.class_ + when "soft-error" then "ChannelException" + when "hard-error" then "ConnectionException" + end + end + + def reply_exceptions_h() h_file("#{@dir}/reply_exceptions") { include "qpid/Exception" namespace(@namespace) { @amqp.constants.each { |c| - if c.class_ - exname=c.name.caps+"Exception" - base = c.class_=="soft-error" ? "ChannelException" : "ConnectionException" - text=(c.doc or c.name).tr_s!(" \t\n"," ") - struct(exname, base) { - genl "#{exname}(const std::string& msg=\"#{text})\") : #{base}(#{c.value}, msg) {}" + base = exbase c + if base + genl + struct(c.name.caps+"Exception", base) { + genl "#{c.name.caps}Exception(const std::string& msg=std::string()) : #{base}(#{c.value}, \"#{c.name}: \"+msg) {}" } end } + genl + genl "void throwReplyException(int code, const std::string& text);" } } - + end + + def reply_exceptions_cpp() + cpp_file("#{@dir}/reply_exceptions") { + include "#{@dir}/reply_exceptions" + include "<sstream>" + namespace("qpid::framing") { + scope("void throwReplyException(int code, const std::string& text) {"){ + scope("switch (code) {") { + genl "case 200: break; // No exception" + @amqp.constants.each { |c| + if exbase c + genl "case #{c.value}: throw #{c.name.caps}Exception(text);" + end + } + scope("default:","") { + genl "std::ostringstream msg;" + genl 'msg << "Invalid reply code " << code << ": " << text;' + genl 'throw InvalidArgumentException(msg.str());' + }}}}} + end + + def generate() + constants_h + reply_exceptions_h + reply_exceptions_cpp end end diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 57b3d27b71..ebb8916d6a 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -52,7 +52,6 @@ qpidd_SOURCES = qpidd.cpp posix_plat_src = \ qpid/sys/epoll/EpollPoller.cpp \ - qpid/sys/posix/check.cpp \ qpid/sys/posix/Socket.cpp \ qpid/sys/posix/AsynchIO.cpp \ qpid/sys/posix/Time.cpp \ @@ -110,8 +109,7 @@ libqpidcommon_la_SOURCES = \ qpid/framing/InitiationHandler.cpp \ qpid/framing/ProtocolInitiation.cpp \ qpid/framing/ProtocolVersion.cpp \ - qpid/framing/ProtocolVersionException.cpp \ - qpid/framing/ResumeHandler.cpp qpid/framing/ResumeHandler.h \ + qpid/framing/SessionState.cpp qpid/framing/SessionState.h \ qpid/framing/SendContent.cpp \ qpid/framing/SequenceNumber.cpp \ qpid/framing/SequenceNumberSet.cpp \ @@ -126,7 +124,6 @@ libqpidcommon_la_SOURCES = \ qpid/Exception.cpp \ qpid/Plugin.cpp \ qpid/Url.cpp \ - qpid/QpidError.cpp \ qpid/sys/AsynchIOAcceptor.cpp \ qpid/sys/Dispatcher.cpp \ qpid/sys/Runnable.cpp \ @@ -210,7 +207,6 @@ libqpidclient_la_SOURCES = \ qpid/client/MessageListener.cpp \ qpid/client/Correlator.cpp \ qpid/client/CompletionTracker.cpp \ - qpid/client/SessionHandler.cpp \ qpid/client/ConnectionHandler.cpp \ qpid/client/ExecutionHandler.cpp \ qpid/client/FutureCompletion.cpp \ @@ -221,12 +217,12 @@ libqpidclient_la_SOURCES = \ nobase_include_HEADERS = \ $(platform_hdr) \ + qpid/assert.h \ qpid/Exception.h \ qpid/ExceptionHolder.h \ qpid/Msg.h \ qpid/Options.h \ qpid/Plugin.h \ - qpid/QpidError.h \ qpid/SharedObject.h \ qpid/Url.h \ qpid/memory.h \ @@ -323,7 +319,6 @@ nobase_include_HEADERS = \ qpid/client/MessageQueue.h \ qpid/client/Response.h \ qpid/client/SessionCore.h \ - qpid/client/SessionHandler.h \ qpid/client/StateManager.h \ qpid/client/TypedResult.h \ qpid/framing/AMQBody.h \ @@ -339,6 +334,7 @@ nobase_include_HEADERS = \ qpid/framing/Blob.h \ qpid/framing/BodyHandler.h \ qpid/framing/Buffer.h \ + qpid/framing/ChannelHandler.h \ qpid/framing/ChannelAdapter.h \ qpid/framing/FieldTable.h \ qpid/framing/FieldValue.h \ @@ -360,7 +356,6 @@ nobase_include_HEADERS = \ qpid/framing/OutputHandler.h \ qpid/framing/ProtocolInitiation.h \ qpid/framing/ProtocolVersion.h \ - qpid/framing/ProtocolVersionException.h \ qpid/framing/Proxy.h \ qpid/framing/SendContent.h \ qpid/framing/SequenceNumber.h \ @@ -399,6 +394,8 @@ nobase_include_HEADERS = \ qpid/sys/Shlib.h \ qpid/sys/ShutdownHandler.h \ qpid/sys/Socket.h \ + qpid/sys/StateMonitor.h \ + qpid/sys/Waitable.h \ qpid/sys/Thread.h \ qpid/sys/Time.h \ qpid/sys/TimeoutHandler.h diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp index 11051d1a2e..e0747df49e 100644 --- a/cpp/src/qpid/Exception.cpp +++ b/cpp/src/qpid/Exception.cpp @@ -23,6 +23,7 @@ #include "Exception.h" #include <typeinfo> #include <errno.h> +#include <assert.h> namespace qpid { @@ -31,45 +32,22 @@ std::string strError(int err) { return std::string(strerror_r(err, buf, sizeof(buf))); } -static void ctorLog(const std::exception* e) { - QPID_LOG(trace, "Exception: " << e->what()); +Exception::Exception(const std::string& s) throw() : msg(s) { + QPID_LOG(warning, "Exception: " << msg); } - -Exception::Exception() throw() { ctorLog(this); } - -Exception::Exception(const std::string& str) throw() - : whatStr(str) { ctorLog(this); } - -Exception::Exception(const char* str) throw() : whatStr(str) { ctorLog(this); } - -Exception::Exception(const std::exception& e) throw() : whatStr(e.what()) {} Exception::~Exception() throw() {} -const char* Exception::what() const throw() { return whatStr.c_str(); } - -std::string Exception::toString() const throw() { return whatStr; } - -Exception::auto_ptr Exception::clone() const throw() { return Exception::auto_ptr(new Exception(*this)); } - -void Exception::throwSelf() const { throw *this; } - -ShutdownException::ShutdownException() : Exception("Shut down.") {} - -EmptyException::EmptyException() : Exception("Empty.") {} - -const char* Exception::defaultMessage = "Unexpected exception"; - -void Exception::log(const char* what, const char* message) { - QPID_LOG(error, message << ": " << what); +std::string Exception::str() const throw() { + if (msg.empty()) + const_cast<std::string&>(msg).assign(typeid(*this).name()); + return msg; } -void Exception::log(const std::exception& e, const char* message) { - log(e.what(), message); -} +const char* Exception::what() const throw() { return str().c_str(); } -void Exception::logUnknown(const char* message) { - log("unknown exception.", message); +std::auto_ptr<Exception> Exception::clone() const throw() { + return std::auto_ptr<Exception>(new Exception(*this)); } } // namespace qpid diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h index a7ab1fa8aa..bf6c1fb872 100644 --- a/cpp/src/qpid/Exception.h +++ b/cpp/src/qpid/Exception.h @@ -24,135 +24,52 @@ #include "qpid/framing/amqp_types.h" #include "qpid/Msg.h" -#include <exception> -#include <string> + #include <memory> -#include <boost/shared_ptr.hpp> -#include <boost/lexical_cast.hpp> -#include <boost/function.hpp> +#include <string> namespace qpid { -/** Get the error message for error number err. */ +/** Get the error message for a system number err, e.g. errno. */ std::string strError(int err); /** - * Exception base class for all Qpid exceptions. + * Base class for Qpid runtime exceptions. */ class Exception : public std::exception { - protected: - std::string whatStr; - public: - typedef boost::shared_ptr<Exception> shared_ptr; - typedef boost::shared_ptr<const Exception> shared_ptr_const; - typedef std::auto_ptr<Exception> auto_ptr; - - Exception() throw(); - Exception(const std::string& str) throw(); - Exception(const char* str) throw(); - Exception(const std::exception&) throw(); - - /** Allow any type that has ostream operator<< to act as message */ - template <class T> - Exception(const T& message) - : whatStr(boost::lexical_cast<std::string>(message)) {} - + explicit Exception(const std::string& str=std::string()) throw(); virtual ~Exception() throw(); - - virtual const char* what() const throw(); - virtual std::string toString() const throw(); - - virtual auto_ptr clone() const throw(); - virtual void throwSelf() const; - - /** Default message: "Unknown exception" or something like it. */ - static const char* defaultMessage; - - /** - * Log a message of the form "message: what" - *@param what Exception's what() message. - *@param message Prefix message. - */ - static void log(const char* what, const char* message = defaultMessage); - - /** - * Log an exception. - *@param e Exception to log. - - */ - static void log( - const std::exception& e, const char* message = defaultMessage); - - /** - * Log an unknown exception - use in catch(...) - *@param message Prefix message. - */ - static void logUnknown(const char* message = defaultMessage); - - /** - * Wrapper template function to call another function inside - * try/catch and log any exception. Use boost::bind to wrap - * member function calls or functions with arguments. - * - *@param f Function to call in try block. - *@param retrhow If true the exception is rethrown. - *@param message Prefix message. - */ - template <class T> - static T tryCatchLog(boost::function0<T> f, bool rethrow=true, - const char* message=defaultMessage) - { - try { - return f(); - } - catch (const std::exception& e) { - log(e, message); - if (rethrow) - throw; - } - catch (...) { - logUnknown(message); - if (rethrow) - throw; - } - } + virtual const char *what() const throw(); + virtual std::auto_ptr<Exception> clone() const throw(); + virtual std::string str() const throw(); + private: + std::string msg; }; struct ChannelException : public Exception { - framing::ReplyCode code; - template <class T> - ChannelException(framing::ReplyCode code_, const T& message) + const framing::ReplyCode code; + ChannelException(framing::ReplyCode code_, const std::string& message) : Exception(message), code(code_) {} - void throwSelf() const { throw *this; } }; struct ConnectionException : public Exception { - framing::ReplyCode code; - template <class T> - ConnectionException(framing::ReplyCode code_, const T& message) + const framing::ReplyCode code; + ConnectionException(framing::ReplyCode code_, const std::string& message) : Exception(message), code(code_) {} - void throwSelf() const { throw *this; } }; -/** - * Exception used to indicate that a thread should shut down. - * Does not indicate an error that should be signalled to the user. +/** Clone an exception. + * For qpid::Exception this calls the clone member function. + * For standard exceptions, uses the copy constructor. + * For unknown exception types creates a std::exception + * with the same what() string. */ -struct ShutdownException : public Exception { - ShutdownException(); - void throwSelf() const { throw *this; } -}; - -/** Exception to indicate empty queue or other empty state */ -struct EmptyException : public Exception { - EmptyException(); - void throwSelf() const { throw *this; } -}; +std::auto_ptr<std::exception> clone(const std::exception&); -} +} // namespace qpid #endif /*!_Exception_*/ diff --git a/cpp/src/qpid/Msg.h b/cpp/src/qpid/Msg.h index c1a6b54d05..7214db611f 100644 --- a/cpp/src/qpid/Msg.h +++ b/cpp/src/qpid/Msg.h @@ -54,7 +54,7 @@ inline std::ostream& operator<<(std::ostream& o, const Msg& m) { } /** Construct a message using operator << and append (file:line) */ -#define QPID_MSG(message) Msg() << message << " (" << __FILE__ << ":" << __LINE__ << ")" +#define QPID_MSG(message) ::qpid::Msg() << message << " (" << __FILE__ << ":" << __LINE__ << ")" } // namespace qpid diff --git a/cpp/src/qpid/QpidError.cpp b/cpp/src/qpid/QpidError.cpp deleted file mode 100644 index 740ec24e54..0000000000 --- a/cpp/src/qpid/QpidError.cpp +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/format.hpp> - -#include "QpidError.h" -#include <sstream> - -using namespace qpid; - -QpidError::QpidError() : code(0) {} - -QpidError::~QpidError() throw() {} - -Exception::auto_ptr QpidError::clone() const throw() { return Exception::auto_ptr(new QpidError(*this)); } - -void QpidError::throwSelf() const { throw *this; } - -std::string QpidError::message(int code, const std::string& msg, const char* file, int line) { - return (boost::format("Error [%d] %s (%s:%d)") % code % msg % file % line).str(); -} - - diff --git a/cpp/src/qpid/QpidError.h b/cpp/src/qpid/QpidError.h deleted file mode 100644 index 2ff6571365..0000000000 --- a/cpp/src/qpid/QpidError.h +++ /dev/null @@ -1,79 +0,0 @@ -#ifndef __QpidError__ -#define __QpidError__ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include <memory> -#include <ostream> - -#include "Exception.h" - -namespace qpid { - -struct SrcLine { - public: - SrcLine(const std::string& file_="", int line_=0) : - file(file_), line(line_) {} - - std::string file; - int line; -}; - -class QpidError : public Exception -{ - public: - const int code; - SrcLine loc; - std::string msg; - - QpidError(); - - template <class T> - QpidError(int code_, const T& msg_, const SrcLine& loc_) throw() - : Exception(message(code_, boost::lexical_cast<std::string>(msg_), loc_.file.c_str(), loc_.line)), - code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_)) {} - - ~QpidError() throw(); - Exception::auto_ptr clone() const throw(); - void throwSelf() const; - - /** Format message for exception. */ - static std::string message(int code, const std::string& msg, const char* file, int line); -}; - - -} // namespace qpid - -#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__) - -#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE) - -#define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE) - -#define THROW_QPID_ERRNO_IF(cond) if (cond) QPID_ERROR(INTERNAL, strError(errno)); - -const int PROTOCOL_ERROR = 10000; -const int APR_ERROR = 20000; -const int FRAMING_ERROR = 30000; -const int CLIENT_ERROR = 40000; -const int INTERNAL_ERROR = 50000; - -#endif diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index e53774740a..b88f1c6c6a 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -64,7 +64,8 @@ Broker::Options::Options(const std::string& name) : storeDir("/var"), storeAsync(false), enableMgmt(0), - mgmtPubInterval(10) + mgmtPubInterval(10), + ack(100) { addOptions() ("port,p", optValue(port,"PORT"), @@ -102,7 +103,8 @@ Broker::Broker(const Broker::Options& conf) : queues(store.get()), stagingThreshold(0), factory(*this), - dtxManager(store.get()) + dtxManager(store.get()), + sessionManager(conf.ack) { if(conf.enableMgmt){ managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval)); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 2018371624..817197a351 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -69,6 +69,7 @@ class Broker : public sys::Runnable, public Plugin::Target bool storeAsync; bool enableMgmt; uint16_t mgmtPubInterval; + uint32_t ack; }; virtual ~Broker(); diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 99b585406e..dad40868d6 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -21,6 +21,7 @@ #include "MessageDelivery.h" #include "qpid/framing/AMQMethodBody.h" #include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace broker { @@ -75,8 +76,7 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri checkAlternate(response.first, alternate); } }catch(UnknownExchangeTypeException& e){ - throw ConnectionException( - 503, "Exchange type not implemented: " + type); + throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type)); } } } @@ -84,24 +84,23 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type) { if (!type.empty() && exchange->getType() != type) { - throw ConnectionException(530, "Exchange declared to be of type " + exchange->getType() + ", requested " + type); + throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type)); } } void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate) { - if (alternate && alternate != exchange->getAlternate()) { - throw ConnectionException(530, "Exchange declared with alternate-exchange " - + exchange->getAlternate()->getName() + ", requested " - + alternate->getName()); - } - + if (alternate && alternate != exchange->getAlternate()) + throw NotAllowedException( + QPID_MSG("Exchange declared with alternate-exchange " + << exchange->getAlternate()->getName() << ", requested " + << alternate->getName())); } void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){ //TODO: implement unused Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); - if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange."); + if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); if (exchange->isDurable()) getBroker().getStore().destroy(*exchange); if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); getBroker().getExchanges().destroy(name); @@ -292,7 +291,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, Queue::shared_ptr queue = state.getQueue(queueName); if(!consumerTag.empty() && state.exists(consumerTag)){ - throw ConnectionException(530, "Consumer tags must be unique"); + throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); } string newTag = consumerTag; //need to generate name here, so we have it for the adapter (it is diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 5537dc67f5..706b42c080 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -82,7 +82,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations throw framing::NotImplementedException("Tunnel class not implemented"); } // Handlers no longer implemented in BrokerAdapter: -#define BADHANDLER() assert(0); throw framing::InternalErrorException() +#define BADHANDLER() assert(0); throw framing::NotImplementedException("") ExecutionHandler* getExecutionHandler() { BADHANDLER(); } ConnectionHandler* getConnectionHandler() { BADHANDLER(); } SessionHandler* getSessionHandler() { BADHANDLER(); } diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ca0ca20849..f981d47ef7 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -28,9 +28,10 @@ #include "BrokerAdapter.h" #include "SemanticHandler.h" -#include <boost/utility/in_place_factory.hpp> #include <boost/bind.hpp> +#include <algorithm> + using namespace boost; using namespace qpid::sys; using namespace qpid::framing; @@ -61,6 +62,7 @@ void Connection::close( ReplyCode code, const string& text, ClassId classId, MethodId methodId) { adapter.close(code, text, classId, methodId); + channels.clear(); getOutput().close(); } @@ -73,8 +75,11 @@ void Connection::idleOut(){} void Connection::idleIn(){} -void Connection::closed(){ +void Connection::closed(){ // Physically closed, suspend open sessions. try { + std::for_each( + channels.begin(), channels.end(), + boost::bind(&SessionHandler::localSuspend, _1)); while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index f697986194..dd645b595e 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -46,9 +46,9 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) AMQMethodBody* method=frame.getBody()->getMethod(); try{ if (!invoke(*handler.get(), *method)) - throw ConnectionException(503, "Class can't be accessed over channel 0"); + throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0")); }catch(ConnectionException& e){ - handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); + handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId()); }catch(std::exception& e){ handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } diff --git a/cpp/src/qpid/broker/Daemon.cpp b/cpp/src/qpid/broker/Daemon.cpp index 0bb3449289..3fcc487324 100644 --- a/cpp/src/qpid/broker/Daemon.cpp +++ b/cpp/src/qpid/broker/Daemon.cpp @@ -17,7 +17,7 @@ */ #include "Daemon.h" #include "qpid/log/Statement.h" -#include "qpid/QpidError.h" +#include "qpid/Exception.h" #include <boost/iostreams/stream.hpp> #include <boost/iostreams/device/file_descriptor.hpp> diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 5887d13f85..ec042ff56a 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -44,7 +44,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, if (fail) { state.endDtx(xid, true); if (suspend) { - throw ConnectionException(503, "End and suspend cannot both be set."); + throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); } else { return DtxDemarcationEndResult(XA_RBROLLBACK); } @@ -67,7 +67,7 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, bool resume) { if (join && resume) { - throw ConnectionException(503, "Join and resume cannot both be set."); + throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set.")); } try { if (resume) { @@ -161,7 +161,7 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/, const string& xid) { //Currently no heuristic completion is supported, so this should never be used. - throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid); + throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!")); } DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid) diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index 0d211017de..0597b41f98 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -20,16 +20,20 @@ */ #include "DtxManager.h" #include "DtxTimeout.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" #include <boost/format.hpp> #include <iostream> using qpid::sys::Mutex; using namespace qpid::broker; +using namespace qpid::framing; DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {} -DtxManager::~DtxManager() {} +DtxManager::~DtxManager() { + // timer.stop(); // FIXME aconway 2007-10-23: leaking threads. +} void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops) { @@ -84,7 +88,7 @@ DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { - throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid); + throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid)); } return i; } @@ -94,7 +98,7 @@ void DtxManager::remove(const std::string& xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { - throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid); + throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid)); } else { work.erase(i); } @@ -105,7 +109,7 @@ DtxManager::WorkMap::iterator DtxManager::createWork(std::string xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i != work.end()) { - throw ConnectionException(503, boost::format("Xid %1% is already known (use 'join' to add work to an existing xid)!") % xid); + throw CommandInvalidException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)")); } else { return work.insert(xid, new DtxWorkRecord(xid, store)).first; } diff --git a/cpp/src/qpid/broker/DtxTimeout.h b/cpp/src/qpid/broker/DtxTimeout.h index 33da62e7f4..7d0b8622d0 100644 --- a/cpp/src/qpid/broker/DtxTimeout.h +++ b/cpp/src/qpid/broker/DtxTimeout.h @@ -29,12 +29,7 @@ namespace broker { class DtxManager; - -struct DtxTimeoutException : public Exception -{ - DtxTimeoutException() {} -}; - +struct DtxTimeoutException : public Exception {}; struct DtxTimeout : public TimerTask { diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp index f2f118c5e4..fe9e42ca32 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -19,12 +19,14 @@ * */ #include "DtxWorkRecord.h" +#include "qpid/framing/reply_exceptions.h" #include <boost/format.hpp> #include <boost/mem_fn.hpp> using boost::mem_fn; using qpid::sys::Mutex; using namespace qpid::broker; +using namespace qpid::framing; DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {} @@ -71,8 +73,7 @@ bool DtxWorkRecord::commit(bool onePhase) if (prepared) { //already prepared i.e. 2pc if (onePhase) { - throw ConnectionException(503, - boost::format("Branch with xid %1% has been prepared, one-phase option not valid!") % xid); + throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!")); } store->commit(*txn); @@ -83,8 +84,7 @@ bool DtxWorkRecord::commit(bool onePhase) } else { //1pc commit optimisation, don't need a 2pc transaction context: if (!onePhase) { - throw ConnectionException(503, - boost::format("Branch with xid %1% has not been prepared, one-phase option required!") % xid); + throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!")); } std::auto_ptr<TransactionContext> localtxn = store->begin(); if (prepare(localtxn.get())) { @@ -119,7 +119,7 @@ void DtxWorkRecord::add(DtxBuffer::shared_ptr ops) throw DtxTimeoutException(); } if (completed) { - throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid); + throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!")); } work.push_back(ops); } @@ -133,7 +133,7 @@ bool DtxWorkRecord::check() //iterate through all DtxBuffers and ensure they are all ended for (Work::iterator i = work.begin(); i != work.end(); i++) { if (!(*i)->isEnded()) { - throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid); + throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " not completed!")); } else if ((*i)->isRollbackOnly()) { rolledback = true; } diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index ae1afe5abb..98e3cc7347 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -24,6 +24,7 @@ #include "HeadersExchange.h" #include "TopicExchange.h" #include "ManagementExchange.h" +#include "qpid/framing/reply_exceptions.h" using namespace qpid::broker; using namespace qpid::sys; @@ -75,9 +76,8 @@ void ExchangeRegistry::destroy(const string& name){ Exchange::shared_ptr ExchangeRegistry::get(const string& name){ RWlock::ScopedRlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); - if (i == exchanges.end()) { - throw ChannelException(404, "Exchange not found: " + name); - } + if (i == exchanges.end()) + throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name)); return i->second; } diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 215a002517..dd688cdfcf 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -20,7 +20,7 @@ */ #include "HeadersExchange.h" #include "qpid/framing/FieldValue.h" -#include "qpid/QpidError.h" +#include "qpid/framing/reply_exceptions.h" #include <algorithm> @@ -46,9 +46,8 @@ HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ RWlock::ScopedWlock locker(lock); FieldTable::ValuePtr what = args->get(x_match); - if (!what || (*what != all && *what != any)) { - THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange."); - } + if (!what || (*what != all && *what != any)) + throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange.")); Binding binding(*args, queue); Bindings::iterator i = std::find(bindings.begin(),bindings.end(), binding); diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index b12910893a..834ce0a203 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -16,7 +16,7 @@ * */ -#include "qpid/QpidError.h" +#include "qpid/Exception.h" #include "qpid/log/Statement.h" #include "MessageHandlerImpl.h" #include "qpid/framing/FramingContent.h" @@ -56,39 +56,39 @@ MessageHandlerImpl::cancel(const string& destination ) void MessageHandlerImpl::open(const string& /*reference*/) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::close(const string& /*reference*/) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::checkpoint(const string& /*reference*/, const string& /*identifier*/ ) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::resume(const string& /*reference*/, const string& /*identifier*/ ) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::offset(uint64_t /*value*/ ) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void @@ -97,19 +97,19 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, const string& /*destination*/, bool /*noAck*/ ) { - throw ConnectionException(540, "get no longer supported"); + throw NotImplementedException("get no longer supported"); } void MessageHandlerImpl::empty() { - throw ConnectionException(540, "empty no longer supported"); + throw NotImplementedException("empty no longer supported"); } void MessageHandlerImpl::ok() { - throw ConnectionException(540, "Message.Ok no longer supported"); + throw NotImplementedException("Message.Ok no longer supported"); } void @@ -134,7 +134,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, { Queue::shared_ptr queue = state.getQueue(queueName); if(!destination.empty() && state.exists(destination)) - throw ConnectionException(530, "Consumer tags must be unique"); + throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); string tag = destination; state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), @@ -165,7 +165,7 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i state.addByteCredit(destination, value); } else { //unknown - throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit); + throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit)); } } @@ -179,7 +179,7 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) //window state.setWindowMode(destination); } else{ - throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode); + throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode)); } } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 116e8d9431..18c1ab1056 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -19,9 +19,8 @@ * */ -#include <boost/format.hpp> - #include "qpid/log/Statement.h" +#include "qpid/framing/reply_exceptions.h" #include "Broker.h" #include "Queue.h" #include "Exchange.h" @@ -37,7 +36,6 @@ using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; -using boost::format; Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, @@ -269,17 +267,15 @@ bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) { void Queue::consume(Consumer::ptr c, bool requestExclusive){ RWlock::ScopedWlock locker(consumerLock); if(exclusive) { - throw ChannelException( - 403, format("Queue '%s' has an exclusive consumer." - " No more consumers allowed.") % getName()); + throw AccessRefusedException( + QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); } if(requestExclusive) { if(acquirers.empty() && browsers.empty()) { exclusive = c; } else { - throw ChannelException( - 403, format("Queue '%s' already has consumers." - "Exclusive access denied.") % getName()); + throw AccessRefusedException( + QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); } } if (c->preAcquires()) { diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 8535dc6a60..e1a8ae470d 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -125,7 +125,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method) incoming.complete(id); if (!invoker.wasHandled()) { - throw ConnectionException(540, "Not implemented"); + throw NotImplementedException("Not implemented"); } else if (invoker.hasResult()) { session.getProxy().getExecution().result(id.getValue(), invoker.getResult()); } @@ -139,7 +139,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method) void SemanticHandler::handleL3(framing::AMQMethodBody* method) { if (!invoke(*this, *method)) - throw ConnectionException(540, "Not implemented"); + throw NotImplementedException("Not implemented"); } void SemanticHandler::handleContent(AMQFrame& frame) diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 1f7436da94..e0e4315d03 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -31,7 +31,6 @@ #include "SessionHandler.h" #include "TxAck.h" #include "TxPublish.h" -#include "qpid/QpidError.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" @@ -116,7 +115,8 @@ void SemanticState::startTx() void SemanticState::commit(MessageStore* const store) { - if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions"); + if (!txBuffer) throw + CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); txBuffer->enlist(txAck); @@ -127,7 +127,8 @@ void SemanticState::commit(MessageStore* const store) void SemanticState::rollback() { - if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions"); + if (!txBuffer) + throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); txBuffer->rollback(); accumulatedAck.clear(); @@ -141,7 +142,7 @@ void SemanticState::selectDtx() void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) { if (!dtxSelected) { - throw ConnectionException(503, "Session has not been selected for use with dtx"); + throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx")); } dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); @@ -155,11 +156,12 @@ void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) void SemanticState::endDtx(const std::string& xid, bool fail) { if (!dtxBuffer) { - throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid); + throw CommandInvalidException(QPID_MSG("xid " << xid << " not associated with this session")); } if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") - % dtxBuffer->getXid() % xid); + throw CommandInvalidException( + QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on end")); + } txBuffer.reset();//ops on this session no longer transactional @@ -176,8 +178,8 @@ void SemanticState::endDtx(const std::string& xid, bool fail) void SemanticState::suspendDtx(const std::string& xid) { if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") - % dtxBuffer->getXid() % xid); + throw CommandInvalidException( + QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on suspend")); } txBuffer.reset();//ops on this session no longer transactional @@ -188,11 +190,12 @@ void SemanticState::suspendDtx(const std::string& xid) void SemanticState::resumeDtx(const std::string& xid) { if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") - % dtxBuffer->getXid() % xid); + throw CommandInvalidException( + QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on resume")); + } if (!dtxBuffer->isSuspended()) { - throw ConnectionException(503, boost::format("xid %1% not suspended")% xid); + throw CommandInvalidException(QPID_MSG("xid " << xid << " not suspended")); } checkDtxTimeout(); diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index ed092d6a05..9b065be8af 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -26,6 +26,8 @@ #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" +#include <boost/bind.hpp> + namespace qpid { namespace broker { using namespace framing; @@ -33,7 +35,9 @@ using namespace std; SessionHandler::SessionHandler(Connection& c, ChannelId ch) : InOutHandler(0, &c.getOutput()), - connection(c), channel(ch), proxy(out), + connection(c), channel(ch, &c.getOutput()), + proxy(out), // Via my own handleOut() for L2 data. + peerSession(channel), // Direct to channel for L2 commands. ignoring(false) {} SessionHandler::~SessionHandler() {} @@ -54,15 +58,19 @@ void SessionHandler::handleIn(AMQFrame& f) { try { if (m && invoke(*this, *m)) return; - else if (session.get()) - session->in(f); + else if (session.get()) { + boost::optional<SequenceNumber> ack=session->received(f); + session->in.handle(f); + if (ack) + peerSession.ack(*ack, SequenceNumberSet()); + } else if (!ignoring) throw ChannelErrorException( - QPID_MSG("Channel " << channel << " is not open")); + QPID_MSG("Channel " << channel.get() << " is not open")); } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. session.reset(); - getProxy().getSession().closed(e.code, e.toString()); + peerSession.closed(e.code, e.what()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); }catch(const std::exception& e){ @@ -72,21 +80,22 @@ void SessionHandler::handleIn(AMQFrame& f) { } void SessionHandler::handleOut(AMQFrame& f) { - f.setChannel(getChannel()); - out.next->handle(f); + channel.handle(f); // Send it. + if (session->sent(f)) + peerSession.solicitAck(); } -void SessionHandler::assertOpen(const char* method) { - if (!session.get()) +void SessionHandler::assertAttached(const char* method) const { + if (!session.get()) throw ChannelErrorException( QPID_MSG(method << " failed: No session for channel " << getChannel())); } -void SessionHandler::assertClosed(const char* method) { +void SessionHandler::assertClosed(const char* method) const { if (session.get()) throw ChannelBusyException( - QPID_MSG(method << " failed: channel " << channel + QPID_MSG(method << " failed: channel " << channel.get() << " is already open.")); } @@ -95,32 +104,38 @@ void SessionHandler::open(uint32_t detachedLifetime) { std::auto_ptr<SessionState> state( connection.broker.getSessionManager().open(*this, detachedLifetime)); session.reset(state.release()); - getProxy().getSession().attached(session->getId(), session->getTimeout()); + peerSession.attached(session->getId(), session->getTimeout()); } void SessionHandler::resume(const Uuid& id) { assertClosed("resume"); - session = connection.broker.getSessionManager().resume(*this, id); - getProxy().getSession().attached(session->getId(), session->getTimeout()); + session = connection.broker.getSessionManager().resume(id); + session->attach(*this); + SequenceNumber seq = session->resuming(); + peerSession.attached(session->getId(), session->getTimeout()); + proxy.getSession().ack(seq, SequenceNumberSet()); } void SessionHandler::flow(bool /*active*/) { + assertAttached("flow"); // FIXME aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException(); + assert(0); throw NotImplementedException("session.flow"); } void SessionHandler::flowOk(bool /*active*/) { + assertAttached("flowOk"); // FIXME aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException(); + assert(0); throw NotImplementedException("session.flowOk"); } void SessionHandler::close() { + assertAttached("close"); QPID_LOG(info, "Received session.close"); ignoring=false; session.reset(); - getProxy().getSession().closed(REPLY_SUCCESS, "ok"); - assert(&connection.getChannel(channel) == this); - connection.closeChannel(channel); + peerSession.closed(REPLY_SUCCESS, "ok"); + assert(&connection.getChannel(channel.get()) == this); + connection.closeChannel(channel.get()); } void SessionHandler::closed(uint16_t replyCode, const string& replyText) { @@ -129,26 +144,43 @@ void SessionHandler::closed(uint16_t replyCode, const string& replyText) { session.reset(); } +void SessionHandler::localSuspend() { + if (session.get() && session->getState() == SessionState::ATTACHED) { + session->detach(); + connection.broker.getSessionManager().suspend(session); + } +} + void SessionHandler::suspend() { - assertOpen("suspend"); - connection.broker.getSessionManager().suspend(session); - assert(!session.get()); - getProxy().getSession().detached(); - assert(&connection.getChannel(channel) == this); - connection.closeChannel(channel); + assertAttached("suspend"); + localSuspend(); + peerSession.detached(); + assert(&connection.getChannel(channel.get()) == this); + connection.closeChannel(channel.get()); } -void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/, - const SequenceNumberSet& /*seenFrameSet*/) { - assert(0); throw NotImplementedException(); +void SessionHandler::ack(uint32_t cumulativeSeenMark, + const SequenceNumberSet& /*seenFrameSet*/) +{ + assertAttached("ack"); + if (session->getState() == SessionState::RESUMING) { + session->receivedAck(cumulativeSeenMark); + framing::SessionState::Replay replay=session->replay(); + std::for_each(replay.begin(), replay.end(), + boost::bind(&SessionHandler::handleOut, this, _1)); + } + else + session->receivedAck(cumulativeSeenMark); } void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) { - assert(0); throw NotImplementedException(); + // FIXME aconway 2007-10-02: may be removed from spec. + assert(0); throw NotImplementedException("session.high-water-mark"); } void SessionHandler::solicitAck() { - assert(0); throw NotImplementedException(); + assertAttached("solicit-ack"); + peerSession.ack(session->sendingAck(), SequenceNumberSet()); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 51a65e3092..9a68ddb46f 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -26,6 +26,7 @@ #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" +#include "qpid/framing/ChannelHandler.h" #include <boost/noncopyable.hpp> @@ -52,7 +53,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, SessionState* getSession() { return session.get(); } const SessionState* getSession() const { return session.get(); } - framing::ChannelId getChannel() const { return channel; } + framing::ChannelId getChannel() const { return channel.get(); } Connection& getConnection() { return connection; } const Connection& getConnection() const { return connection; } @@ -60,6 +61,9 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, framing::AMQP_ClientProxy& getProxy() { return proxy; } const framing::AMQP_ClientProxy& getProxy() const { return proxy; } + // Called by closing connection. + void localSuspend(); + protected: void handleIn(framing::AMQFrame&); void handleOut(framing::AMQFrame&); @@ -79,12 +83,14 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, void solicitAck(); - void assertOpen(const char* method); - void assertClosed(const char* method); + void assertAttached(const char* method) const; + void assertActive(const char* method) const; + void assertClosed(const char* method) const; Connection& connection; - const framing::ChannelId channel; + framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; + framing::AMQP_ClientProxy::Session peerSession; bool ignoring; std::auto_ptr<SessionState> session; }; diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index 303687c788..f12ebc6db1 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -39,7 +39,7 @@ namespace broker { using namespace sys; using namespace framing; -SessionManager::SessionManager() {} +SessionManager::SessionManager(uint32_t a) : ack(a) {} SessionManager::~SessionManager() {} @@ -47,7 +47,8 @@ std::auto_ptr<SessionState> SessionManager::open( SessionHandler& h, uint32_t timeout_) { Mutex::ScopedLock l(lock); - std::auto_ptr<SessionState> session(new SessionState(*this, h, timeout_)); + std::auto_ptr<SessionState> session( + new SessionState(*this, h, timeout_, ack)); active.insert(session->getId()); return session; } @@ -55,14 +56,13 @@ std::auto_ptr<SessionState> SessionManager::open( void SessionManager::suspend(std::auto_ptr<SessionState> session) { Mutex::ScopedLock l(lock); active.erase(session->getId()); + session->suspend(); session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); - session->handler = 0; suspended.push_back(session.release()); // In expiry order eraseExpired(); } -std::auto_ptr<SessionState> SessionManager::resume( - SessionHandler& sh, const Uuid& id) +std::auto_ptr<SessionState> SessionManager::resume(const Uuid& id) { Mutex::ScopedLock l(lock); eraseExpired(); @@ -78,7 +78,6 @@ std::auto_ptr<SessionState> SessionManager::resume( QPID_MSG("No suspended session with id=" << id)); active.insert(id); std::auto_ptr<SessionState> state(suspended.release(i).release()); - state->handler = &sh; return state; } @@ -94,8 +93,10 @@ void SessionManager::eraseExpired() { Suspended::iterator keep = std::lower_bound( suspended.begin(), suspended.end(), now(), boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2)); - QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); - suspended.erase(suspended.begin(), keep); + if (suspended.begin() != keep) { + QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); + suspended.erase(suspended.begin(), keep); + } } } diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h index 58a7b3f01f..fa7262252d 100644 --- a/cpp/src/qpid/broker/SessionManager.h +++ b/cpp/src/qpid/broker/SessionManager.h @@ -44,7 +44,7 @@ class SessionHandler; */ class SessionManager : private boost::noncopyable { public: - SessionManager(); + SessionManager(uint32_t ack); ~SessionManager(); /** Open a new active session, caller takes ownership */ std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_); @@ -57,18 +57,20 @@ class SessionManager : private boost::noncopyable { /** Resume a suspended session. *@throw Exception if timed out or non-existant. */ - std::auto_ptr<SessionState> resume(SessionHandler&, const framing::Uuid&); + std::auto_ptr<SessionState> resume(const framing::Uuid&); private: typedef boost::ptr_vector<SessionState> Suspended; typedef std::set<framing::Uuid> Active; + void erase(const framing::Uuid&); + void eraseExpired(); + sys::Mutex lock; Suspended suspended; Active active; - - void erase(const framing::Uuid&); - void eraseExpired(); + uint32_t ack; + friend class SessionState; // removes deleted sessions from active set. }; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 17537e11be..45d78c9307 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -31,22 +31,25 @@ namespace broker { using namespace framing; -SessionState::SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_) - : factory(f), handler(&h), id(true), timeout(timeout_), - broker(h.getConnection().broker), - version(h.getConnection().getVersion()) -{ - // FIXME aconway 2007-09-21: Break dependnecy - broker updates session. - chain.push_back(new SemanticHandler(*this)); - in = &chain[0]; // Incoming frame to handler chain. - out = &handler->out; // Outgoing frames to SessionHandler +void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); } - // FIXME aconway 2007-09-20: use broker to add plugin - // handlers to the chain. - // FIXME aconway 2007-08-31: Shouldn't be passing channel ID. - broker.update(handler->getChannel(), *this); +void SessionState::handleOut(AMQFrame& f) { + assert(handler); + handler->out.handle(f); } +SessionState::SessionState( + SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack) + : framing::SessionState(ack), + factory(f), handler(&h), id(true), timeout(timeout_), + broker(h.getConnection().broker), + version(h.getConnection().getVersion()), + semanticHandler(new SemanticHandler(*this)) +{ + // FIXME aconway 2007-09-20: SessionManager may add plugin + // handlers to the chain. + } + SessionState::~SessionState() { // Remove ID from active session list. factory.erase(getId()); @@ -65,4 +68,12 @@ Connection& SessionState::getConnection() { return getHandler().getConnection(); } +void SessionState::detach() { + handler = 0; +} + +void SessionState::attach(SessionHandler& h) { + handler = &h; +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index d152937692..eed088af31 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -24,11 +24,12 @@ #include "qpid/framing/Uuid.h" #include "qpid/framing/FrameHandler.h" +#include "qpid/framing/SessionState.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/sys/Time.h" -#include <boost/ptr_container/ptr_vector.hpp> #include <boost/noncopyable.hpp> +#include <boost/scoped_ptr.hpp> #include <set> #include <vector> @@ -42,31 +43,26 @@ class AMQP_ClientProxy; namespace broker { +class SemanticHandler; class SessionHandler; class SessionManager; class Broker; class Connection; /** - * State of a session. - * - * An attached session has a SessionHandler which is attached to a - * connection. A suspended session has no handler. - * - * A SessionState is always associated with an open session (attached or - * suspended) it is destroyed when the session is closed. - * - * The SessionState includes the sessions handler chains, which may - * themselves have state. The handlers will be preserved as long as - * the session is alive. + * Broker-side session state includes sessions handler chains, which may + * themselves have state. */ -class SessionState : public framing::FrameHandler::Chains, - private boost::noncopyable +class SessionState : public framing::SessionState, + public framing::FrameHandler::InOutHandler { public: ~SessionState(); bool isAttached() { return handler; } + void detach(); + void attach(SessionHandler& handler); + /** @pre isAttached() */ SessionHandler& getHandler(); @@ -76,23 +72,30 @@ class SessionState : public framing::FrameHandler::Chains, /** @pre isAttached() */ Connection& getConnection(); - const framing::Uuid& getId() const { return id; } uint32_t getTimeout() const { return timeout; } Broker& getBroker() { return broker; } framing::ProtocolVersion getVersion() const { return version; } + + protected: + void handleIn(framing::AMQFrame&); + void handleOut(framing::AMQFrame&); private: - /** Only SessionManager can open sessions */ - SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_); - + // SessionManager creates sessions. + SessionState(SessionManager&, + SessionHandler& out, + uint32_t timeout, + uint32_t ackInterval); + SessionManager& factory; SessionHandler* handler; framing::Uuid id; uint32_t timeout; sys::AbsTime expiry; // Used by SessionManager. Broker& broker; - boost::ptr_vector<framing::FrameHandler> chain; framing::ProtocolVersion version; + + boost::scoped_ptr<SemanticHandler> semanticHandler; friend class SessionManager; }; diff --git a/cpp/src/qpid/broker/Timer.cpp b/cpp/src/qpid/broker/Timer.cpp index be75346578..14727b3b35 100644 --- a/cpp/src/qpid/broker/Timer.cpp +++ b/cpp/src/qpid/broker/Timer.cpp @@ -73,17 +73,14 @@ void Timer::start() Monitor::ScopedLock l(monitor); if (!active) { active = true; - runner = std::auto_ptr<Thread>(new Thread(this)); + runner = Thread(this); } } void Timer::stop() { signalStop(); - if (runner.get()) { - runner->join(); - runner.reset(); - } + runner.join(); } void Timer::signalStop() { diff --git a/cpp/src/qpid/broker/Timer.h b/cpp/src/qpid/broker/Timer.h index c70ffeaedc..e89ae499b7 100644 --- a/cpp/src/qpid/broker/Timer.h +++ b/cpp/src/qpid/broker/Timer.h @@ -53,7 +53,7 @@ class Timer : private qpid::sys::Runnable { qpid::sys::Monitor monitor; std::priority_queue<TimerTask::shared_ptr, std::vector<TimerTask::shared_ptr>, Later> tasks; - std::auto_ptr<qpid::sys::Thread> runner; + qpid::sys::Thread runner; bool active; void run(); diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index cef34630db..16e0428a56 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -24,7 +24,6 @@ #include "Channel.h" #include "qpid/sys/Monitor.h" #include "Message.h" -#include "qpid/QpidError.h" #include "Connection.h" #include "Demux.h" #include "FutureResponse.h" @@ -71,7 +70,7 @@ void Channel::open(const Session& s) { Mutex::ScopedLock l(stopLock); if (isOpen()) - THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); + throw ChannelBusyException(); active = true; session = s; if(isTransactional()) { @@ -142,7 +141,7 @@ void Channel::consume( Mutex::ScopedLock l(lock); ConsumerMap::iterator i = consumers.find(tag); if (i != consumers.end()) - throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag); + throw NotAllowedException(QPID_MSG("Consumer already exists with tag " << tag )); Consumer& c = consumers[tag]; c.listener = listener; c.ackMode = ackMode; diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 0a6a88ae90..932fab8881 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -29,7 +29,7 @@ #include "qpid/log/Logger.h" #include "qpid/log/Options.h" #include "qpid/log/Statement.h" -#include "qpid/QpidError.h" +#include "qpid/shared_ptr.h" #include <iostream> #include <sstream> #include <functional> @@ -44,23 +44,26 @@ namespace client { Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), - impl(new ConnectionImpl(boost::shared_ptr<Connector>(new Connector(_version, _debug)))), - isOpen(false) {} + isOpen(false), + impl(new ConnectionImpl( + shared_ptr<Connector>(new Connector(_version, _debug)))) +{} -Connection::Connection(boost::shared_ptr<Connector> c) : +Connection::Connection(shared_ptr<Connector> c) : channelIdCounter(0), version(framing::highestProtocolVersion), max_frame_size(65536), - impl(new ConnectionImpl(c)), - isOpen(false) {} + isOpen(false), + impl(new ConnectionImpl(c)) +{} -Connection::~Connection(){} +Connection::~Connection(){ } void Connection::open( const std::string& host, int port, const std::string& uid, const std::string& pwd, const std::string& vhost) { if (isOpen) - THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); + throw Exception(QPID_MSG("Channel object is already open")); impl->open(host, port, uid, pwd, vhost); isOpen = true; @@ -79,10 +82,9 @@ Session Connection::newSession(uint32_t detachedLifetime) { } void Connection::resume(Session& session) { - shared_ptr<SessionCore> core=session.impl; - core->setChannel(++channelIdCounter); - impl->addSession(core); - core->resume(impl); + session.impl->setChannel(++channelIdCounter); + impl->addSession(session.impl); + session.impl->resume(impl); } void Connection::close() { diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 2e5059f135..d2612ca754 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -23,7 +23,6 @@ */ #include <map> #include <string> -#include "qpid/QpidError.h" #include "Channel.h" #include "ConnectionImpl.h" #include "qpid/client/Session.h" @@ -57,10 +56,12 @@ class Connection framing::ChannelId channelIdCounter; framing::ProtocolVersion version; const uint32_t max_frame_size; - shared_ptr<ConnectionImpl> impl; bool isOpen; bool debug; - + + protected: + boost::shared_ptr<ConnectionImpl> impl; + public: /** * Creates a connection object, but does not open the diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 4058bfb33f..a8f10c32a9 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -68,7 +68,7 @@ void ConnectionHandler::incoming(AMQFrame& frame) try { in(frame); }catch(ConnectionException& e){ - error(e.code, e.toString(), body); + error(e.code, e.what(), body); }catch(std::exception& e){ error(541/*internal error*/, e.what(), body); } @@ -124,6 +124,8 @@ void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_ void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body) { + if (onError) + onError(code, message); AMQMethodBody* method = body->getMethod(); if (method) error(code, message, method->amqpClassId(), method->amqpMethodId()); diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index fae93e8294..f9273bc165 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/framing/constants.h" #include "qpid/framing/reply_exceptions.h" #include "ConnectionImpl.h" @@ -35,8 +36,9 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) { handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); handler.out = boost::bind(&Connector::send, connector, _1); - handler.onClose = boost::bind(&ConnectionImpl::closed, this); - handler.onError = boost::bind(&ConnectionImpl::closedByPeer, this, _1, _2); + handler.onClose = boost::bind(&ConnectionImpl::closed, this, + REPLY_SUCCESS, std::string()); + handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2); connector->setInputHandler(&handler); connector->setTimeoutHandler(this); connector->setShutdownHandler(this); @@ -64,7 +66,7 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) s = sessions[frame.getChannel()].lock(); } if (!s) - throw ChannelErrorException(); + throw ChannelErrorException(QPID_MSG("Invalid channel: " << frame.getChannel())); s->in(frame); } @@ -84,19 +86,8 @@ void ConnectionImpl::open(const std::string& host, int port, void ConnectionImpl::close() { - assertNotClosed(); - handler.close(); -} - -void ConnectionImpl::closed() -{ - closedByPeer(200, "OK"); -} - -void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text) -{ - signalClose(code, text); - connector->close(); + if (!isClosed) + handler.close(); } void ConnectionImpl::idleIn() @@ -110,26 +101,39 @@ void ConnectionImpl::idleOut() connector->send(frame); } +template <class F> +void ConnectionImpl::forChannels(F functor) { + for (SessionMap::iterator i = sessions.begin(); + i != sessions.end(); ++i) { + try { + boost::shared_ptr<SessionCore> s = i->second.lock(); + if (s) functor(*s); + } catch (...) { assert(0); } + } +} + void ConnectionImpl::shutdown() { - //this indicates that the socket to the server has closed - signalClose(0, "Unexpected socket closure."); + Mutex::ScopedLock l(lock); + if (isClosed) return; + forChannels(boost::bind(&SessionCore::connectionBroke, _1, + INTERNAL_ERROR, "Unexpected socket closure.")); + sessions.clear(); + isClosed = true; } -void ConnectionImpl::signalClose(uint16_t code, const std::string& text) +void ConnectionImpl::closed(uint16_t code, const std::string& text) { Mutex::ScopedLock l(lock); - for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { - boost::shared_ptr<SessionCore> s = i->second.lock(); - if (s) - s->closed(code, text); - } + if (isClosed) return; + forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text)); sessions.clear(); isClosed = true; + connector->close(); } -void ConnectionImpl::assertNotClosed() -{ +void ConnectionImpl::erase(uint16_t ch) { Mutex::ScopedLock l(lock); - if (isClosed) throw Exception("Connection has been closed"); + sessions.erase(ch); } + diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index f20534f1aa..46bd5b685d 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -51,14 +51,14 @@ class ConnectionImpl : public framing::FrameHandler, bool isClosed; void incoming(framing::AMQFrame& frame); - void closed(); - void closedByPeer(uint16_t, const std::string&); + void closed(uint16_t, const std::string&); void idleOut(); void idleIn(); void shutdown(); - void signalClose(uint16_t, const std::string&); - void assertNotClosed(); -public: + + template <class F> void forChannels(F functor); + + public: typedef boost::shared_ptr<ConnectionImpl> shared_ptr; ConnectionImpl(boost::shared_ptr<Connector> c); @@ -69,7 +69,9 @@ public: const std::string& pwd = "guest", const std::string& virtualhost = "/"); void close(); - void handle(framing::AMQFrame& frame); + void handle(framing::AMQFrame& frame); + void erase(uint16_t channel); + boost::shared_ptr<Connector> getConnector() { return connector; } }; }} diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index b1ec580605..ba11ea5569 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -20,7 +20,6 @@ */ #include <iostream> #include "qpid/log/Statement.h" -#include "qpid/QpidError.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" #include "Connector.h" @@ -36,7 +35,6 @@ namespace client { using namespace qpid::sys; using namespace qpid::framing; -using qpid::QpidError; Connector::Connector( ProtocolVersion ver, bool _debug, uint32_t buffer_size diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index 8aaaea247a..af6badd6e0 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -98,6 +98,7 @@ class Connector : public framing::OutputHandler, virtual void setInputHandler(framing::InputHandler* handler); virtual void setTimeoutHandler(sys::TimeoutHandler* handler); virtual void setShutdownHandler(sys::ShutdownHandler* handler); + virtual sys::ShutdownHandler* getShutdownHandler() { return shutdownHandler; } virtual framing::OutputHandler* getOutputHandler(); virtual void send(framing::AMQFrame& frame); virtual void setReadTimeout(uint16_t timeout); diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index e4edece414..c70b0fc455 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -73,7 +73,7 @@ void ExecutionHandler::handle(AMQFrame& frame) void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) { if (range.size() % 2) { //must be even number - throw ConnectionException(530, "Received odd number of elements in ranged mark"); + throw NotAllowedException(QPID_MSG("Received odd number of elements in ranged mark")); } else { SequenceNumber mark(cumulative); { diff --git a/cpp/src/qpid/client/Future.h b/cpp/src/qpid/client/Future.h index 667a19e942..d07f9f149c 100644 --- a/cpp/src/qpid/client/Future.h +++ b/cpp/src/qpid/client/Future.h @@ -63,7 +63,7 @@ public: boost::bind(&FutureCompletion::completed, &callback) ); callback.waitForCompletion(); - session.checkClosed(); + session.assertOpen(); complete = true; } } diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp index 73b7c3a7a6..5d36a1d873 100644 --- a/cpp/src/qpid/client/FutureResponse.cpp +++ b/cpp/src/qpid/client/FutureResponse.cpp @@ -31,7 +31,7 @@ using namespace qpid::sys; AMQMethodBody* FutureResponse::getResponse(SessionCore& session) { waitForCompletion(); - session.checkClosed(); + session.assertOpen(); return response.get(); } diff --git a/cpp/src/qpid/client/FutureResult.cpp b/cpp/src/qpid/client/FutureResult.cpp index a523129206..681202edea 100644 --- a/cpp/src/qpid/client/FutureResult.cpp +++ b/cpp/src/qpid/client/FutureResult.cpp @@ -30,7 +30,7 @@ using namespace qpid::sys; const std::string& FutureResult::getResult(SessionCore& session) const { waitForCompletion(); - session.checkClosed(); + session.assertOpen(); return result; } diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 966d07eaef..27440465fe 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -24,105 +24,301 @@ #include "FutureResponse.h" #include "FutureResult.h" #include "ConnectionImpl.h" - +#include "qpid/framing/FrameSet.h" #include "qpid/framing/constants.h" +#include "qpid/framing/ClientInvoker.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> -using namespace qpid::client; +namespace qpid { +namespace client { + using namespace qpid::framing; -SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, uint16_t ch, uint64_t maxFrameSize) - : connection(conn), channel(ch), l2(*this), l3(maxFrameSize), - uuid(false), sync(false) +namespace { const std::string OK="ok"; } + +typedef sys::Monitor::ScopedLock Lock; +typedef sys::Monitor::ScopedUnlock UnLock; + +inline void SessionCore::invariant() const { + switch (state.get()) { + case OPENING: + assert(!session); + assert(code==REPLY_SUCCESS); + assert(connection); + assert(channel.get()); + assert(channel.next == connection.get()); + break; + case RESUMING: + assert(session); + assert(session->getState() == SessionState::RESUMING); + assert(code==REPLY_SUCCESS); + assert(connection); + assert(channel.get()); + assert(channel.next == connection.get()); + break; + case OPEN: + case CLOSING: + case SUSPENDING: + assert(session); + assert(code==REPLY_SUCCESS); + assert(connection); + assert(channel.get()); + assert(channel.next == connection.get()); + break; + case SUSPENDED: + assert(code==REPLY_SUCCESS); + assert(session); + assert(!connection); + break; + case CLOSED: + assert(!session); + assert(!connection); + break; + } +} + +inline void SessionCore::setState(State s) { + state = s; + invariant(); +} + +inline void SessionCore::waitFor(State s) { + invariant(); + // We can be CLOSED or SUSPENDED by error at any time. + state.waitFor(States(s, CLOSED, SUSPENDED)); + check(); + assert(state==s); + invariant(); +} + +SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, + uint16_t ch, uint64_t maxFrameSize) + : l3(maxFrameSize), + sync(false), + channel(ch), + proxy(channel), + state(OPENING) { - l2.next = &l3; l3.out = &out; - out.next = connection.get(); + attaching(conn); } -SessionCore::~SessionCore() {} +void SessionCore::attaching(shared_ptr<ConnectionImpl> c) { + assert(c); + assert(channel.get()); + connection = c; + channel.next = connection.get(); + code = REPLY_SUCCESS; + text = OK; + state = session ? RESUMING : OPENING; + invariant(); +} -ExecutionHandler& SessionCore::getExecution() -{ - checkClosed(); - return l3; +SessionCore::~SessionCore() { + Lock l(state); + invariant(); + detach(COMMAND_INVALID, "Session deleted"); + state.waitAll(); } -FrameSet::shared_ptr SessionCore::get() -{ - checkClosed(); - return l3.getDemux().getDefault().pop(); +void SessionCore::detach(int c, const std::string& t) { + connection.reset(); + channel.next = 0; + code=c; + text=t; } -void SessionCore::setSync(bool s) -{ +void SessionCore::doClose(int code, const std::string& text) { + if (state != CLOSED) { + session.reset(); + l3.getDemux().close(); + l3.getCompletionTracker().close(); + detach(code, text); + setState(CLOSED); + } + invariant(); +} + +void SessionCore::doSuspend(int code, const std::string& text) { + if (state != CLOSED) { + invariant(); + detach(code, text); + session->suspend(); + setState(SUSPENDED); + } +} + +ExecutionHandler& SessionCore::getExecution() { // user thread + return l3; +} + +void SessionCore::setSync(bool s) { // user thread sync = s; } -bool SessionCore::isSync() -{ +bool SessionCore::isSync() { // user thread return sync; } -namespace { -struct ClosedOnExit { - SessionCore& core; - int code; - std::string text; - ClosedOnExit(SessionCore& s, int c, const std::string& t) - : core(s), code(c), text(t) {} - ~ClosedOnExit() { core.closed(code, text); } -}; +FrameSet::shared_ptr SessionCore::get() { // user thread + // No lock here: pop does a blocking wait. + return l3.getDemux().getDefault().pop(); +} + +void SessionCore::open(uint32_t detachedLifetime) { // user thread + Lock l(state); + check(state==OPENING && !session, + COMMAND_INVALID, QPID_MSG("Cannot re-open a session.")); + proxy.open(detachedLifetime); + waitFor(OPEN); +} + +void SessionCore::close() { // user thread + Lock l(state); + check(); + if (state==OPEN) { + setState(CLOSING); + proxy.close(); + waitFor(CLOSED); + } + else + doClose(REPLY_SUCCESS, OK); +} + +void SessionCore::suspend() { // user thread + Lock l(state); + checkOpen(); + setState(SUSPENDING); + proxy.suspend(); + waitFor(SUSPENDED); } -void SessionCore::close() +void SessionCore::setChannel(uint16_t ch) { channel=ch; } + +void SessionCore::resume(shared_ptr<ConnectionImpl> c) { + // user thread + { + Lock l(state); + if (state==OPEN) + doSuspend(REPLY_SUCCESS, OK); + check(state==SUSPENDED, COMMAND_INVALID, QPID_MSG("Session cannot be resumed.")); + SequenceNumber sendAck=session->resuming(); + attaching(c); + proxy.resume(getId()); + waitFor(OPEN); + proxy.ack(sendAck, SequenceNumberSet()); + // FIXME aconway 2007-10-23: Replay inside the lock might be a prolem + // for large replay sets. + SessionState::Replay replay=session->replay(); + for (SessionState::Replay::iterator i = replay.begin(); + i != replay.end(); ++i) + { + invariant(); + channel.handle(*i); // Direct to channel. + check(); + } + } +} + +void SessionCore::assertOpen() const { + Lock l(state); + checkOpen(); +} + +// network thread +void SessionCore::attached(const Uuid& sessionId, + uint32_t /*detachedLifetime*/) { - checkClosed(); - ClosedOnExit closer(*this, CHANNEL_ERROR, "Session closed by user."); - l2.close(); + Lock l(state); + invariant(); + check(state == OPENING || state == RESUMING, + COMMAND_INVALID, QPID_MSG("Received unexpected session.attached")); + if (state==OPENING) { // New session + // FIXME aconway 2007-10-17: arbitrary ack value of 100 for + // client, allow configuration. + session=in_place<SessionState>(100, sessionId); + setState(OPEN); + } + else { // RESUMING + check(sessionId == session->getId(), + INVALID_ARGUMENT, QPID_MSG("session.resumed has invalid ID.")); + // Don't setState yet, wait for first incoming ack. + } } -void SessionCore::suspend() { - checkClosed(); - ClosedOnExit closer(*this, CHANNEL_ERROR, "Client session is suspended"); - l2.suspend(); +void SessionCore::detached() { // network thread + Lock l(state); + check(state == SUSPENDING, + COMMAND_INVALID, QPID_MSG("Received unexpected session.detached.")); + connection->erase(channel); + doSuspend(REPLY_SUCCESS, OK); +} + +void SessionCore::ack(uint32_t ack, const SequenceNumberSet&) { + Lock l(state); + invariant(); + check(state==OPEN || state==RESUMING, + COMMAND_INVALID, QPID_MSG("Received unexpected session.ack")); + session->receivedAck(ack); + if (state==RESUMING) { + setState(OPEN); + } + invariant(); } void SessionCore::closed(uint16_t code, const std::string& text) -{ - out.next = 0; - reason.code = code; - reason.text = text; - l2.closed(); - l3.getDemux().close(); - l3.getCompletionTracker().close(); +{ // network thread + Lock l(state); + invariant(); + doClose(code, text); } -void SessionCore::checkClosed() const -{ - // TODO: could have been a connection exception - if(out.next == 0) - throw ChannelException(reason.code, reason.text); +// closed by connection +void SessionCore::connectionClosed(uint16_t code, const std::string& text) { + Lock l(state); + try { + doClose(code, text); + } catch(...) { assert (0); } } -void SessionCore::open(uint32_t detachedLifetime) { - assert(out.next); - l2.open(detachedLifetime); +void SessionCore::connectionBroke(uint16_t code, const std::string& text) { + Lock l(state); + try { + doSuspend(code, text); + } catch (...) { assert(0); } } -void SessionCore::resume(shared_ptr<ConnectionImpl> conn) { - connection = conn; - out.next = connection.get(); - l2.resume(); +void SessionCore::check() const { // Called with lock held. + invariant(); + if (code != REPLY_SUCCESS) + throwReplyException(code, text); +} + +void SessionCore::check(bool cond, int newCode, const std::string& msg) const { + check(); + if (!cond) { + const_cast<SessionCore*>(this)->doClose(newCode, msg); + throwReplyException(code, text); + } } -Future SessionCore::send(const AMQBody& command) -{ - checkClosed(); +void SessionCore::checkOpen() const { + if (state==SUSPENDED) { + std::string cause; + if (code != REPLY_SUCCESS) + cause=" by :"+text; + throw CommandInvalidException(QPID_MSG("Session is suspended" << cause)); + } + check(state==OPEN, COMMAND_INVALID, QPID_MSG("Session is not open")); +} +Future SessionCore::send(const AMQBody& command) +{ + Lock l(state); + checkOpen(); command.getMethod()->setSync(sync); - Future f; //any result/response listeners must be set before the command is sent if (command.getMethod()->resultExpected()) { @@ -145,21 +341,61 @@ Future SessionCore::send(const AMQBody& command) Future SessionCore::send(const AMQBody& command, const MethodContent& content) { - checkClosed(); + Lock l(state); + checkOpen(); //content bearing methods don't currently have responses or //results, if that changes should follow procedure for the other //send method impl: return Future(l3.send(command, content)); } +// Network thread. void SessionCore::handleIn(AMQFrame& frame) { - l2.handle(frame); + try { + // Cast to expose private SessionHandler functions. + if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + session->received(frame); + l3.handle(frame); + } + } catch (const ChannelException& e) { + QPID_LOG(error, "Channel exception:" << e.what()); + doClose(e.code, e.what()); + } } void SessionCore::handleOut(AMQFrame& frame) { - checkClosed(); - frame.setChannel(channel); - out.next->handle(frame); + Lock l(state); + if (state==OPEN) { + if (session->sent(frame)) + proxy.solicitAck(); + channel.handle(frame); + } +} + +void SessionCore::solicitAck( ) { + Lock l(state); + checkOpen(); + proxy.ack(session->sendingAck(), SequenceNumberSet()); +} + +void SessionCore::flow(bool) { + assert(0); throw NotImplementedException("session.flow"); +} + +void SessionCore::flowOk(bool /*active*/) { + assert(0); throw NotImplementedException("session.flow"); +} + +void SessionCore::highWaterMark(uint32_t /*lastSentMark*/) { + // FIXME aconway 2007-10-02: may be removed from spec. + assert(0); throw NotImplementedException("session.highWaterMark"); +} + +const Uuid SessionCore::getId() const { + if (session) + return session->getId(); + throw Exception(QPID_MSG("Closed session, no ID.")); } +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index ac109e1f5c..38c72359a3 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -22,17 +22,25 @@ #ifndef _SessionCore_ #define _SessionCore_ -#include <boost/function.hpp> -#include <boost/shared_ptr.hpp> -#include "qpid/framing/AMQMethodBody.h" +#include "qpid/shared_ptr.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/framing/MethodContent.h" -#include "qpid/framing/Uuid.h" -#include "SessionHandler.h" +#include "qpid/framing/ChannelHandler.h" +#include "qpid/framing/SessionState.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/AMQP_ClientOperations.h" +#include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/sys/StateMonitor.h" #include "ExecutionHandler.h" +#include <boost/optional.hpp> + namespace qpid { +namespace framing { +class FrameSet; +class MethodContent; +class SequenceNumberSet; +} + namespace client { class Future; @@ -43,60 +51,90 @@ class ConnectionImpl; * Attaches to a SessionHandler when active, detaches * when closed. */ -class SessionCore : public framing::FrameHandler::InOutHandler +class SessionCore : public framing::FrameHandler::InOutHandler, + private framing::AMQP_ClientOperations::SessionHandler { - struct Reason - { - uint16_t code; - std::string text; - }; - - shared_ptr<ConnectionImpl> connection; - uint16_t channel; - SessionHandler l2; - ExecutionHandler l3; - framing::Uuid uuid; - volatile bool sync; - Reason reason; - - protected: - void handleIn(framing::AMQFrame& frame); - void handleOut(framing::AMQFrame& frame); - public: SessionCore(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); ~SessionCore(); framing::FrameSet::shared_ptr get(); + const framing::Uuid getId() const; + uint16_t getChannel() const { return channel; } + void assertOpen() const; - framing::Uuid getId() const { return uuid; } - void setId(const framing::Uuid& id) { uuid= id; } - - uint16_t getChannel() const { assert(channel); return channel; } - void setChannel(uint16_t ch) { assert(ch); channel=ch; } - + // NOTE: Public functions called in user thread. void open(uint32_t detachedLifetime); - - /** Closed by client code */ void close(); - - /** Closed by peer */ - void closed(uint16_t code, const std::string& text); - void resume(shared_ptr<ConnectionImpl>); void suspend(); + void setChannel(uint16_t channel); - void setSync(bool); + void setSync(bool s); bool isSync(); ExecutionHandler& getExecution(); - void checkClosed() const; Future send(const framing::AMQBody& command); + Future send(const framing::AMQBody& command, const framing::MethodContent& content); -}; -} -} + void connectionClosed(uint16_t code, const std::string& text); + void connectionBroke(uint16_t code, const std::string& text); + + private: + enum State { + OPENING, + RESUMING, + OPEN, + CLOSING, + SUSPENDING, + SUSPENDED, + CLOSED + }; + typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler; + typedef sys::StateMonitor<State, CLOSED> StateMonitor; + typedef StateMonitor::Set States; + + inline void invariant() const; + inline void setState(State s); + inline void waitFor(State); + void doClose(int code, const std::string& text); + void doSuspend(int code, const std::string& text); + + /** If there is an error, throw the exception */ + void check(bool condition, int code, const std::string& text) const; + /** Throw if *error */ + void check() const; + + void handleIn(framing::AMQFrame& frame); + void handleOut(framing::AMQFrame& frame); + + // Private functions are called by broker in network thread. + void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); + void flow(bool active); + void flowOk(bool active); + void detached(); + void ack(uint32_t cumulativeSeenMark, + const framing::SequenceNumberSet& seenFrameSet); + void highWaterMark(uint32_t lastSentMark); + void solicitAck(); + void closed(uint16_t code, const std::string& text); + + void attaching(shared_ptr<ConnectionImpl>); + void detach(int code, const std::string& text); + void checkOpen() const; + + int code; // Error code + std::string text; // Error text + boost::optional<framing::SessionState> session; + shared_ptr<ConnectionImpl> connection; + ExecutionHandler l3; + volatile bool sync; + framing::ChannelHandler channel; + framing::AMQP_ServerProxy::Session proxy; + mutable StateMonitor state; +}; +}} // namespace qpid::client #endif diff --git a/cpp/src/qpid/client/SessionHandler.cpp b/cpp/src/qpid/client/SessionHandler.cpp deleted file mode 100644 index d3b04e5356..0000000000 --- a/cpp/src/qpid/client/SessionHandler.cpp +++ /dev/null @@ -1,132 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "SessionHandler.h" -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/all_method_bodies.h" -#include "qpid/client/SessionCore.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/log/Statement.h" - -using namespace qpid::client; -using namespace qpid::framing; -using namespace boost; - -namespace { -// TODO aconway 2007-09-28: hack till we have multi-version support. -ProtocolVersion version; -} - -SessionHandler::SessionHandler(SessionCore& parent) - : StateManager(CLOSED), core(parent) {} - -SessionHandler::~SessionHandler() {} - -void SessionHandler::handle(AMQFrame& frame) -{ - AMQBody* body = frame.getBody(); - if (getState() == OPEN) { - core.checkClosed(); - SessionClosedBody* closedBody= - dynamic_cast<SessionClosedBody*>(body->getMethod()); - if (closedBody) { - closed(); - core.closed(closedBody->getReplyCode(), closedBody->getReplyText()); - } else { - try { - next->handle(frame); - } - catch(const ChannelException& e){ - QPID_LOG(error, "Channel exception:" << e.what()); - closed(); - AMQFrame f(0, SessionClosedBody(version, e.code, e.toString())); - core.out(f); - core.closed(closedBody->getReplyCode(), closedBody->getReplyText()); - } - } - } else { - if (body->getMethod()) - handleMethod(body->getMethod()); - else - throw ConnectionException(504, "Channel not open for content."); - } -} - -void SessionHandler::attach(const AMQMethodBody& command) -{ - setState(OPENING); - AMQFrame f(0, command); - core.out(f); - std::set<int> states; - states.insert(OPEN); - states.insert(CLOSED); - waitFor(states); - if (getState() != OPEN) - throw Exception(QPID_MSG("Failed to attach session to channel "<<core.getChannel())); -} - -void SessionHandler::open(uint32_t detachedLifetime) { - attach(SessionOpenBody(version, detachedLifetime)); -} - -void SessionHandler::resume() { - attach(SessionResumeBody(version, core.getId())); -} - -void SessionHandler::detach(const AMQMethodBody& command) -{ - setState(CLOSING); - AMQFrame f(0, command); - core.out(f); - waitFor(CLOSED); -} - -void SessionHandler::close() { detach(SessionCloseBody(version)); } -void SessionHandler::suspend() { detach(SessionSuspendBody(version)); } -void SessionHandler::closed() { setState(CLOSED); } - -void SessionHandler::handleMethod(AMQMethodBody* method) -{ - switch (getState()) { - case OPENING: { - SessionAttachedBody* attached = dynamic_cast<SessionAttachedBody*>(method); - if (attached) { - core.setId(attached->getSessionId()); - setState(OPEN); - } else - throw ChannelErrorException(); - break; - } - case CLOSING: - if (method->isA<SessionClosedBody>() || - method->isA<SessionDetachedBody>()) - closed(); - break; - - case CLOSED: - throw ChannelErrorException(); - - default: - assert(0); - throw InternalErrorException(QPID_MSG("Internal Error.")); - } -} - diff --git a/cpp/src/qpid/client/SessionHandler.h b/cpp/src/qpid/client/SessionHandler.h deleted file mode 100644 index 994b8402de..0000000000 --- a/cpp/src/qpid/client/SessionHandler.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _SessionHandler_ -#define _SessionHandler_ - -#include "StateManager.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/Uuid.h" -#include "qpid/shared_ptr.h" - -namespace qpid { -namespace client { -class SessionCore; - -/** - * Handles incoming session (L2) commands. - */ -class SessionHandler : public framing::FrameHandler, - private StateManager -{ - enum STATES {OPENING, OPEN, CLOSING, CLOSED}; - SessionCore& core; - - void handleMethod(framing::AMQMethodBody* method); - void attach(const framing::AMQMethodBody&); - void detach(const framing::AMQMethodBody&); - - public: - SessionHandler(SessionCore& parent); - ~SessionHandler(); - - /** Incoming from broker */ - void handle(framing::AMQFrame&); - - void open(uint32_t detachedLifetime); - void resume(); - void close(); - void closed(); - void suspend(); -}; - -}} - -#endif diff --git a/cpp/src/qpid/client/StateManager.cpp b/cpp/src/qpid/client/StateManager.cpp index b72967c098..0cb3c6b9d4 100644 --- a/cpp/src/qpid/client/StateManager.cpp +++ b/cpp/src/qpid/client/StateManager.cpp @@ -60,7 +60,7 @@ void StateManager::setState(int s) stateLock.notifyAll(); } -int StateManager::getState() +int StateManager::getState() const { Monitor::ScopedLock l(stateLock); return state; diff --git a/cpp/src/qpid/client/StateManager.h b/cpp/src/qpid/client/StateManager.h index fd0c1b7f86..2f8ecb772c 100644 --- a/cpp/src/qpid/client/StateManager.h +++ b/cpp/src/qpid/client/StateManager.h @@ -30,12 +30,12 @@ namespace client { class StateManager { int state; - sys::Monitor stateLock; + mutable sys::Monitor stateLock; public: StateManager(int initial); void setState(int state); - int getState(); + int getState() const ; void waitForStateChange(int current); void waitFor(std::set<int> states); void waitFor(int state); diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index abd33c4158..423af06173 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -20,9 +20,9 @@ */ #include "AMQFrame.h" -#include "qpid/QpidError.h" #include "qpid/framing/variant.h" #include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/reply_exceptions.h" #include <boost/format.hpp> @@ -103,7 +103,7 @@ bool AMQFrame::decode(Buffer& buffer) uint8_t flags = buffer.getOctet(); uint8_t framing_version = (flags & 0xc0) >> 6; if (framing_version != 0) - THROW_QPID_ERROR(FRAMING_ERROR, "Framing version unsupported"); + throw SyntaxErrorException(QPID_MSG("Framing version unsupported")); bof = flags & 0x08; eof = flags & 0x04; bos = flags & 0x02; @@ -111,7 +111,7 @@ bool AMQFrame::decode(Buffer& buffer) uint8_t type = buffer.getOctet(); uint16_t frame_size = buffer.getShort(); if (frame_size < frameOverhead()-1) - THROW_QPID_ERROR(FRAMING_ERROR, "Frame size too small"); + throw SyntaxErrorException(QPID_MSG("Frame size too small")); uint8_t reserved1 = buffer.getOctet(); uint8_t field1 = buffer.getOctet(); subchannel = field1 & 0x0f; @@ -121,7 +121,7 @@ bool AMQFrame::decode(Buffer& buffer) // Verify that the protocol header meets current spec // TODO: should we check reserved2 against zero as well? - the spec isn't clear if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0) - THROW_QPID_ERROR(FRAMING_ERROR, "Reserved bits not zero"); + throw SyntaxErrorException(QPID_MSG("Reserved bits not zero")); // TODO: should no longer care about body size and only pass up B,E,b,e flags uint16_t body_size = frame_size + 1 - frameOverhead(); @@ -133,7 +133,7 @@ bool AMQFrame::decode(Buffer& buffer) uint8_t end = buffer.getOctet(); if (end != 0xCE) - THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); + throw SyntaxErrorException(QPID_MSG("Frame end not found")); return true; } @@ -147,9 +147,7 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type) case HEARTBEAT_BODY: body = AMQHeartbeatBody(); break; default: - THROW_QPID_ERROR( - FRAMING_ERROR, - boost::format("Unknown frame type %d") % type); + throw SyntaxErrorException(QPID_MSG("Invalid frame type " << type)); } boost::apply_visitor(DecodeVisitor(buffer,size), body); } diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp index 53a01141c1..fb84be7cd6 100644 --- a/cpp/src/qpid/framing/BodyHandler.cpp +++ b/cpp/src/qpid/framing/BodyHandler.cpp @@ -18,14 +18,13 @@ * under the License. * */ -#include "qpid/QpidError.h" #include "BodyHandler.h" #include "AMQMethodBody.h" #include "AMQHeaderBody.h" #include "AMQContentBody.h" #include "AMQHeartbeatBody.h" - #include <boost/cast.hpp> +#include "qpid/framing/reply_exceptions.h" using namespace qpid::framing; using namespace boost; @@ -49,7 +48,8 @@ void BodyHandler::handleBody(AMQBody* body) { handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body)); break; default: - QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type()); + throw SyntaxErrorException( + QPID_MSG("Invalid frame type " << body->type())); } } diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp index 6a466fdfab..8c1a4e1e9e 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.cpp +++ b/cpp/src/qpid/framing/ChannelAdapter.cpp @@ -15,8 +15,6 @@ * limitations under the License. * */ -#include <boost/format.hpp> - #include "ChannelAdapter.h" #include "OutputHandler.h" #include "AMQFrame.h" @@ -26,8 +24,6 @@ #include "AMQMethodBody.h" #include "qpid/framing/ConnectionOpenBody.h" -using boost::format; - namespace qpid { namespace framing { @@ -53,20 +49,20 @@ void ChannelAdapter::send(const AMQBody& body) void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const { if (getId() != 0 && method.amqpClassId() == ConnectionOpenBody::CLASS_ID) - throw ConnectionException( - 504, format("Connection method on non-0 channel %d.")%getId()); + throw ChannelErrorException( + QPID_MSG("Connection method on non-0 channel " << getId())); } void ChannelAdapter::assertChannelOpen() const { if (getId() != 0 && !isOpen()) - throw ConnectionException( - 504, format("Channel %d is not open.")%getId()); + throw ChannelErrorException( + QPID_MSG("Channel " << getId() << " is not open.")); } void ChannelAdapter::assertChannelNotOpen() const { if (getId() != 0 && isOpen()) - throw ConnectionException( - 504, format("Channel %d is already open.") % getId()); + throw ChannelErrorException( + QPID_MSG("Channel " << getId() << " is already open.")); } void ChannelAdapter::handle(AMQFrame& f) { handleBody(f.getBody()); } diff --git a/cpp/src/qpid/framing/ProtocolVersionException.h b/cpp/src/qpid/framing/ChannelHandler.h index bd16804470..69aaeac492 100644 --- a/cpp/src/qpid/framing/ProtocolVersionException.h +++ b/cpp/src/qpid/framing/ChannelHandler.h @@ -1,3 +1,6 @@ +#ifndef QPID_FRAMING_CHANNELHANDLER_H +#define QPID_FRAMING_CHANNELHANDLER_H + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,39 +21,33 @@ * under the License. * */ - -#ifndef _ProtocolVersionException_ -#define _ProtocolVersionException_ - -#include "qpid/Exception.h" -#include "ProtocolVersion.h" -#include <string> -#include <vector> +#include "FrameHandler.h" +#include "AMQFrame.h" namespace qpid { namespace framing { -class ProtocolVersionException : public qpid::Exception +/** + * Sets the channel number on outgoing frames. + */ +class ChannelHandler : public FrameHandler { -protected: - ProtocolVersion versionFound; - -public: - ~ProtocolVersionException() throw() {} - - template <class T> - ProtocolVersionException( - ProtocolVersion ver, const T& msg) throw () : versionFound(ver) - { init(boost::lexical_cast<std::string>(msg)); } - - template <class T> - ProtocolVersionException(const T& msg) throw () - { init(boost::lexical_cast<std::string>(msg)); } + public: + ChannelHandler(uint16_t channelId=0, FrameHandler* next=0) + : FrameHandler(next), channel(channelId) {} + void handle(AMQFrame& frame) { + frame.setChannel(channel); + next->handle(frame); + } + uint16_t get() const { return channel; } + ChannelHandler& set(uint16_t ch) { channel=ch; return *this; } + operator uint16_t() const { return get(); } + ChannelHandler& operator=(uint16_t ch) { return set(ch); } private: - void init(const std::string& msg); + uint16_t channel; }; }} // namespace qpid::framing -#endif //ifndef _ProtocolVersionException_ +#endif /*!QPID_FRAMING_CHANNELHANDLER_H*/ diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp index 3c0284f2c8..089bc5d4a5 100644 --- a/cpp/src/qpid/framing/FieldTable.cpp +++ b/cpp/src/qpid/framing/FieldTable.cpp @@ -19,9 +19,10 @@ * */ #include "FieldTable.h" -#include "qpid/QpidError.h" #include "Buffer.h" #include "FieldValue.h" +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" #include <assert.h> namespace qpid { @@ -132,7 +133,7 @@ void FieldTable::decode(Buffer& buffer){ uint32_t len = buffer.getLong(); uint32_t available = buffer.available(); if (available < len) - THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for field table."); + throw SyntaxErrorException(QPID_MSG("Not enough data for field table.")); uint32_t leftover = available - len; while(buffer.available() > leftover){ std::string name; diff --git a/cpp/src/qpid/framing/FieldValue.cpp b/cpp/src/qpid/framing/FieldValue.cpp index a7535ae4b9..5526c9cb72 100644 --- a/cpp/src/qpid/framing/FieldValue.cpp +++ b/cpp/src/qpid/framing/FieldValue.cpp @@ -20,8 +20,7 @@ */ #include "FieldValue.h" #include "Buffer.h" -#include "qpid/QpidError.h" - +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace framing { @@ -75,9 +74,7 @@ void FieldValue::decode(Buffer& buffer) data.reset(new FixedWidthValue<0>()); break; default: - std::stringstream out; - out << "Unknown field table value type: " << typeOctet; - THROW_QPID_ERROR(FRAMING_ERROR, out.str()); + throw SyntaxErrorException(QPID_MSG("Unknown field table value type: " << (int)typeOctet)); } data->decode(buffer); } diff --git a/cpp/src/qpid/framing/FramingContent.cpp b/cpp/src/qpid/framing/FramingContent.cpp index 813e6fb49b..cd134b0e89 100644 --- a/cpp/src/qpid/framing/FramingContent.cpp +++ b/cpp/src/qpid/framing/FramingContent.cpp @@ -18,12 +18,10 @@ * under the License. * */ -#include <assert.h> - #include "Buffer.h" #include "FramingContent.h" -#include "qpid/QpidError.h" -#include <sstream> +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace framing { @@ -37,12 +35,12 @@ Content::Content(uint8_t _discriminator, const string& _value): discriminator(_d void Content::validate() { if (discriminator == REFERENCE) { if(value.empty()) { - THROW_QPID_ERROR(FRAMING_ERROR, "Reference cannot be empty"); + throw InvalidArgumentException( + QPID_MSG("Reference cannot be empty")); } }else if (discriminator != INLINE) { - std::stringstream out; - out << "Invalid discriminator: " << (int) discriminator; - THROW_QPID_ERROR(FRAMING_ERROR, out.str()); + throw SyntaxErrorException( + QPID_MSG("Invalid discriminator: " << discriminator)); } } diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index 3e55dff1bd..fbf3c0b7ca 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -33,6 +33,7 @@ template <class T> struct Handler { typedef T HandledType; typedef void handleFptr(T); + typedef void result_type; // Compatible with std/boost functors. Handler(Handler<T>* next_=0) : next(next_) {} virtual ~Handler() {} @@ -51,7 +52,7 @@ struct Handler { struct Chain : public Handler<T> { Chain(Handler<T>* first=0) : Handler(first) {} void operator=(Handler<T>* h) { next = h; } - void handle(T t) { (*next)(t); } + void handle(T t) { next->handle(t); } // TODO aconway 2007-08-29: chain modifier ops here. }; diff --git a/cpp/src/qpid/framing/ProtocolVersionException.cpp b/cpp/src/qpid/framing/ProtocolVersionException.cpp deleted file mode 100644 index b68b3af1f9..0000000000 --- a/cpp/src/qpid/framing/ProtocolVersionException.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/format.hpp> -#include "ProtocolVersionException.h" - - -using namespace qpid::framing; - -void ProtocolVersionException::init(const std::string& msg) -{ - whatStr = boost::str( - boost::format("ProtocolVersionException: %s found: %s") - % versionFound.toString() % msg); -} - diff --git a/cpp/src/qpid/framing/ResumeHandler.cpp b/cpp/src/qpid/framing/ResumeHandler.cpp deleted file mode 100644 index 9d2c971459..0000000000 --- a/cpp/src/qpid/framing/ResumeHandler.cpp +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIE4bS OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ResumeHandler.h" -#include "qpid/framing/reply_exceptions.h" - -#include <boost/bind.hpp> - -#include <algorithm> - -namespace qpid { -namespace framing { - -void ResumeHandler::ackReceived(SequenceNumber acked) { - if (lastSent < acked) - throw InvalidArgumentException("Invalid sequence number in ack"); - size_t keep = lastSent - acked; - if (keep < unacked.size()) - unacked.erase(unacked.begin(), unacked.end()-keep); -} - -void ResumeHandler::resend() { - std::for_each(unacked.begin(), unacked.end(), - boost::bind(&FrameHandler::handle,out->next, _1)); -} - -void ResumeHandler::handleIn(AMQFrame& f) { - ++lastReceived; - in.next->handle(f); -} - -void ResumeHandler::handleOut(AMQFrame& f) { - ++lastSent; - unacked.push_back(f); - out.next->handle(f); -} - - -}} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/ResumeHandler.h b/cpp/src/qpid/framing/ResumeHandler.h deleted file mode 100644 index c86a60b9cb..0000000000 --- a/cpp/src/qpid/framing/ResumeHandler.h +++ /dev/null @@ -1,69 +0,0 @@ -#ifndef QPID_FRAMING_RESUMEHANDLER_H -#define QPID_FRAMING_RESUMEHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/SequenceNumber.h" - -#include <deque> - -namespace qpid { -namespace framing { - -/** - * In/out handler pair for managing exactly-once session delivery. - * The same handler is used by client and broker. - * This handler only deals with TCP style SequenceNumber acks, - * not with fragmented SequenceNumberSet. - * - * THREAD UNSAFE. Expected to be used in a serialized context. - */ -class ResumeHandler : public FrameHandler::InOutHandler -{ - public: - /** Received acknowledgement for sent frames up to and including sentOk */ - void ackReceived(SequenceNumber sentOk); - - /** What was the last sequence number we received. */ - SequenceNumber getLastReceived() { return lastReceived; } - - /** Resend the unacked frames to the output handler */ - void resend(); - - protected: - void handleIn(AMQFrame&); - void handleOut(AMQFrame&); - - private: - typedef std::deque<AMQFrame> Frames; - Frames unacked; - SequenceNumber lastReceived; - SequenceNumber lastSent; -}; - - -}} // namespace qpid::common - - -#endif /*!QPID_FRAMING_RESUMEHANDLER_H*/ diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h index 9b8f0659b2..3aee04a4ce 100644 --- a/cpp/src/qpid/framing/SequenceNumber.h +++ b/cpp/src/qpid/framing/SequenceNumber.h @@ -47,6 +47,7 @@ class SequenceNumber bool operator<=(const SequenceNumber& other) const; bool operator>=(const SequenceNumber& other) const; uint32_t getValue() const { return (uint32_t) value; } + operator uint32_t() const { return (uint32_t) value; } friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b); }; diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp new file mode 100644 index 0000000000..045a0ae115 --- /dev/null +++ b/cpp/src/qpid/framing/SessionState.cpp @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIE4bS OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionState.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/constants.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/log/Statement.h" + +#include <algorithm> + +#include <boost/bind.hpp> +#include <boost/none.hpp> + +namespace qpid { +namespace framing { + +SessionState::SessionState(uint32_t ack, const Uuid& uuid) : + state(ATTACHED), + id(uuid), + lastReceived(-1), + lastSent(-1), + ackInterval(ack), + sendAckAt(lastReceived+ackInterval), + solicitAckAt(lastSent+ackInterval), + ackSolicited(false) +{ + assert(ackInterval > 0); +} + +namespace { +bool isSessionCommand(const AMQFrame& f) { + return f.getMethod() && f.getMethod()->amqpClassId() == SESSION_CLASS_ID; +} +} + +boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) { + if (isSessionCommand(f)) + return boost::none; + if (state==RESUMING) + throw CommandInvalidException( + QPID_MSG("Invalid frame: Resuming session, expected session-ack")); + assert(state = ATTACHED); + assert(lastReceived<sendAckAt); + ++lastReceived; + QPID_LOG(trace, "Recv # "<< lastReceived << " " << id); + if (lastReceived == sendAckAt) + return sendingAck(); + else + return boost::none; +} + +bool SessionState::sent(const AMQFrame& f) { + if (isSessionCommand(f)) + return false; + unackedOut.push_back(f); + ++lastSent; + QPID_LOG(trace, "Sent # "<< lastSent << " " << id); + return (state!=RESUMING) && + (lastSent == solicitAckAt) && + sendingSolicit(); +} + +SessionState::Replay SessionState::replay() { + Replay r(unackedOut.size()); + std::copy(unackedOut.begin(), unackedOut.end(), r.begin()); + return r; +} + +void SessionState::receivedAck(SequenceNumber acked) { + if (state==RESUMING) state=ATTACHED; + assert(state==ATTACHED); + if (lastSent < acked) + throw InvalidArgumentException("Invalid sequence number in ack"); + size_t keep = lastSent - acked; + if (keep < unackedOut.size()) + unackedOut.erase(unackedOut.begin(), unackedOut.end()-keep); + solicitAckAt = std::max(solicitAckAt, SequenceNumber(acked+ackInterval)); +} + +SequenceNumber SessionState::sendingAck() { + sendAckAt = lastReceived+ackInterval; + return lastReceived; +} + +bool SessionState::sendingSolicit() { + assert(state == ATTACHED); + if (ackSolicited) + return false; + solicitAckAt = lastSent + ackInterval; + return true; +} + +SequenceNumber SessionState::resuming() { + state = RESUMING; + return sendingAck(); +} + +void SessionState::suspend() { + state = SUSPENDED; +} + +}} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h new file mode 100644 index 0000000000..66fc083d3f --- /dev/null +++ b/cpp/src/qpid/framing/SessionState.h @@ -0,0 +1,127 @@ +#ifndef QPID_FRAMING_SESSIONSTATE_H +#define QPID_FRAMING_SESSIONSTATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/Uuid.h" +#include "qpid/framing/AMQFrame.h" + +#include <boost/optional.hpp> + +#include <deque> + +namespace qpid { +namespace framing { + +/** + * Session state common to client and broker. + * Implements session ack/resume protcools. + * + * A SessionState is always associated with an _open_ session (attached or + * suspended) it is destroyed when the session is closed. + * + * A template to make it protocol independent and easy to test. + */ +class SessionState +{ + public: + typedef std::vector<AMQFrame> Replay; + + /** States of a session. */ + enum State { + SUSPENDED, ///< Suspended, detached from any channel. + RESUMING, ///< Resuming: waiting for initial ack from peer. + ATTACHED ///< Attached to channel and operating normally. + }; + + /** + *Create a newly opened active session. + *@param ackInterval send/solicit an ack whenever N unacked frames + * have been received/sent. + *@pre ackInterval > 0 + */ + SessionState(uint32_t ackInterval=1, const framing::Uuid& id=framing::Uuid(true)); + + const framing::Uuid& getId() const { return id; } + State getState() const { return state; } + + /** Received incoming L3 frame. + * @return SequenceNumber if an ack should be sent, empty otherwise. + * SessionState assumes that acks are sent whenever it returns + * a seq. number. + */ + boost::optional<SequenceNumber> received(const AMQFrame&); + + /** Sent outgoing L3 frame. + *@return true if solicit-ack should be sent. Note the SessionState + *assumes that a solicit-ack is sent every time it returns true. + */ + bool sent(const AMQFrame&); + + /** Received normal incoming ack. */ + void receivedAck(SequenceNumber); + + /** Frames to replay + *@pre getState()==ATTACHED + */ + Replay replay(); + + /** Suspend the session. */ + void suspend(); + + /** Start resume protocol for the session. + *@returns sequence number to ack immediately. */ + SequenceNumber resuming(); + + /** About to send an unscheduled ack, e.g. to respond to a solicit-ack. + * + * Note: when received() returns a sequence number this function + * should not be called. SessionState assumes that the ack is sent + * every time received() returns a sequence number. + */ + SequenceNumber sendingAck(); + + SequenceNumber getLastSent() const { return lastSent; } + SequenceNumber getLastReceived() const { return lastReceived; } + private: + typedef std::deque<AMQFrame> Unacked; + + bool sendingSolicit(); + + State state; + framing::Uuid id; + Unacked unackedOut; + SequenceNumber lastReceived; + SequenceNumber lastSent; + uint32_t ackInterval; + SequenceNumber sendAckAt; + SequenceNumber solicitAckAt; + bool ackSolicited; + bool suspending; +}; + + +}} // namespace qpid::common + + +#endif /*!QPID_FRAMING_SESSIONSTATE_H*/ diff --git a/cpp/src/qpid/framing/TemplateVisitor.h b/cpp/src/qpid/framing/TemplateVisitor.h new file mode 100644 index 0000000000..8c719e5110 --- /dev/null +++ b/cpp/src/qpid/framing/TemplateVisitor.h @@ -0,0 +1,89 @@ +#ifndef QPID_FRAMING_TEMPLATEVISITOR_H +#define QPID_FRAMING_TEMPLATEVISITOR_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/mpl/fold.hpp> +#include <boost/utility/value_init.hpp> + +namespace qpid { +namespace framing { + +/** + * Metafunction to generate a visitor class derived from Base with a + * visit for each type in TypeList calling functor F. TypeList may be + * any boost::mpl type collection e.g. mpl::list. + * + * Generated class is: TemplateVisitor<Base, F, TypeList>::type + * + * @see make_visitor + */ +template <class VisitTemplate, class TypeList, class F> +class TemplateVisitor +{ + struct Base : public VisitorBase { + F action; + Base(F f) : action(f) {} + using VisitorBase::visit; + }; + + template <class B, class T> struct Visit : public B { + Visit(F action) : B(action) {} + using B::visit; + void visit(const T& body) { action(body); } + }; + + typedef typename boost::mpl::fold< + TypeList, Base, Visit<boost::mpl::placeholders::_1, + boost::mpl::placeholders::_2> + >::type type; +}; + +/** + * Construct a TemplateVisitor to perform the given action, + * for example: + * @code + */ +template <class VisitorBase, class TypeList, class F> +TemplateVisitor<VisitorBase,TypeList,F>::type make_visitor(F action) { + return TemplateVisitor<VisitorBase,TypeList,F>::type(action); +}; + +/** + * For method body classes in TypeList, invoke the corresponding function + * on Target and return true. For other body types return false. + */ +template <class TypeList, class Target> +bool invoke(const AMQBody& body, Target& target) { + typename InvokeVisitor<TypeList, Target>::type v(target); + body.accept(v); + return v.target; +} + +}} // namespace qpid::framing + + +#endif /*!QPID_FRAMING_INVOKEVISITOR_H*/ + +}} // namespace qpid::framing + + + +#endif /*!QPID_FRAMING_TEMPLATEVISITOR_H*/ diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp index e0372b2f68..1bb69fbca9 100644 --- a/cpp/src/qpid/framing/TransferContent.cpp +++ b/cpp/src/qpid/framing/TransferContent.cpp @@ -24,9 +24,13 @@ namespace qpid { namespace framing { -TransferContent::TransferContent(const std::string& _data) +TransferContent::TransferContent(const std::string& data, + const std::string& routingKey, + const std::string& exchange) { - setData(_data); + setData(data); + getDeliveryProperties().setRoutingKey(routingKey); + getDeliveryProperties().setExchange(exchange); } AMQHeaderBody TransferContent::getHeader() const @@ -73,14 +77,14 @@ void TransferContent::populate(const FrameSet& frameset) const MessageProperties& TransferContent::getMessageProperties() const { const MessageProperties* props = header.get<MessageProperties>(); - if (!props) throw NoSuchPropertiesException(); + if (!props) throw Exception("No message properties."); return *props; } const DeliveryProperties& TransferContent::getDeliveryProperties() const { const DeliveryProperties* props = header.get<DeliveryProperties>(); - if (!props) throw NoSuchPropertiesException(); + if (!props) throw Exception("No message properties."); return *props; } diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h index 6fd96f3587..88f45b7e0a 100644 --- a/cpp/src/qpid/framing/TransferContent.h +++ b/cpp/src/qpid/framing/TransferContent.h @@ -30,14 +30,15 @@ namespace qpid { namespace framing { -struct NoSuchPropertiesException : public Exception {}; - class TransferContent : public MethodContent { AMQHeaderBody header; std::string data; public: - TransferContent(const std::string& data = ""); + TransferContent(const std::string& data = std::string(), + const std::string& routingKey = std::string(), + const std::string& exchange = std::string()); + AMQHeaderBody getHeader() const; void setData(const std::string&); void appendData(const std::string&); diff --git a/cpp/src/qpid/framing/Uuid.cpp b/cpp/src/qpid/framing/Uuid.cpp index 3a83430d56..2918c48ce3 100644 --- a/cpp/src/qpid/framing/Uuid.cpp +++ b/cpp/src/qpid/framing/Uuid.cpp @@ -17,9 +17,9 @@ */ #include "Uuid.h" - -#include "qpid/QpidError.h" +#include "qpid/Exception.h" #include "qpid/framing/Buffer.h" +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace framing { @@ -34,7 +34,7 @@ void Uuid::encode(Buffer& buf) const { void Uuid::decode(Buffer& buf) { if (buf.available() < size()) - THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for UUID."); + throw SyntaxErrorException(QPID_MSG("Not enough data for UUID.")); buf.getRawData(c_array(), size()); } @@ -52,4 +52,10 @@ istream& operator>>(istream& in, Uuid& uuid) { return in; } +std::string Uuid::str() const { + std::ostringstream os; + os << *this; + return os.str(); +} + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/Uuid.h b/cpp/src/qpid/framing/Uuid.h index 19ae79db6a..9bde67ad8e 100644 --- a/cpp/src/qpid/framing/Uuid.h +++ b/cpp/src/qpid/framing/Uuid.h @@ -62,6 +62,9 @@ struct Uuid : public boost::array<uint8_t, 16> { void encode(framing::Buffer& buf) const; void decode(framing::Buffer& buf); + + /** String value in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb */ + std::string str() const; }; /** Print in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb */ diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h index eec28333bc..69b5942ba0 100644 --- a/cpp/src/qpid/framing/amqp_framing.h +++ b/cpp/src/qpid/framing/amqp_framing.h @@ -32,4 +32,3 @@ #include "ProtocolInitiation.h" #include "BasicHeaderProperties.h" #include "ProtocolVersion.h" -#include "ProtocolVersionException.h" diff --git a/cpp/src/qpid/framing/amqp_types.h b/cpp/src/qpid/framing/amqp_types.h index a788fe36e4..94442aa357 100644 --- a/cpp/src/qpid/framing/amqp_types.h +++ b/cpp/src/qpid/framing/amqp_types.h @@ -61,5 +61,11 @@ class Uuid; const ChannelId CHANNEL_MAX=(ChannelId(~1))>>1; const ChannelId CHANNEL_HIGH_BIT= ChannelId(~CHANNEL_MAX); +// Forward declare class types +class FramingContent; +class FieldTable; +class SequenceNumberSet; +class Uuid; + }} // namespace qpid::framing #endif diff --git a/cpp/src/qpid/framing/variant.h b/cpp/src/qpid/framing/variant.h index 3cb8aece5d..1fe81f8f67 100644 --- a/cpp/src/qpid/framing/variant.h +++ b/cpp/src/qpid/framing/variant.h @@ -23,7 +23,6 @@ /**@file Tools for using boost::variant */ -#include "qpid/QpidError.h" #include <boost/variant.hpp> @@ -39,7 +38,7 @@ template <class R=void> struct NoBlankVisitor : public boost::static_visitor<R> { R foundBlank() const { assert(0); - THROW_QPID_ERROR(INTERNAL_ERROR, "Invalid variant value."); + throw Exception(QPID_MSG("Invalid variant value.")); } R operator()(const boost::blank&) const { return foundBlank(); } R operator()(boost::blank&) const { return foundBlank(); } diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp index 6e8f3a59cc..c483ed2379 100644 --- a/cpp/src/qpid/log/Logger.cpp +++ b/cpp/src/qpid/log/Logger.cpp @@ -190,12 +190,6 @@ void Logger::add(Statement& s) { statements.insert(&s); } -void Logger::remove(Statement& s) { - ScopedLock l(lock); - s.enabled = false; - statements.erase(&s); -} - void Logger::configure(const Options& opts, const std::string& prog) { clear(); diff --git a/cpp/src/qpid/log/Logger.h b/cpp/src/qpid/log/Logger.h index a2103f5ec6..7851c65406 100644 --- a/cpp/src/qpid/log/Logger.h +++ b/cpp/src/qpid/log/Logger.h @@ -67,9 +67,6 @@ class Logger : private boost::noncopyable { /** Add a statement. */ void add(Statement& s); - /** Remove a statement */ - void remove(Statement& s); - /** Log a message. */ void log(const Statement&, const std::string&); @@ -93,7 +90,7 @@ class Logger : private boost::noncopyable { /** Add an output destination for messages */ void output(std::auto_ptr<Output> out); - /** Reset the logger to it's original state */ + /** Reset the logger to it's original state. */ void clear(); private: diff --git a/cpp/src/qpid/log/Statement.cpp b/cpp/src/qpid/log/Statement.cpp index 9ab314b81c..de130bc455 100644 --- a/cpp/src/qpid/log/Statement.cpp +++ b/cpp/src/qpid/log/Statement.cpp @@ -32,10 +32,6 @@ Statement::Initializer::Initializer(Statement& s) : statement(s) { Logger::instance().add(s); } -Statement::Initializer::~Initializer() { - Logger::instance().remove(statement); -} - namespace { const char* names[LevelTraits::COUNT] = { "trace", "debug", "info", "notice", "warning", "error", "critical" diff --git a/cpp/src/qpid/log/Statement.h b/cpp/src/qpid/log/Statement.h index 4eb4d1e7d8..18162971b0 100644 --- a/cpp/src/qpid/log/Statement.h +++ b/cpp/src/qpid/log/Statement.h @@ -19,8 +19,9 @@ * */ +#include "qpid/Msg.h" + #include <boost/current_function.hpp> -#include <sstream> namespace qpid { namespace log { @@ -69,23 +70,14 @@ struct Statement { struct Initializer { Initializer(Statement& s); - ~Initializer(); Statement& statement; }; }; -///@internal trickery to make QPID_LOG_STRINGSTREAM work. -inline std::ostream& noop(std::ostream& s) { return s; } - ///@internal static initializer for a Statement. #define QPID_LOG_STATEMENT_INIT(level) \ { 0, __FILE__, __LINE__, BOOST_CURRENT_FUNCTION, (::qpid::log::level) } -///@internal Stream streamable message and return a string. -#define QPID_LOG_STRINGSTREAM(message) \ - static_cast<std::ostringstream&>( \ - std::ostringstream() << qpid::log::noop << message).str() - /** * Macro for log statements. Example of use: * @code @@ -110,19 +102,9 @@ inline std::ostream& noop(std::ostream& s) { return s; } static ::qpid::log::Statement stmt_= QPID_LOG_STATEMENT_INIT(level); \ static ::qpid::log::Statement::Initializer init_(stmt_); \ if (stmt_.enabled) \ - stmt_.log(QPID_LOG_STRINGSTREAM(message)); \ + stmt_.log(::qpid::Msg() << message); \ } while(0) -/** - * Macro for complicated logging logic that can't fit in a simple QPID_LOG - * statement. For example: - * @code - * QPID_IF_LOG(debug) { - * message = do_complicated_stuff; - * QPID_LOG(debug, message); - * } - */ -#define QPID_IF_LOG(level) }} // namespace qpid::log diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 733a892cff..eccfb1465e 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -29,6 +29,7 @@ #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" @@ -301,7 +302,7 @@ void AsynchIOHandler::idle(AsynchIO&){ } // If frame was egregiously large complain if (frameSize > buff->byteCount) - THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); + throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); buff->dataCount = buffUsed; aio->queueWrite(buff); diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h index 917afc5704..cf8199954e 100644 --- a/cpp/src/qpid/sys/ConcurrentQueue.h +++ b/cpp/src/qpid/sys/ConcurrentQueue.h @@ -22,7 +22,7 @@ * */ -#include "qpid/sys/Monitor.h" +#include "qpid/sys/Waitable.h" #include "qpid/sys/ScopedIncrement.h" #include <boost/bind.hpp> @@ -39,73 +39,73 @@ namespace sys { * * Also allows consuming threads to wait until an item is available. */ -template <class T> class ConcurrentQueue { +template <class T> class ConcurrentQueue : public Waitable { public: - ConcurrentQueue() : waiters(0), shutdown(false) {} + struct ShutdownException {}; + + ConcurrentQueue() : shutdownFlag(false) {} - /** Threads in wait() are woken with ShutdownException before - * destroying the queue. - */ - ~ConcurrentQueue() { - Mutex::ScopedLock l(lock); - shutdown = true; - lock.notifyAll(); - while (waiters > 0) - lock.wait(); + /** Waiting threads are notified by ~Waitable */ + ~ConcurrentQueue() { shutdown(); } + + bool shutdown(bool wait=true) { + ScopedLock l(lock); + if (!shutdownFlag) { + shutdownFlag=true; + lock.notifyAll(); + if (wait) lock.waitAll(); + shutdownFlag=true; + return true; + } + return false; } - + /** Push a data item onto the back of the queue */ void push(const T& data) { Mutex::ScopedLock l(lock); queue.push_back(data); + lock.notify(); } /** If the queue is non-empty, pop the front item into data and * return true. If the queue is empty, return false */ - bool pop(T& data) { + bool tryPop(T& data) { Mutex::ScopedLock l(lock); - return popInternal(data); + if (shutdownFlag || queue.empty()) + return false; + data = queue.front(); + queue.pop_front(); + return true; } - /** Wait up to deadline for a data item to be available. - *@return true if data was available, false if timed out. + /** Wait up to a timeout for a data item to be available. + *@return true if data was available, false if timed out or shut down. *@throws ShutdownException if the queue is destroyed. */ - bool waitPop(T& data, Duration timeout) { - Mutex::ScopedLock l(lock); - ScopedIncrement<size_t> w( - waiters, boost::bind(&ConcurrentQueue::noWaiters, this)); + bool waitPop(T& data, Duration timeout=TIME_INFINITE) { + ScopedLock l(lock); AbsTime deadline(now(), timeout); - while (queue.empty() && lock.wait(deadline)) - ; - return popInternal(data); - } - - private: - - bool popInternal(T& data) { - if (shutdown) - throw ShutdownException(); + { + ScopedWait(*this); + while (!shutdownFlag && queue.empty()) + if (!lock.wait(deadline)) + return false; + } if (queue.empty()) return false; - else { - data = queue.front(); - queue.pop_front(); - return true; - } + data = queue.front(); + queue.pop_front(); + return true; } + + bool isShutdown() { ScopedLock l(lock); return shutdownFlag; } - void noWaiters() { - assert(waiters == 0); - if (shutdown) - lock.notify(); // Notify dtor thread. - } - - Monitor lock; + protected: + Waitable lock; + private: std::deque<T> queue; - size_t waiters; - bool shutdown; + bool shutdownFlag; }; }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/ScopedIncrement.h b/cpp/src/qpid/sys/ScopedIncrement.h index ba9e89ba5f..8645ab2484 100644 --- a/cpp/src/qpid/sys/ScopedIncrement.h +++ b/cpp/src/qpid/sys/ScopedIncrement.h @@ -30,17 +30,17 @@ namespace sys { * Optionally call a function if the decremented counter value is 0. * Note the function must not throw, it is called in the destructor. */ -template <class T> +template <class T, class F=boost::function<void()> > class ScopedIncrement : boost::noncopyable { public: - ScopedIncrement(T& c, boost::function0<void> f=0) + ScopedIncrement(T& c, F f=0) : count(c), callback(f) { ++count; } ~ScopedIncrement() { if (--count == 0 && callback) callback(); } private: T& count; - boost::function0<void> callback; + F callback; }; diff --git a/cpp/src/qpid/sys/Serializer.h b/cpp/src/qpid/sys/Serializer.h index 085d51d7e2..7bb3b07ae0 100644 --- a/cpp/src/qpid/sys/Serializer.h +++ b/cpp/src/qpid/sys/Serializer.h @@ -23,7 +23,7 @@ * */ - +#include "qpid/Exception.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" @@ -41,6 +41,8 @@ class SerializerBase : private boost::noncopyable, private Runnable { public: typedef boost::function<void()> VoidFn0; + struct ShutdownException : public Exception {}; + /** @see Serializer::Serializer */ SerializerBase(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0()); diff --git a/cpp/src/qpid/sys/StateMonitor.h b/cpp/src/qpid/sys/StateMonitor.h new file mode 100644 index 0000000000..5a92756f3a --- /dev/null +++ b/cpp/src/qpid/sys/StateMonitor.h @@ -0,0 +1,78 @@ +#ifndef QPID_SYS_STATEMONITOR_H +#define QPID_SYS_STATEMONITOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/Waitable.h" + +#include <bitset> + +namespace qpid { +namespace sys { + +/** + * A monitor with an enum state value. + * + *@param Enum: enum type to use for states. + *@param EnumMax: Highest enum value. + */ +template <class Enum, size_t MaxEnum> +class StateMonitor : public Waitable +{ + public: + struct Set : public std::bitset<MaxEnum + 1> { + Set() {} + Set(Enum s) { set(s); } + Set(Enum s, Enum t) { set(s).set(t); } + Set(Enum s, Enum t, Enum u) { set(s).set(t).set(u); } + Set(Enum s, Enum t, Enum u, Enum v) { set(s).set(t).set(u).set(v); } + }; + + + StateMonitor(Enum initial) { state=initial; } + + /** @pre Caller holds a ScopedLock. */ + void set(Enum s) { state=s; notifyAll(); } + /** @pre Caller holds a ScopedLock. */ + StateMonitor& operator=(Enum s) { set(s); return *this; } + + /** @pre Caller holds a ScopedLock. */ + Enum get() const { return state; } + /** @pre Caller holds a ScopedLock. */ + operator Enum() const { return state; } + + /** @pre Caller holds a ScopedLock */ + void waitFor(Enum s) { ScopedWait(*this); while (s != state) wait(); } + /** @pre Caller holds a ScopedLock */ + void waitFor(Set s) { ScopedWait(*this); while (!s.test(state)) wait(); } + /** @pre Caller holds a ScopedLock */ + void waitNot(Enum s) { ScopedWait(*this); while (s == state) wait(); } + /** @pre Caller holds a ScopedLock */ + void waitNot(Set s) { ScopedWait(*this); while (s.test(state)) wait(); } + + private: + Enum state; +}; + +}} + + +#endif /*!QPID_SYS_STATEMONITOR_H*/ diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h new file mode 100644 index 0000000000..eb71a1d742 --- /dev/null +++ b/cpp/src/qpid/sys/Waitable.h @@ -0,0 +1,73 @@ +#ifndef QPID_SYS_WAITABLE_H +#define QPID_SYS_WAITABLE_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Monitor.h" + +#include <assert.h> + +namespace qpid { +namespace sys { + +/** + * A monitor that keeps track of waiting threads. + * Threads that use a WaitLock are counted as waiters, threads that + * use a normal ScopedLock are not considered waiters. + */ +class Waitable : public Monitor { + public: + Waitable() : waiters(0) {} + + /** Use this inside a scoped lock around the + * call to Monitor::wait to be counted as a waiter + */ + struct ScopedWait { + Waitable& w; + ScopedWait(Waitable& w_) : w(w_) { ++w.waiters; } + ~ScopedWait() { --w.waiters; w.notifyAll(); } + }; + + /** Block till all waiters have finished waiting. + * The calling thread does not count as a waiter. + *@pre Must be called inside a ScopedLock but NOT a ScopedWait. + */ + bool waitAll(Duration timeout=TIME_INFINITE) { + AbsTime deadline(now(), timeout); + while (waiters > 0) { + if (!wait(deadline)) { + assert(timeout != TIME_INFINITE); + return false; + } + } + return true; + } + + private: + friend struct ScopedWait; + size_t waiters; +}; + +}} // namespace qpid::sys + + + +#endif /*!QPID_SYS_WAITABLE_H*/ diff --git a/cpp/src/qpid/sys/apr/APRBase.cpp b/cpp/src/qpid/sys/apr/APRBase.cpp index f527e0d0b2..724c489303 100644 --- a/cpp/src/qpid/sys/apr/APRBase.cpp +++ b/cpp/src/qpid/sys/apr/APRBase.cpp @@ -20,7 +20,6 @@ */ #include <iostream> #include "qpid/log/Statement.h" -#include "qpid/QpidError.h" #include "APRBase.h" using namespace qpid::sys; diff --git a/cpp/src/qpid/sys/apr/APRBase.h b/cpp/src/qpid/sys/apr/APRBase.h index c6b1854fb1..7b5644a129 100644 --- a/cpp/src/qpid/sys/apr/APRBase.h +++ b/cpp/src/qpid/sys/apr/APRBase.h @@ -24,7 +24,6 @@ #include <string> #include <apr_thread_mutex.h> #include <apr_errno.h> -#include "qpid/QpidError.h" namespace qpid { namespace sys { @@ -64,11 +63,8 @@ namespace sys { // Inlined as it is called *a lot* void inline qpid::sys::check(apr_status_t status, const char* file, const int line){ if (status != APR_SUCCESS){ - const int size = 50; - char tmp[size]; - std::string msg(apr_strerror(status, tmp, size)); - throw qpid::QpidError(APR_ERROR + ((int) status), msg, - qpid::SrcLine(file, line)); + char tmp[256]; + throw Exception(QPID_MSG(apr_strerror(status, tmp, size))) } } diff --git a/cpp/src/qpid/sys/posix/Shlib.cpp b/cpp/src/qpid/sys/posix/Shlib.cpp index 2630337408..1552aa06b5 100644 --- a/cpp/src/qpid/sys/posix/Shlib.cpp +++ b/cpp/src/qpid/sys/posix/Shlib.cpp @@ -19,8 +19,7 @@ */ #include "qpid/sys/Shlib.h" - -#include <qpid/QpidError.h> +#include "qpid/Exception.h" #include <dlfcn.h> @@ -32,7 +31,7 @@ void Shlib::load(const char* name) { handle = ::dlopen(name, RTLD_NOW); const char* error = ::dlerror(); if (error) { - THROW_QPID_ERROR(INTERNAL_ERROR, error); + throw Exception(QPID_MSG(error)); } } @@ -42,7 +41,7 @@ void Shlib::unload() { ::dlclose(handle); const char* error = ::dlerror(); if (error) { - THROW_QPID_ERROR(INTERNAL_ERROR, error); + throw Exception(QPID_MSG(error)); } handle = 0; } @@ -53,7 +52,7 @@ void* Shlib::getSymbol(const char* name) { void* sym = ::dlsym(handle, name); const char* error = ::dlerror(); if (error) - THROW_QPID_ERROR(INTERNAL_ERROR, error); + throw Exception(QPID_MSG(error)); return sym; } diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index c7a83df581..f0cc8cd5a5 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -21,7 +21,6 @@ #include "qpid/sys/Socket.h" -#include "qpid/QpidError.h" #include "check.h" #include "PrivatePosix.h" @@ -60,8 +59,7 @@ std::string SocketPrivate::getName(bool local, bool includeService) const result = ::getpeername(fd, (::sockaddr*)&name, &namelen); } - if (result < 0) - throw QPID_POSIX_ERROR(errno); + QPID_POSIX_CHECK(result); char servName[NI_MAXSERV]; char dispName[NI_MAXHOST]; diff --git a/cpp/src/qpid/sys/posix/check.cpp b/cpp/src/qpid/sys/posix/check.cpp deleted file mode 100644 index 408679caa8..0000000000 --- a/cpp/src/qpid/sys/posix/check.cpp +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <cerrno> -#include "check.h" - -namespace qpid { -namespace sys { - -std::string -PosixError::getMessage(int errNo) -{ - char buf[512]; - return std::string(strerror_r(errNo, buf, sizeof(buf))); -} - -PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw() - : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc) -{ } - -}} diff --git a/cpp/src/qpid/sys/posix/check.h b/cpp/src/qpid/sys/posix/check.h index 7fa7b69d3b..f864bf8762 100644 --- a/cpp/src/qpid/sys/posix/check.h +++ b/cpp/src/qpid/sys/posix/check.h @@ -22,41 +22,13 @@ * */ -#include <cerrno> -#include <string> -#include "qpid/QpidError.h" - -namespace qpid { -namespace sys { - -/** - * Exception with message from errno. - */ -class PosixError : public qpid::QpidError -{ - public: - static std::string getMessage(int errNo); - - PosixError(int errNo, const qpid::SrcLine& location) throw(); - - ~PosixError() throw() {} - - int getErrNo() { return errNo; } +#include "qpid/Exception.h" - Exception::auto_ptr clone() const throw() { return Exception::auto_ptr(new PosixError(*this)); } - - void throwSelf() const { throw *this; } - - private: - int errNo; -}; - -}} +#include <cerrno> -/** Create a PosixError for the current file/line and errno. */ -#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE) +#define QPID_POSIX_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::strError(ERRNO)) << " " << ERRNO) -/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */ +/** THROW QPID_POSIX_ERROR(errno) if RESULT is less than zero */ #define QPID_POSIX_CHECK(RESULT) \ if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno)) diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp index 9a982508d1..454b9ca56d 100644 --- a/cpp/src/tests/ClientChannelTest.cpp +++ b/cpp/src/tests/ClientChannelTest.cpp @@ -56,7 +56,7 @@ class ChannelTestBase : public CppUnit::TestCase } }; - InProcessBrokerClient connection; // client::connection + local broker + qpid::InProcessBrokerClient connection; const std::string qname; const std::string data; Queue queue; diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 2495a06fa4..db2cd62b0a 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -18,15 +18,21 @@ * under the License. * */ -#include <list> #include "qpid_test_plugin.h" #include "InProcessBroker.h" #include "qpid/client/Dispatcher.h" #include "qpid/client/Session.h" #include "qpid/framing/TransferContent.h" +#include "qpid/framing/reply_exceptions.h" + +#include <boost/optional.hpp> + +#include <list> using namespace qpid::client; using namespace qpid::framing; +using namespace qpid; +using namespace boost; struct DummyListener : public MessageListener { @@ -60,58 +66,77 @@ class ClientSessionTest : public CppUnit::TestCase CPPUNIT_TEST(testQueueQuery); CPPUNIT_TEST(testTransfer); CPPUNIT_TEST(testDispatcher); + CPPUNIT_TEST(testResumeExpiredError); + CPPUNIT_TEST(testUseSuspendedError); CPPUNIT_TEST(testSuspendResume); - CPPUNIT_TEST(testSuspendResumeErrors); + CPPUNIT_TEST(testDisconnectResume); + CPPUNIT_TEST(testAutoDelete); CPPUNIT_TEST_SUITE_END(); - boost::shared_ptr<Connector> broker; - Connection connection; + shared_ptr<broker::Broker> broker; Session session; + // Defer construction & thread creation to setUp + boost::optional<InProcessConnection> c; + boost::optional<InProcessConnection> c2; public: - ClientSessionTest() : broker(new qpid::broker::InProcessBroker()), connection(broker) + void setUp() { + broker = broker::Broker::create(); + c=boost::in_place<InProcessConnection>(broker); + c2=boost::in_place<InProcessConnection>(broker); + } + + void tearDown() { + c2.reset(); + c.reset(); + broker.reset(); + } + + void declareSubscribe(const std::string& q="my-queue", + const std::string& dest="my-dest") { - connection.open(""); - session = connection.newSession(); + // FIXME aconway 2007-10-18: autoDelete queues are destroyed on channel close, not session. + // Fix & make all test queues exclusive, autoDelete + session.queueDeclare_(queue=q); // FIXME aconway 2007-10-01: exclusive=true, autoDelete=true); + session.messageSubscribe_(queue=q, destination=dest, acquireMode=1); + session.messageFlow_(destination=dest, unit=0, value=0xFFFFFFFF);//messages + session.messageFlow_(destination=dest, unit=1, value=0xFFFFFFFF);//bytes } + bool queueExists(const std::string& q) { + TypedResult<QueueQueryResult> result = session.queueQuery_(q); + return result.get().getQueue() == q; + } + void testQueueQuery() { - std::string name("my-queue"); - std::string alternate("amq.fanout"); - session.queueDeclare((queue=name, alternateExchange=alternate, exclusive=true, autoDelete=true)); - TypedResult<QueueQueryResult> result = session.queueQuery(name); + session = c->newSession(); + session.queueDeclare_(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true); + TypedResult<QueueQueryResult> result = session.queueQuery_(std::string("my-queue")); CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable()); CPPUNIT_ASSERT_EQUAL(true, result.get().getExclusive()); - CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange()); + CPPUNIT_ASSERT_EQUAL(std::string("amq.fanout"), + result.get().getAlternateExchange()); } void testTransfer() { - std::string queueName("my-queue"); - std::string dest("my-dest"); - std::string data("my message"); - session.queueDeclare_(queue=queueName, exclusive=true, autoDelete=true); - //subcribe to the queue with confirm_mode = 1: - session.messageSubscribe_(queue=queueName, destination=dest, acquireMode=1); - session.messageFlow((destination=dest, unit=0, value=1));//messages - session.messageFlow((destination=dest, unit=1, value=0xFFFFFFFF));//bytes - //publish a message: - TransferContent _content(data); - _content.getDeliveryProperties().setRoutingKey("my-queue"); - session.messageTransfer_(content=_content); + session = c->newSession(); + declareSubscribe(); + session.messageTransfer_(content=TransferContent("my-message", "my-queue")); //get & test the message: FrameSet::shared_ptr msg = session.get(); CPPUNIT_ASSERT(msg->isA<MessageTransferBody>()); - CPPUNIT_ASSERT_EQUAL(data, msg->getContent()); + CPPUNIT_ASSERT_EQUAL(std::string("my-message"), msg->getContent()); //confirm receipt: session.execution().completed(msg->getId(), true, true); } void testDispatcher() { - session.queueDeclare_(queue="my-queue", exclusive=true, autoDelete=true); + session = c->newSession(); + declareSubscribe(); TransferContent msg1("One"); msg1.getDeliveryProperties().setRoutingKey("my-queue"); @@ -125,9 +150,6 @@ public: msg3.getDeliveryProperties().setRoutingKey("my-queue"); session.messageTransfer_(content=msg3); - session.messageSubscribe_(queue="my-queue", destination="my-dest", acquireMode=1); - session.messageFlow((destination="my-dest", unit=0, value=1));//messages - session.messageFlow((destination="my-dest", unit=1, value=0xFFFFFFFF));//bytes DummyListener listener(session, "my-dest", 3); listener.listen(); CPPUNIT_ASSERT_EQUAL((size_t) 3, listener.messages.size()); @@ -140,29 +162,66 @@ public: } - void testSuspendResume() { - session = connection.newSession(60); + void testResumeExpiredError() { + session = c->newSession(0); + session.suspend(); // session has 0 timeout. + try { + c->resume(session); + CPPUNIT_FAIL("Expected InvalidArgumentException."); + } catch(const InvalidArgumentException&) {} + } + + void testUseSuspendedError() { + session = c->newSession(60); session.suspend(); try { session.exchangeQuery_(name="amq.fanout"); CPPUNIT_FAIL("Expected session suspended exception"); - } catch(...) {} - connection.resume(session); - session.exchangeQuery_(name="amq.fanout"); - // FIXME aconway 2007-09-25: build up session state and confirm - //it survives the resume + } catch(const CommandInvalidException&) {} } - void testSuspendResumeErrors() { - session.suspend(); // session has 0 timeout. - try { - session.exchangeQuery_(name="amq.fanout"); - CPPUNIT_FAIL("Expected suspended session exception"); - } catch(...) {} - try { - connection.resume(session); - CPPUNIT_FAIL("Expected no such session exception."); - } catch(...) {} + void testSuspendResume() { + session = c->newSession(60); + declareSubscribe(); + session.suspend(); + // Make sure we are still subscribed after resume. + c->resume(session); + session.messageTransfer_(content=TransferContent("my-message", "my-queue")); + FrameSet::shared_ptr msg = session.get(); + CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent()); + } + + void testDisconnectResume() { + session = c->newSession(60); + session.queueDeclare_(queue="before"); + CPPUNIT_ASSERT(queueExists("before")); + // Simulate lost frames. + c->discard(); + session.queueDeclare_(queue=string("after")); + c->disconnect(); // Simulate disconnect, resume on a new connection. + c2->resume(session); + CPPUNIT_ASSERT(queueExists("after")); + } + + void testAutoDelete() { + // Verify that autoDelete queues survive suspend/resume. + session = c->newSession(60); + session.queueDeclare_(queue="my-queue", exclusive=true, autoDelete=true); + CPPUNIT_ASSERT(queueExists("my-queue")); + session.suspend(); + c->resume(session); + CPPUNIT_ASSERT(queueExists("my-queue")); + + // Verify they survive disconnect/resume on new Connection + c->disconnect(); + c2->resume(session); + + try { + // FIXME aconway 2007-10-23: Negative test, need to + // fix auto-delete queues to clean up with session, not channel. + CPPUNIT_ASSERT(queueExists("my-queue")); + CPPUNIT_FAIL("Negative test passed unexpectedly"); + } catch(const ChannelException&) {} } }; diff --git a/cpp/src/tests/ConcurrentQueue.cpp b/cpp/src/tests/ConcurrentQueue.cpp index e1adcce0f9..39155b4ff2 100644 --- a/cpp/src/tests/ConcurrentQueue.cpp +++ b/cpp/src/tests/ConcurrentQueue.cpp @@ -61,7 +61,7 @@ template <class T> class DualVectorDualLockQueue { /** If the queue is non-empty, pop the front item into data and * return true. If the queue is empty, return false */ - bool pop(T& data) { + bool tryPop(T& data) { Mutex::ScopedLock l(popLock); if (popIter == popVec.end()) { popVec.clear(); @@ -109,7 +109,7 @@ void nspin(const Duration& delay) { struct NullQueue { NullQueue(int items=0) : npush(items), npop(items) {} void push(int) { --npush; } - bool pop(int& n) { + bool tryPop(int& n) { if (npop == 0) return false; else { @@ -144,7 +144,7 @@ struct Popper : public Runnable { void run() { for (int i=items; i > 0; i--) { int n; - if (queue.pop(n)) + if (queue.tryPop(n)) BOOST_REQUIRE_EQUAL(i,n); npause(); } diff --git a/cpp/src/tests/EventChannelTest.cpp b/cpp/src/tests/EventChannelTest.cpp index 3ba54def86..6d8d64e165 100644 --- a/cpp/src/tests/EventChannelTest.cpp +++ b/cpp/src/tests/EventChannelTest.cpp @@ -117,18 +117,18 @@ class EventChannelTest : public CppUnit::TestCase CPPUNIT_ASSERT(re.hasError()); try { re.throwIfError(); - CPPUNIT_FAIL("Expected QpidError."); + CPPUNIT_FAIL("Expected Exception."); } - catch (const qpid::QpidError&) { } + catch (const qpid::Exception&) { } // Bad file descriptor. Note in this case we fail // in postEvent and throw immediately. try { ReadEvent bad; ec->postEvent(bad); - CPPUNIT_FAIL("Expected QpidError."); + CPPUNIT_FAIL("Expected Exception."); } - catch (const qpid::QpidError&) { } + catch (const qpid::Exception&) { } } void testWrite() { diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp index 5ca4e6c216..9e82447ffa 100644 --- a/cpp/src/tests/FramingTest.cpp +++ b/cpp/src/tests/FramingTest.cpp @@ -18,8 +18,6 @@ * under the License. * */ -#include "InProcessBroker.h" -#include "qpid/QpidError.h" #include "qpid/client/Exchange.h" #include "qpid/client/Queue.h" #include "qpid/client/Connection.h" @@ -30,6 +28,7 @@ #include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/all_method_bodies.h" #include "qpid/framing/amqp_framing.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid_test_plugin.h" #include <boost/bind.hpp> @@ -200,18 +199,12 @@ class FramingTest : public CppUnit::TestCase try { Content content(REFERENCE, ""); CPPUNIT_ASSERT(false);//fail, expected exception - } catch (QpidError& e) { - CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code); - CPPUNIT_ASSERT_EQUAL(string("Reference cannot be empty"), e.msg); - } + } catch (const InvalidArgumentException& e) {} try { Content content(2, "Blah"); CPPUNIT_ASSERT(false);//fail, expected exception - } catch (QpidError& e) { - CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code); - CPPUNIT_ASSERT_EQUAL(string("Invalid discriminator: 2"), e.msg); - } + } catch (const SyntaxErrorException& e) {} try { Buffer wbuff(buffer, sizeof(buffer)); @@ -221,11 +214,8 @@ class FramingTest : public CppUnit::TestCase Buffer rbuff(buffer, sizeof(buffer)); Content content; content.decode(rbuff); - CPPUNIT_ASSERT(false);//fail, expected exception - } catch (QpidError& e) { - CPPUNIT_ASSERT_EQUAL(FRAMING_ERROR, e.code); - CPPUNIT_ASSERT_EQUAL(string("Invalid discriminator: 2"), e.msg); - } + CPPUNIT_FAIL("Expected exception"); + } catch (Exception& e) {} } diff --git a/cpp/src/tests/HeadersExchangeTest.cpp b/cpp/src/tests/HeadersExchangeTest.cpp index c47266caa6..f07f238ee4 100644 --- a/cpp/src/tests/HeadersExchangeTest.cpp +++ b/cpp/src/tests/HeadersExchangeTest.cpp @@ -19,7 +19,7 @@ * */ -#include "qpid/QpidError.h" +#include "qpid/Exception.h" #include "qpid/broker/HeadersExchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" @@ -118,7 +118,7 @@ class HeadersExchangeTest : public CppUnit::TestCase try { //just checking this doesn't cause assertion etc exchange.bind(queue, key, &args); - } catch(qpid::QpidError&) { + } catch(qpid::Exception&) { //expected } } diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h index 2a9f12771b..c5860568db 100644 --- a/cpp/src/tests/InProcessBroker.h +++ b/cpp/src/tests/InProcessBroker.h @@ -25,6 +25,9 @@ #include "qpid/client/Connector.h" #include "qpid/client/Connection.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/ConcurrentQueue.h" +#include "qpid/shared_ptr.h" #include <vector> #include <iostream> @@ -32,112 +35,176 @@ namespace qpid { -namespace broker { + /** - * A broker that implements client::Connector allowing direct - * in-process connection of client to broker. Used to write round-trip - * tests without requiring an external broker process. - * + * A client::Connector that connects directly to an in-process broker. * Also allows you to "snoop" on frames exchanged between client & broker. * * see FramingTest::testRequestResponseRoundtrip() for example of use. */ -class InProcessBroker : public client::Connector { +class InProcessConnector : + public client::Connector +{ public: + typedef sys::Mutex Mutex; + typedef Mutex::ScopedLock Lock; + typedef framing::FrameHandler FrameHandler; + typedef framing::AMQFrame AMQFrame; + enum Sender {CLIENT,BROKER}; - /** A frame tagged with the sender */ - struct TaggedFrame { - TaggedFrame(Sender e, framing::AMQFrame& f) : frame(f), sender(e) {} - bool fromBroker() const { return sender == BROKER; } - bool fromClient() const { return sender == CLIENT; } + /** Simulate the network thread of a peer with a queue and a thread. + * With setInputHandler(0) drops frames simulating network packet loss. + */ + class NetworkQueue : public sys::Runnable + { + public: + NetworkQueue(const char* r) : inputHandler(0), receiver(r) { + thread=sys::Thread(this); + } - template <class MethodType> - MethodType* asMethod() { - return dynamic_cast<MethodType*>(frame.getBody()); + ~NetworkQueue() { + queue.shutdown(); + thread.join(); } - framing::AMQFrame frame; - Sender sender; + + void push(AMQFrame& f) { queue.push(f); } + + void run() { + AMQFrame f; + while (queue.waitPop(f)) { + Lock l(lock); + if (inputHandler) { + QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << f)); + inputHandler->handle(f); + } + else { + QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << f)); + } + } + } + + void setInputHandler(FrameHandler* h) { + Lock l(lock); + inputHandler = h; + } + + private: + sys::Mutex lock; + sys::ConcurrentQueue<AMQFrame> queue; + sys::Thread thread; + FrameHandler* inputHandler; + const char* const receiver; }; - - typedef std::vector<TaggedFrame> Conversation; - - InProcessBroker(framing::ProtocolVersion ver= - framing::highestProtocolVersion - ) : - Connector(ver), - protocolInit(ver), - broker(broker::Broker::create()), - brokerOut(BROKER, conversation), + + struct InProcessHandler : public sys::ConnectionOutputHandler { + Sender from; + NetworkQueue queue; + const char* const sender; + + InProcessHandler(Sender s) + : from(s), + queue(from==CLIENT? "BROKER" : "CLIENT"), + sender(from==BROKER? "BROKER" : "CLIENT") + {} + + ~InProcessHandler() { } + + void send(AMQFrame& f) { + QPID_LOG(debug, QPID_MSG(sender << " SENT: " << f)); + queue.push(f); + } + + void close() { + // Do not shut down the queue here, we may be in + // the queue's dispatch thread. + } + }; + + InProcessConnector(shared_ptr<broker::Broker> b, + framing::ProtocolVersion v=framing::ProtocolVersion()) : + Connector(v), + protocolInit(v), + broker(b), + brokerOut(BROKER), brokerConnection(&brokerOut, *broker), - clientOut(CLIENT, conversation, &brokerConnection) - {} + clientOut(CLIENT), + isClosed(false) + { + clientOut.queue.setInputHandler(&brokerConnection); + } - ~InProcessBroker() { broker->shutdown(); } + ~InProcessConnector() { + close(); + + } void connect(const std::string& /*host*/, int /*port*/) {} + void init() { brokerConnection.initiated(protocolInit); } - void close() {} + + void close() { + if (!isClosed) { + isClosed = true; + brokerOut.close(); + clientOut.close(); + brokerConnection.closed(); + } + } /** Client's input handler. */ void setInputHandler(framing::InputHandler* handler) { - brokerOut.in = handler; + brokerOut.queue.setInputHandler(handler); } /** Called by client to send a frame */ void send(framing::AMQFrame& frame) { - clientOut.send(frame); + clientOut.handle(frame); } - /** Entire client-broker conversation is recorded here */ - Conversation conversation; + /** Sliently discard frames sent by either party, lost network traffic. */ + void discard() { + brokerOut.queue.setInputHandler(0); + clientOut.queue.setInputHandler(0); + } private: - /** OutputHandler that forwards data to an InputHandler */ - struct OutputToInputHandler : public sys::ConnectionOutputHandler { - OutputToInputHandler( - Sender sender_, Conversation& conversation_, - framing::InputHandler* ih=0 - ) : sender(sender_), conversation(conversation_), in(ih) {} - - void send(framing::AMQFrame& frame) { - QPID_LOG(debug, - (sender==CLIENT ? "CLIENT: " : "BROKER: ") << frame); - conversation.push_back(TaggedFrame(sender, frame)); - in->received(frame); - } - - void close() {} - - Sender sender; - Conversation& conversation; - framing::InputHandler* in; - }; - + sys::Mutex lock; framing::ProtocolInitiation protocolInit; - shared_ptr<Broker> broker; - OutputToInputHandler brokerOut; + shared_ptr<broker::Broker> broker; + InProcessHandler brokerOut; broker::Connection brokerConnection; - OutputToInputHandler clientOut; + InProcessHandler clientOut; + bool isClosed; }; -std::ostream& operator<<( - std::ostream& out, const InProcessBroker::TaggedFrame& tf) -{ - return out << (tf.fromBroker()? "BROKER: ":"CLIENT: ") << tf.frame; -} - -std::ostream& operator<<( - std::ostream& out, const InProcessBroker::Conversation& conv) -{ - copy(conv.begin(), conv.end(), - std::ostream_iterator<InProcessBroker::TaggedFrame>(out, "\n")); - return out; -} - -} // namespace broker -} // namespace qpid +struct InProcessConnection : public client::Connection { + InProcessConnection(shared_ptr<broker::Broker> b) + : client::Connection( + shared_ptr<client::Connector>( + new InProcessConnector(b))) + { + open(""); + } + + ~InProcessConnection() { } + + /** Simulate disconnected network connection. */ + void disconnect() { impl->getConnector()->close(); } + + /** Sliently discard frames sent by either party, lost network traffic. */ + void discard() { + dynamic_pointer_cast<InProcessConnector>( + impl->getConnector())->discard(); + } +}; +/** A connector with its own broker */ +struct InProcessBroker : public InProcessConnector { + InProcessBroker() : InProcessConnector(broker::Broker::create()) {} +}; + +} // namespace qpid #endif // _tests_InProcessBroker_h diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 233614367d..b954ae88f4 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -20,16 +20,10 @@ CLEANFILES= # Unit test programs. # -# FIXME aconway 2007-08-29: enable when session is reinstated. -# TESTS+=Session -# check_PROGRAMS+=Session -# Session_SOURCES=Session.cpp -# Session_LDADD=-lboost_unit_test_framework $(lib_broker) - -TESTS+=ResumeHandler -check_PROGRAMS+=ResumeHandler -ResumeHandler_SOURCES=ResumeHandler.cpp -ResumeHandler_LDADD=-lboost_unit_test_framework $(lib_common) +TESTS+=SessionState +check_PROGRAMS+=SessionState +SessionState_SOURCES=SessionState.cpp +SessionState_LDADD=-lboost_unit_test_framework $(lib_common) TESTS+=Blob check_PROGRAMS+=Blob diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 114e0045f5..3235fe2418 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -54,7 +54,7 @@ class FailOnDeliver : public Deliverable public: void deliverTo(Queue::shared_ptr& queue) { - throw Exception(boost::format("Invalid delivery to %1%") % queue->getName()); + throw Exception(QPID_MSG("Invalid delivery to " << queue->getName())); } }; diff --git a/cpp/src/tests/ResumeHandler.cpp b/cpp/src/tests/ResumeHandler.cpp deleted file mode 100644 index 1073e42a3c..0000000000 --- a/cpp/src/tests/ResumeHandler.cpp +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/framing/ResumeHandler.h" - -#define BOOST_AUTO_TEST_MAIN -#include <boost/test/auto_unit_test.hpp> - -#include <vector> - -using namespace std; -using namespace qpid::framing; - -AMQFrame& frame(const char* s) { - static AMQFrame frame; - frame.setBody(AMQContentBody(s)); - return frame; -} - -struct Collector : public FrameHandler, public vector<AMQFrame> { - void handle(AMQFrame& f) { push_back(f); } -}; - - -namespace qpid { -namespace framing { - -bool operator==(const AMQFrame& a, const AMQFrame& b) { - const AMQContentBody* ab=dynamic_cast<const AMQContentBody*>(a.getBody()); - const AMQContentBody* bb=dynamic_cast<const AMQContentBody*>(b.getBody()); - return ab && bb && ab->getData() == bb->getData(); -} - -}} // namespace qpid::framing - - -BOOST_AUTO_TEST_CASE(testSend) { - AMQFrame f; - ResumeHandler sender; - Collector collect; - sender.out.next = &collect; - sender.out(frame("a")); - BOOST_CHECK_EQUAL(1u, collect.size()); - BOOST_CHECK_EQUAL(frame("a"), collect[0]); - sender.out(frame("b")); - sender.out(frame("c")); - sender.ackReceived(1); // ack a,b. - sender.out(frame("d")); - BOOST_CHECK_EQUAL(4u, collect.size()); - BOOST_CHECK_EQUAL(frame("d"), collect.back()); - // Now try a resend. - collect.clear(); - sender.resend(); - BOOST_REQUIRE_EQUAL(collect.size(), 2u); - BOOST_CHECK_EQUAL(frame("c"), collect[0]); - BOOST_CHECK_EQUAL(frame("d"), collect[1]); -} - - -BOOST_AUTO_TEST_CASE(testReceive) { - ResumeHandler receiver; - Collector collect; - receiver.in.next = &collect; - receiver.in(frame("a")); - receiver.in(frame("b")); - BOOST_CHECK_EQUAL(receiver.getLastReceived().getValue(), 1u); - receiver.in(frame("c")); - BOOST_CHECK_EQUAL(receiver.getLastReceived().getValue(), 2u); - BOOST_CHECK_EQUAL(3u, collect.size()); - BOOST_CHECK_EQUAL(frame("a"), collect[0]); - BOOST_CHECK_EQUAL(frame("c"), collect[2]); -} diff --git a/cpp/src/tests/SessionState.cpp b/cpp/src/tests/SessionState.cpp new file mode 100644 index 0000000000..c8d912801e --- /dev/null +++ b/cpp/src/tests/SessionState.cpp @@ -0,0 +1,142 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/framing/SessionState.h" + +#define BOOST_AUTO_TEST_MAIN +#include <boost/test/auto_unit_test.hpp> +#include <boost/bind.hpp> + +using namespace std; +using namespace qpid::framing; +using namespace boost; + +// Create a frame with a one-char string. +AMQFrame& frame(char s) { + static AMQFrame frame; + frame.setBody(AMQContentBody(string(&s, 1))); + return frame; +} + +// Extract the one-char string from a frame. +char charFromFrame(const AMQFrame& f) { + const AMQContentBody* b=dynamic_cast<const AMQContentBody*>(f.getBody()); + BOOST_REQUIRE(b && b->getData().size() > 0); + return b->getData()[0]; +} + +// Sent chars as frames +void sent(SessionState& session, const std::string& frames) { + for_each(frames.begin(), frames.end(), + bind(&SessionState::sent, ref(session), bind(frame, _1))); +} + +// Received chars as frames +void received(SessionState& session, const std::string& frames) { + for_each(frames.begin(), frames.end(), + bind(&SessionState::received, session, bind(frame, _1))); +} + +// Make a string from a ReplayRange. +std::string replayChars(const SessionState::Replay& frames) { + string result(frames.size(), ' '); + transform(frames.begin(), frames.end(), result.begin(), + bind(&charFromFrame, _1)); + return result; +} + +namespace qpid { +namespace framing { + +bool operator==(const AMQFrame& a, const AMQFrame& b) { + const AMQContentBody* ab=dynamic_cast<const AMQContentBody*>(a.getBody()); + const AMQContentBody* bb=dynamic_cast<const AMQContentBody*>(b.getBody()); + return ab && bb && ab->getData() == bb->getData(); +} + +}} // namespace qpid::framing + + +BOOST_AUTO_TEST_CASE(testSent) { + // Test that we send solicit-ack at the right interval. + AMQContentBody f; + SessionState s1(1); + BOOST_CHECK(s1.sent(f)); + BOOST_CHECK(s1.sent(f)); + BOOST_CHECK(s1.sent(f)); + + SessionState s3(3); + BOOST_CHECK(!s3.sent(f)); + BOOST_CHECK(!s3.sent(f)); + BOOST_CHECK(s3.sent(f)); + + BOOST_CHECK(!s3.sent(f)); + BOOST_CHECK(!s3.sent(f)); + s3.receivedAck(4); + BOOST_CHECK(!s3.sent(f)); + BOOST_CHECK(!s3.sent(f)); + BOOST_CHECK(s3.sent(f)); +} + +BOOST_AUTO_TEST_CASE(testReplay) { + // Replay of all frames. + SessionState session(100); + sent(session, "abc"); + session.suspend(); session.resuming(); + session.receivedAck(-1); + BOOST_CHECK_EQUAL(replayChars(session.replay()), "abc"); + + // Replay with acks + session.receivedAck(0); // ack a. + session.suspend(); + session.resuming(); + session.receivedAck(1); // ack b. + BOOST_CHECK_EQUAL(replayChars(session.replay()), "c"); + + // Replay after further frames. + sent(session, "def"); + session.suspend(); + session.resuming(); + session.receivedAck(3); + BOOST_CHECK_EQUAL(replayChars(session.replay()), "ef"); + + // Bad ack, too high + try { + session.receivedAck(6); + BOOST_FAIL("expected exception"); + } catch(const qpid::Exception&) {} + +} + +BOOST_AUTO_TEST_CASE(testReceived) { + // Check that we request acks at the right interval. + AMQContentBody f; + SessionState s1(1); + BOOST_CHECK_EQUAL(0u, *s1.received(f)); + BOOST_CHECK_EQUAL(1u, *s1.received(f)); + BOOST_CHECK_EQUAL(2u, *s1.received(f)); + + SessionState s3(3); + BOOST_CHECK(!s3.received(f)); + BOOST_CHECK(!s3.received(f)); + BOOST_CHECK_EQUAL(2u, *s3.received(f)); + + BOOST_CHECK(!s3.received(f)); + BOOST_CHECK(!s3.received(f)); + BOOST_CHECK_EQUAL(5u, *s3.received(f)); +} diff --git a/cpp/src/tests/Shlib.cpp b/cpp/src/tests/Shlib.cpp index 87136425ab..6420af915e 100644 --- a/cpp/src/tests/Shlib.cpp +++ b/cpp/src/tests/Shlib.cpp @@ -20,6 +20,7 @@ #include "test_tools.h" #include "qpid/sys/Shlib.h" +#include "qpid/Exception.h" #define BOOST_AUTO_TEST_MAIN // Must come before #include<boost/test/*> #include <boost/test/auto_unit_test.hpp> @@ -40,7 +41,7 @@ BOOST_AUTO_TEST_CASE(testShlib) { sh.getSymbol("callMe"); BOOST_FAIL("Expected exception"); } - catch (...) {} + catch (const qpid::Exception&) {} } BOOST_AUTO_TEST_CASE(testAutoShlib) { diff --git a/cpp/src/tests/TimerTest.cpp b/cpp/src/tests/TimerTest.cpp index 682699dbd3..3f2a1c57ec 100644 --- a/cpp/src/tests/TimerTest.cpp +++ b/cpp/src/tests/TimerTest.cpp @@ -26,6 +26,7 @@ #include <iostream> #include <memory> #include <boost/format.hpp> +#include <boost/lexical_cast.hpp> using namespace qpid::broker; using namespace qpid::sys; diff --git a/cpp/src/tests/TxMocks.h b/cpp/src/tests/TxMocks.h index e4e74ee535..127a27c005 100644 --- a/cpp/src/tests/TxMocks.h +++ b/cpp/src/tests/TxMocks.h @@ -25,7 +25,6 @@ #include "qpid/Exception.h" #include "qpid/broker/TransactionalStore.h" #include "qpid/broker/TxOp.h" -#include <boost/format.hpp> #include <iostream> #include <vector> @@ -40,9 +39,9 @@ template <class T> void assertEqualVector(std::vector<T>& expected, std::vector< i++; } if (i < expected.size()) { - throw qpid::Exception(boost::format("Missing %1%") % expected[i]); + throw qpid::Exception(QPID_MSG("Missing " << expected[i])); } else if (i < actual.size()) { - throw qpid::Exception(boost::format("Extra %1%") % actual[i]); + throw qpid::Exception(QPID_MSG("Extra " << actual[i])); } CPPUNIT_ASSERT_EQUAL(expected.size(), actual.size()); } diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp index 8cf43ce069..e4fd57824c 100644 --- a/cpp/src/tests/client_test.cpp +++ b/cpp/src/tests/client_test.cpp @@ -29,7 +29,6 @@ #include <iostream> #include "TestOptions.h" -#include "qpid/QpidError.h" #include "qpid/client/Channel.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" diff --git a/cpp/src/tests/echo_service.cpp b/cpp/src/tests/echo_service.cpp index 7989ec8543..c3569d5fd4 100644 --- a/cpp/src/tests/echo_service.cpp +++ b/cpp/src/tests/echo_service.cpp @@ -27,7 +27,6 @@ * sender-specified private queue. */ -#include "qpid/QpidError.h" #include "qpid/client/Channel.h" #include "qpid/client/Connection.h" #include "qpid/client/Exchange.h" diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index 3feef7e876..3783ae6901 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -22,7 +22,6 @@ #include <iostream> #include "TestOptions.h" -#include "qpid/QpidError.h" #include "qpid/client/Channel.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" diff --git a/cpp/src/tests/interop_runner.cpp b/cpp/src/tests/interop_runner.cpp index 5bfe88662a..56f9cbf3d2 100644 --- a/cpp/src/tests/interop_runner.cpp +++ b/cpp/src/tests/interop_runner.cpp @@ -21,7 +21,6 @@ #include "qpid/Options.h" #include "qpid/Exception.h" -#include "qpid/QpidError.h" #include "qpid/client/Channel.h" #include "qpid/client/Connection.h" #include "qpid/client/Exchange.h" diff --git a/cpp/src/tests/logging.cpp b/cpp/src/tests/logging.cpp index f5402aaad7..1042e60077 100644 --- a/cpp/src/tests/logging.cpp +++ b/cpp/src/tests/logging.cpp @@ -71,22 +71,18 @@ BOOST_AUTO_TEST_CASE(testSelector_enable) { BOOST_CHECK(s.isEnabled(critical, "oops")); } -Logger& clearLogger() { - Logger::instance().clear(); - return Logger::instance(); -} - BOOST_AUTO_TEST_CASE(testStatementEnabled) { - // Verify that the logger enables and disables log statements. - Logger& l=clearLogger(); + // Verify that the singleton enables and disables static + // log statements. + Logger& l = Logger::instance(); l.select(Selector(debug)); - Statement s=QPID_LOG_STATEMENT_INIT(debug); + static Statement s=QPID_LOG_STATEMENT_INIT(debug); BOOST_CHECK(!s.enabled); - Statement::Initializer init(s); + static Statement::Initializer init(s); BOOST_CHECK(s.enabled); - Statement s2=QPID_LOG_STATEMENT_INIT(warning); - Statement::Initializer init2(s2); + static Statement s2=QPID_LOG_STATEMENT_INIT(warning); + static Statement::Initializer init2(s2); BOOST_CHECK(!s2.enabled); l.select(Selector(warning)); @@ -98,9 +94,10 @@ struct TestOutput : public Logger::Output { vector<string> msg; vector<Statement> stmt; - TestOutput() { - Logger::instance().output(qpid::make_auto_ptr<Logger::Output>(this)); + TestOutput(Logger& l) { + l.output(std::auto_ptr<Logger::Output>(this)); } + void log(const Statement& s, const string& m) { msg.push_back(m); stmt.push_back(s); @@ -111,10 +108,12 @@ struct TestOutput : public Logger::Output { using boost::assign::list_of; BOOST_AUTO_TEST_CASE(testLoggerOutput) { - Logger& l=clearLogger(); + Logger l; + l.clear(); l.select(Selector(debug)); Statement s=QPID_LOG_STATEMENT_INIT(debug); - TestOutput* out=new TestOutput(); + + TestOutput* out=new TestOutput(l); // Verify message is output. l.log(s, "foo"); @@ -122,7 +121,7 @@ BOOST_AUTO_TEST_CASE(testLoggerOutput) { BOOST_CHECK_EQUAL(expect, out->msg); // Verify multiple outputs - TestOutput* out2=new TestOutput(); + TestOutput* out2=new TestOutput(l); l.log(Statement(), "baz"); expect.push_back("baz\n"); BOOST_CHECK_EQUAL(expect, out->msg); @@ -131,9 +130,10 @@ BOOST_AUTO_TEST_CASE(testLoggerOutput) { } BOOST_AUTO_TEST_CASE(testMacro) { - Logger& l = clearLogger(); + Logger& l=Logger::instance(); + l.clear(); l.select(Selector(info)); - TestOutput* out=new TestOutput(); + TestOutput* out=new TestOutput(l); QPID_LOG(info, "foo"); vector<string> expect=list_of("foo\n"); BOOST_CHECK_EQUAL(expect, out->msg); @@ -150,9 +150,9 @@ BOOST_AUTO_TEST_CASE(testMacro) { } BOOST_AUTO_TEST_CASE(testLoggerFormat) { - Logger& l=clearLogger(); + Logger& l = Logger::instance(); l.select(Selector(critical)); - TestOutput* out=new TestOutput(); + TestOutput* out=new TestOutput(l); // Time format is YYY-Month-dd hh:mm:ss l.format(Logger::TIME); @@ -183,7 +183,8 @@ BOOST_AUTO_TEST_CASE(testLoggerFormat) { } BOOST_AUTO_TEST_CASE(testOstreamOutput) { - Logger& l=clearLogger(); + Logger& l=Logger::instance(); + l.clear(); l.select(Selector(error)); ostringstream os; l.output(os); @@ -191,12 +192,12 @@ BOOST_AUTO_TEST_CASE(testOstreamOutput) { QPID_LOG(error, "bar"); QPID_LOG(error, "baz"); BOOST_CHECK_EQUAL("foo\nbar\nbaz\n", os.str()); - l.clear(); } #if 0 // This test requires manual intervention. Normally disabled. BOOST_AUTO_TEST_CASE(testSyslogOutput) { - Logger& l = clearLogger(); + Logger& l=Logger::instance(); + l.clear(); l.select(Selector(info)); l.syslog("qpid_test"); QPID_LOG(info, "Testing QPID"); @@ -312,7 +313,7 @@ BOOST_AUTO_TEST_CASE(testSelectorFromOptions) { } BOOST_AUTO_TEST_CASE(testOptionsFormat) { - Logger& l = clearLogger(); + Logger l; { Options opts; BOOST_CHECK_EQUAL(Logger::TIME|Logger::LEVEL, l.format(opts)); @@ -344,7 +345,8 @@ BOOST_AUTO_TEST_CASE(testOptionsFormat) { } BOOST_AUTO_TEST_CASE(testLoggerConfigure) { - Logger& l = clearLogger(); + Logger& l=Logger::instance(); + l.clear(); Options opts; char* argv[]={ 0, diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index d16ebd43de..bc816f6597 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -27,7 +27,6 @@ #include "qpid/client/Connection.h" #include "qpid/client/MessageListener.h" #include "qpid/client/Message.h" -#include "qpid/QpidError.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" diff --git a/cpp/src/tests/run-unit-tests b/cpp/src/tests/run-unit-tests index ce8f488b29..464ce131f5 100755 --- a/cpp/src/tests/run-unit-tests +++ b/cpp/src/tests/run-unit-tests @@ -24,5 +24,5 @@ test -z "$TEST_ARGS" && TEST_ARGS=".libs/*Test.so" test -z "$srcdir" && srcdir=. # libdlclose_noop prevents unloading symbols needed for valgrind output. -LD_PRELOAD=.libs/libdlclose_noop.so exec $srcdir/test_env DllPlugInTester -c -b $TEST_ARGS - +export LD_PRELOAD=.libs/libdlclose_noop.so +source $srcdir/run_test DllPlugInTester -c -b $TEST_ARGS diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index 9369b591a6..5aef16354e 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -32,7 +32,6 @@ * listening). */ -#include "qpid/QpidError.h" #include "TestOptions.h" #include "qpid/client/Channel.h" #include "qpid/client/Connection.h" diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index 74fcf8b057..1c5b51309b 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -35,7 +35,6 @@ */ #include "TestOptions.h" -#include "qpid/QpidError.h" #include "qpid/client/Channel.h" #include "qpid/client/Connection.h" #include "qpid/client/Exchange.h" |