diff options
Diffstat (limited to 'qpid/cpp')
-rwxr-xr-x | qpid/cpp/rubygen/framing.0-10/constants.rb | 8 | ||||
-rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/Exception.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Bounds.cpp | 51 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Bounds.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionHandler.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionImpl.cpp | 33 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionImpl.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/ExceptionHolder.h (renamed from qpid/cpp/src/qpid/ExceptionHolder.h) | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Waitable.h | 55 |
12 files changed, 123 insertions, 92 deletions
diff --git a/qpid/cpp/rubygen/framing.0-10/constants.rb b/qpid/cpp/rubygen/framing.0-10/constants.rb index df74ac7459..7f026a3e54 100755 --- a/qpid/cpp/rubygen/framing.0-10/constants.rb +++ b/qpid/cpp/rubygen/framing.0-10/constants.rb @@ -62,14 +62,14 @@ class ConstantsGen < CppGen def reply_exceptions_h() h_file("#{@dir}/reply_exceptions") { include "qpid/Exception" - include "qpid/ExceptionHolder" + include "qpid/sys/ExceptionHolder" namespace(@namespace) { define_exceptions_for("execution", "error-code", "SessionException") define_exceptions_for("connection", "close-code", "ConnectionException") define_exceptions_for("session", "detach-code", "ChannelException") genl genl "void throwExecutionException(int code, const std::string& text);" - genl "void setExecutionException(ExceptionHolder& holder, int code, const std::string& text);" + genl "void setExecutionException(sys::ExceptionHolder& holder, int code, const std::string& text);" } } end @@ -81,11 +81,11 @@ class ConstantsGen < CppGen include "<assert.h>" namespace("qpid::framing") { scope("void throwExecutionException(int code, const std::string& text) {"){ - genl "ExceptionHolder h;" + genl "sys::ExceptionHolder h;" genl "setExecutionException(h, code, text);" genl "h.raise();" } - scope("void setExecutionException(ExceptionHolder& holder, int code, const std::string& text) {"){ + scope("void setExecutionException(sys::ExceptionHolder& holder, int code, const std::string& text) {"){ scope("switch (code) {") { enum = @amqp.class_("execution").domain("error-code").enum enum.choices.each { |c| diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index e0fd44f0c7..401488345e 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -344,7 +344,7 @@ nobase_include_HEADERS = \ qpid/assert.h \ qpid/DataDir.h \ qpid/Exception.h \ - qpid/ExceptionHolder.h \ + qpid/sys/ExceptionHolder.h \ qpid/amqp_0_10/Exception.h \ qpid/Msg.h \ qpid/Options.h \ diff --git a/qpid/cpp/src/qpid/Exception.cpp b/qpid/cpp/src/qpid/Exception.cpp index 8176d92cac..28c9d3742b 100644 --- a/qpid/cpp/src/qpid/Exception.cpp +++ b/qpid/cpp/src/qpid/Exception.cpp @@ -34,18 +34,22 @@ std::string strError(int err) { } Exception::Exception(const std::string& msg) throw() : message(msg) { - QPID_LOG(debug, "Exception: " << message); + QPID_LOG(debug, "Exception constructed: " << message); } Exception::~Exception() throw() {} -std::string Exception::getPrefix() const { return "Exception"; } +std::string Exception::getPrefix() const { return ""; } std::string Exception::getMessage() const { return message; } const char* Exception::what() const throw() { - if (whatStr.empty()) - whatStr = getPrefix() + ": " + message; + // Construct the what string the first time it is needed. + if (whatStr.empty()) { + whatStr = getPrefix(); + if (!whatStr.empty()) whatStr += ": "; + whatStr += message; + } return whatStr.c_str(); } diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 19e031dbe0..b6f6b9cee9 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -197,7 +197,7 @@ bool Connection::doOutput() //then do other output as needed: return outputTasks.doOutput(); }catch(ConnectionException& e){ - close(e.code, e.what(), 0, 0); + close(e.code, e.getMessage(), 0, 0); }catch(std::exception& e){ close(541/*internal error*/, e.what(), 0, 0); } diff --git a/qpid/cpp/src/qpid/client/Bounds.cpp b/qpid/cpp/src/qpid/client/Bounds.cpp index 1df21db941..aac18022bc 100644 --- a/qpid/cpp/src/qpid/client/Bounds.cpp +++ b/qpid/cpp/src/qpid/client/Bounds.cpp @@ -1,49 +1,40 @@ #include "Bounds.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Waitable.h" namespace qpid { namespace client { -using sys::Monitor; +using sys::Waitable; Bounds::Bounds(size_t maxSize) : max(maxSize), current(0) {} -bool Bounds::expand(size_t sizeRequired, bool block) -{ - if (max) { - Monitor::ScopedLock l(lock); - current += sizeRequired; - if (block) { - while (current > max) { - QPID_LOG(debug, "Waiting for bounds: " << *this); - lock.wait(); - } - QPID_LOG(debug, "Bounds ok: " << *this); - } - return current <= max; - } else { - return true; +bool Bounds::expand(size_t sizeRequired, bool block) { + if (!max) return true; + Waitable::ScopedLock l(lock); + current += sizeRequired; + if (block) { + Waitable::ScopedWait w(lock); + while (current > max) + lock.wait(); } + return current <= max; } -void Bounds::reduce(size_t size) -{ +void Bounds::reduce(size_t size) { if (!max || size == 0) return; - Monitor::ScopedLock l(lock); + Waitable::ScopedLock l(lock); if (current == 0) return; - bool needNotify = current > max; current -= std::min(size, current); - if (needNotify && current < max) { - //todo: notify one at a time, but ensure that all threads are - //eventually notified - lock.notifyAll(); + if (current < max && lock.hasWaiters()) { + assert(lock.hasWaiters() == 1); + lock.notify(); } } -size_t Bounds::getCurrentSize() -{ - Monitor::ScopedLock l(lock); +size_t Bounds::getCurrentSize() { + Waitable::ScopedLock l(lock); return current; } @@ -52,4 +43,10 @@ std::ostream& operator<<(std::ostream& out, const Bounds& bounds) { return out; } +void Bounds::setException(const sys::ExceptionHolder& e) { + Waitable::ScopedLock l(lock); + lock.setException(e); + lock.waitWaiters(); // Wait for waiting threads to exit. +} + }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/Bounds.h b/qpid/cpp/src/qpid/client/Bounds.h index db18becce3..838fcb8368 100644 --- a/qpid/cpp/src/qpid/client/Bounds.h +++ b/qpid/cpp/src/qpid/client/Bounds.h @@ -20,7 +20,7 @@ * under the License. * */ -#include "qpid/sys/Monitor.h" +#include "qpid/sys/Waitable.h" namespace qpid{ namespace client{ @@ -32,10 +32,11 @@ class Bounds bool expand(size_t, bool block); void reduce(size_t); size_t getCurrentSize(); - + void setException(const sys::ExceptionHolder&); + private: friend std::ostream& operator<<(std::ostream&, const Bounds&); - sys::Monitor lock; + sys::Waitable lock; const size_t max; size_t current; }; diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index df1afb87a9..05f6bb9733 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -83,11 +83,10 @@ void ConnectionHandler::incoming(AMQFrame& frame) void ConnectionHandler::outgoing(AMQFrame& frame) { - if (getState() == OPEN) { + if (getState() == OPEN) out(frame); - } else { - throw Exception("Connection is not open."); - } + else + throw Exception(errorText.empty() ? "Connection is not open." : errorText); } void ConnectionHandler::waitForOpen() diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index 81eda0bffb..22f10d3620 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -119,41 +119,32 @@ void ConnectionImpl::close() closed(NORMAL, "Closed by client"); } -// Set closed flags and erase the sessions map, but keep the contents -// so sessions can be updated outside the lock. -ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) { + +template <class F> void ConnectionImpl::closeInternal(const F& f) { isClosed = true; connector.close(); - SessionVector save; - for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + for (SessionMap::iterator i=sessions.begin(); i != sessions.end(); ++i) { boost::shared_ptr<SessionImpl> s = i->second.lock(); - if (s) save.push_back(s); + if (s) f(s); } sessions.clear(); - return save; } -void ConnectionImpl::closed(uint16_t code, const std::string& text) -{ - SessionVector save; - { - Mutex::ScopedLock l(lock); - save = closeInternal(l); - } - std::for_each(save.begin(), save.end(), boost::bind(&SessionImpl::connectionClosed, _1, code, text)); +void ConnectionImpl::closed(uint16_t code, const std::string& text) { + Mutex::ScopedLock l(lock); + setException(new ConnectionException(code, text)); + closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text)); } static const std::string CONN_CLOSED("Connection closed by broker"); -void ConnectionImpl::shutdown() -{ +void ConnectionImpl::shutdown() { Mutex::ScopedLock l(lock); + // FIXME aconway 2008-06-06: exception use, connection-forced is incorrect here. + setException(new ConnectionException(CONNECTION_FORCED, CONN_CLOSED)); if (isClosed) return; - SessionVector save(closeInternal(l)); handler.fail(CONN_CLOSED); - Mutex::ScopedUnlock u(lock); - std::for_each(save.begin(), save.end(), - boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED)); + closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED)); } void ConnectionImpl::erase(uint16_t ch) { diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.h b/qpid/cpp/src/qpid/client/ConnectionImpl.h index 655bca359b..089e73335d 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.h @@ -49,7 +49,6 @@ class ConnectionImpl : public Bounds, { typedef std::map<uint16_t, boost::weak_ptr<SessionImpl> > SessionMap; - typedef std::vector<boost::shared_ptr<SessionImpl> > SessionVector; SessionMap sessions; ConnectionHandler handler; @@ -59,9 +58,8 @@ class ConnectionImpl : public Bounds, bool isClosed; bool isClosing; - template <class F> void detachAll(const F&); + template <class F> void closeInternal(const F&); - SessionVector closeInternal(const sys::Mutex::ScopedLock&); void incoming(framing::AMQFrame& frame); void closed(uint16_t, const std::string&); void idleOut(); diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 66e1b9e40f..7b8cae943f 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -69,12 +69,14 @@ SessionImpl::SessionImpl(const std::string& name, } SessionImpl::~SessionImpl() { - Lock l(state); - if (state != DETACHED) { - QPID_LOG(warning, "Detaching deleted session"); - setState(DETACHED); - handleClosed(); - state.waitWaiters(); + { + Lock l(state); + if (state != DETACHED) { + QPID_LOG(warning, "Detaching deleted session"); + setState(DETACHED); + handleClosed(); + state.waitWaiters(); + } } connection->erase(channel); } diff --git a/qpid/cpp/src/qpid/ExceptionHolder.h b/qpid/cpp/src/qpid/sys/ExceptionHolder.h index fed6308f19..cfb971411e 100644 --- a/qpid/cpp/src/qpid/ExceptionHolder.h +++ b/qpid/cpp/src/qpid/sys/ExceptionHolder.h @@ -23,9 +23,11 @@ */ #include "qpid/memory.h" -#include <memory> +#include <boost/shared_ptr.hpp> + namespace qpid { +namespace sys { struct Raisable { virtual ~Raisable() {}; @@ -40,14 +42,14 @@ struct Raisable { class ExceptionHolder : public Raisable { public: ExceptionHolder() {} - ExceptionHolder(ExceptionHolder& ex) : Raisable(), wrapper(ex.wrapper) {} + // Use default copy & assign. + /** Take ownership of ex */ template <class Ex> ExceptionHolder(Ex* ex) { wrap(ex); } - template <class Ex> ExceptionHolder(const std::auto_ptr<Ex>& ex) { wrap(ex.release()); } + template <class Ex> ExceptionHolder(const boost::shared_ptr<Ex>& ex) { wrap(ex.release()); } - ExceptionHolder& operator=(ExceptionHolder& ex) { wrapper=ex.wrapper; return *this; } template <class Ex> ExceptionHolder& operator=(Ex* ex) { wrap(ex); return *this; } - template <class Ex> ExceptionHolder& operator=(std::auto_ptr<Ex> ex) { wrap(ex.release()); return *this; } + template <class Ex> ExceptionHolder& operator=(boost::shared_ptr<Ex> ex) { wrap(ex.release()); return *this; } void raise() const { if (wrapper.get()) wrapper->raise() ; } std::string what() const { return wrapper->what(); } @@ -60,14 +62,14 @@ class ExceptionHolder : public Raisable { Wrapper(Ex* ptr) : exception(ptr) {} void raise() const { throw *exception; } std::string what() const { return exception->what(); } - std::auto_ptr<Ex> exception; + boost::shared_ptr<Ex> exception; }; template <class Ex> void wrap(Ex* ex) { wrapper.reset(new Wrapper<Ex>(ex)); } - std::auto_ptr<Raisable> wrapper; - + boost::shared_ptr<Raisable> wrapper; }; -} // namespace qpid +}} // namespace qpid::sys + #endif /*!QPID_EXCEPTIONHOLDER_H*/ diff --git a/qpid/cpp/src/qpid/sys/Waitable.h b/qpid/cpp/src/qpid/sys/Waitable.h index 37392ed761..61b7e7d82b 100644 --- a/qpid/cpp/src/qpid/sys/Waitable.h +++ b/qpid/cpp/src/qpid/sys/Waitable.h @@ -21,8 +21,8 @@ * */ -#include "Monitor.h" - +#include "qpid/sys/Monitor.h" +#include "qpid/sys/ExceptionHolder.h" #include <assert.h> namespace qpid { @@ -31,14 +31,18 @@ namespace sys { /** * A monitor that keeps track of waiting threads. Threads declare a * ScopedWait around wait() inside a ScopedLock to be considered - * waiters. + * waiters. + * + * Allows waiting threads to be interrupted by an exception. */ class Waitable : public Monitor { public: Waitable() : waiters(0) {} + ~Waitable() { assert(waiters == 0); } + /** Use this inside a scoped lock around the - * call to Monitor::wait to be counted as a waiter + * call to wait() to be counted as a waiter. */ struct ScopedWait { Waitable& w; @@ -46,22 +50,55 @@ class Waitable : public Monitor { ~ScopedWait() { if (--w.waiters==0) w.notifyAll(); } }; - /** Block till there are no more ScopedWaits. + /** Block till there are no more waiters in ScopedWaits. + * waitWaiters() does not raise an exception even if waiters + * were interrupted by one. *@pre Must be called inside a ScopedLock but NOT a ScopedWait. */ void waitWaiters() { while (waiters != 0) - wait(); + Monitor::wait(); } /** Returns the number of outstanding ScopedWaits. * Must be called with the lock held. */ - size_t hasWaiters() { return waiters; } - + size_t hasWaiters() const { + return waiters; + } + + /** Set an execption to interrupt waiters in ScopedWait. + * Must be called with the lock held. + */ + void setException(const ExceptionHolder& e) { + exception = e; + notifyAll(); + + } + + /** Throws an exception if one is set before or during the wait. */ + void wait() { + ExCheck e(exception); + Monitor::wait(); + } + + /** Throws an exception if one is set before or during the wait. */ + bool wait(const AbsTime& absoluteTime) { + ExCheck e(exception); + return Monitor::wait(absoluteTime); + } + + ExceptionHolder exception; + private: - friend struct ScopedWait; + struct ExCheck { + const ExceptionHolder& exception; + ExCheck(const ExceptionHolder& e) : exception(e) { e.raise(); } + ~ExCheck() { exception.raise(); } + }; + size_t waiters; + friend struct ScopedWait; }; }} // namespace qpid::sys |