From 41dce45772f586e18444267e2246b097d04ef28d Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 6 Jun 2008 20:23:28 +0000 Subject: Added exceptions to sys::Waitable. Fixed client side deadlock involving client::Bounds. Fixed incorrect exception messages during connection shutdown. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@664114 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/Makefile.am | 2 +- qpid/cpp/src/qpid/Exception.cpp | 12 +++-- qpid/cpp/src/qpid/ExceptionHolder.h | 73 ------------------------- qpid/cpp/src/qpid/broker/Connection.cpp | 2 +- qpid/cpp/src/qpid/client/Bounds.cpp | 51 +++++++++--------- qpid/cpp/src/qpid/client/Bounds.h | 7 +-- qpid/cpp/src/qpid/client/ConnectionHandler.cpp | 7 ++- qpid/cpp/src/qpid/client/ConnectionImpl.cpp | 33 +++++------- qpid/cpp/src/qpid/client/ConnectionImpl.h | 4 +- qpid/cpp/src/qpid/client/SessionImpl.cpp | 14 ++--- qpid/cpp/src/qpid/sys/ExceptionHolder.h | 75 ++++++++++++++++++++++++++ qpid/cpp/src/qpid/sys/Waitable.h | 55 +++++++++++++++---- 12 files changed, 183 insertions(+), 152 deletions(-) delete mode 100644 qpid/cpp/src/qpid/ExceptionHolder.h create mode 100644 qpid/cpp/src/qpid/sys/ExceptionHolder.h (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index e0fd44f0c7..401488345e 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -344,7 +344,7 @@ nobase_include_HEADERS = \ qpid/assert.h \ qpid/DataDir.h \ qpid/Exception.h \ - qpid/ExceptionHolder.h \ + qpid/sys/ExceptionHolder.h \ qpid/amqp_0_10/Exception.h \ qpid/Msg.h \ qpid/Options.h \ diff --git a/qpid/cpp/src/qpid/Exception.cpp b/qpid/cpp/src/qpid/Exception.cpp index 8176d92cac..28c9d3742b 100644 --- a/qpid/cpp/src/qpid/Exception.cpp +++ b/qpid/cpp/src/qpid/Exception.cpp @@ -34,18 +34,22 @@ std::string strError(int err) { } Exception::Exception(const std::string& msg) throw() : message(msg) { - QPID_LOG(debug, "Exception: " << message); + QPID_LOG(debug, "Exception constructed: " << message); } Exception::~Exception() throw() {} -std::string Exception::getPrefix() const { return "Exception"; } +std::string Exception::getPrefix() const { return ""; } std::string Exception::getMessage() const { return message; } const char* Exception::what() const throw() { - if (whatStr.empty()) - whatStr = getPrefix() + ": " + message; + // Construct the what string the first time it is needed. + if (whatStr.empty()) { + whatStr = getPrefix(); + if (!whatStr.empty()) whatStr += ": "; + whatStr += message; + } return whatStr.c_str(); } diff --git a/qpid/cpp/src/qpid/ExceptionHolder.h b/qpid/cpp/src/qpid/ExceptionHolder.h deleted file mode 100644 index fed6308f19..0000000000 --- a/qpid/cpp/src/qpid/ExceptionHolder.h +++ /dev/null @@ -1,73 +0,0 @@ -#ifndef QPID_EXCEPTIONHOLDER_H -#define QPID_EXCEPTIONHOLDER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/memory.h" -#include - -namespace qpid { - -struct Raisable { - virtual ~Raisable() {}; - virtual void raise() const=0; - virtual std::string what() const=0; -}; - -/** - * Holder for exceptions. Allows the thread that notices an error condition to - * create an exception and store it to be thrown by another thread. - */ -class ExceptionHolder : public Raisable { - public: - ExceptionHolder() {} - ExceptionHolder(ExceptionHolder& ex) : Raisable(), wrapper(ex.wrapper) {} - /** Take ownership of ex */ - template ExceptionHolder(Ex* ex) { wrap(ex); } - template ExceptionHolder(const std::auto_ptr& ex) { wrap(ex.release()); } - - ExceptionHolder& operator=(ExceptionHolder& ex) { wrapper=ex.wrapper; return *this; } - template ExceptionHolder& operator=(Ex* ex) { wrap(ex); return *this; } - template ExceptionHolder& operator=(std::auto_ptr ex) { wrap(ex.release()); return *this; } - - void raise() const { if (wrapper.get()) wrapper->raise() ; } - std::string what() const { return wrapper->what(); } - bool empty() const { return !wrapper.get(); } - operator bool() const { return !empty(); } - void reset() { wrapper.reset(); } - - private: - template struct Wrapper : public Raisable { - Wrapper(Ex* ptr) : exception(ptr) {} - void raise() const { throw *exception; } - std::string what() const { return exception->what(); } - std::auto_ptr exception; - }; - template void wrap(Ex* ex) { wrapper.reset(new Wrapper(ex)); } - std::auto_ptr wrapper; - -}; - - -} // namespace qpid - -#endif /*!QPID_EXCEPTIONHOLDER_H*/ diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 19e031dbe0..b6f6b9cee9 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -197,7 +197,7 @@ bool Connection::doOutput() //then do other output as needed: return outputTasks.doOutput(); }catch(ConnectionException& e){ - close(e.code, e.what(), 0, 0); + close(e.code, e.getMessage(), 0, 0); }catch(std::exception& e){ close(541/*internal error*/, e.what(), 0, 0); } diff --git a/qpid/cpp/src/qpid/client/Bounds.cpp b/qpid/cpp/src/qpid/client/Bounds.cpp index 1df21db941..aac18022bc 100644 --- a/qpid/cpp/src/qpid/client/Bounds.cpp +++ b/qpid/cpp/src/qpid/client/Bounds.cpp @@ -1,49 +1,40 @@ #include "Bounds.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Waitable.h" namespace qpid { namespace client { -using sys::Monitor; +using sys::Waitable; Bounds::Bounds(size_t maxSize) : max(maxSize), current(0) {} -bool Bounds::expand(size_t sizeRequired, bool block) -{ - if (max) { - Monitor::ScopedLock l(lock); - current += sizeRequired; - if (block) { - while (current > max) { - QPID_LOG(debug, "Waiting for bounds: " << *this); - lock.wait(); - } - QPID_LOG(debug, "Bounds ok: " << *this); - } - return current <= max; - } else { - return true; +bool Bounds::expand(size_t sizeRequired, bool block) { + if (!max) return true; + Waitable::ScopedLock l(lock); + current += sizeRequired; + if (block) { + Waitable::ScopedWait w(lock); + while (current > max) + lock.wait(); } + return current <= max; } -void Bounds::reduce(size_t size) -{ +void Bounds::reduce(size_t size) { if (!max || size == 0) return; - Monitor::ScopedLock l(lock); + Waitable::ScopedLock l(lock); if (current == 0) return; - bool needNotify = current > max; current -= std::min(size, current); - if (needNotify && current < max) { - //todo: notify one at a time, but ensure that all threads are - //eventually notified - lock.notifyAll(); + if (current < max && lock.hasWaiters()) { + assert(lock.hasWaiters() == 1); + lock.notify(); } } -size_t Bounds::getCurrentSize() -{ - Monitor::ScopedLock l(lock); +size_t Bounds::getCurrentSize() { + Waitable::ScopedLock l(lock); return current; } @@ -52,4 +43,10 @@ std::ostream& operator<<(std::ostream& out, const Bounds& bounds) { return out; } +void Bounds::setException(const sys::ExceptionHolder& e) { + Waitable::ScopedLock l(lock); + lock.setException(e); + lock.waitWaiters(); // Wait for waiting threads to exit. +} + }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/Bounds.h b/qpid/cpp/src/qpid/client/Bounds.h index db18becce3..838fcb8368 100644 --- a/qpid/cpp/src/qpid/client/Bounds.h +++ b/qpid/cpp/src/qpid/client/Bounds.h @@ -20,7 +20,7 @@ * under the License. * */ -#include "qpid/sys/Monitor.h" +#include "qpid/sys/Waitable.h" namespace qpid{ namespace client{ @@ -32,10 +32,11 @@ class Bounds bool expand(size_t, bool block); void reduce(size_t); size_t getCurrentSize(); - + void setException(const sys::ExceptionHolder&); + private: friend std::ostream& operator<<(std::ostream&, const Bounds&); - sys::Monitor lock; + sys::Waitable lock; const size_t max; size_t current; }; diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index df1afb87a9..05f6bb9733 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -83,11 +83,10 @@ void ConnectionHandler::incoming(AMQFrame& frame) void ConnectionHandler::outgoing(AMQFrame& frame) { - if (getState() == OPEN) { + if (getState() == OPEN) out(frame); - } else { - throw Exception("Connection is not open."); - } + else + throw Exception(errorText.empty() ? "Connection is not open." : errorText); } void ConnectionHandler::waitForOpen() diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index 81eda0bffb..22f10d3620 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -119,41 +119,32 @@ void ConnectionImpl::close() closed(NORMAL, "Closed by client"); } -// Set closed flags and erase the sessions map, but keep the contents -// so sessions can be updated outside the lock. -ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) { + +template void ConnectionImpl::closeInternal(const F& f) { isClosed = true; connector.close(); - SessionVector save; - for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + for (SessionMap::iterator i=sessions.begin(); i != sessions.end(); ++i) { boost::shared_ptr s = i->second.lock(); - if (s) save.push_back(s); + if (s) f(s); } sessions.clear(); - return save; } -void ConnectionImpl::closed(uint16_t code, const std::string& text) -{ - SessionVector save; - { - Mutex::ScopedLock l(lock); - save = closeInternal(l); - } - std::for_each(save.begin(), save.end(), boost::bind(&SessionImpl::connectionClosed, _1, code, text)); +void ConnectionImpl::closed(uint16_t code, const std::string& text) { + Mutex::ScopedLock l(lock); + setException(new ConnectionException(code, text)); + closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text)); } static const std::string CONN_CLOSED("Connection closed by broker"); -void ConnectionImpl::shutdown() -{ +void ConnectionImpl::shutdown() { Mutex::ScopedLock l(lock); + // FIXME aconway 2008-06-06: exception use, connection-forced is incorrect here. + setException(new ConnectionException(CONNECTION_FORCED, CONN_CLOSED)); if (isClosed) return; - SessionVector save(closeInternal(l)); handler.fail(CONN_CLOSED); - Mutex::ScopedUnlock u(lock); - std::for_each(save.begin(), save.end(), - boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED)); + closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED)); } void ConnectionImpl::erase(uint16_t ch) { diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.h b/qpid/cpp/src/qpid/client/ConnectionImpl.h index 655bca359b..089e73335d 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.h @@ -49,7 +49,6 @@ class ConnectionImpl : public Bounds, { typedef std::map > SessionMap; - typedef std::vector > SessionVector; SessionMap sessions; ConnectionHandler handler; @@ -59,9 +58,8 @@ class ConnectionImpl : public Bounds, bool isClosed; bool isClosing; - template void detachAll(const F&); + template void closeInternal(const F&); - SessionVector closeInternal(const sys::Mutex::ScopedLock&); void incoming(framing::AMQFrame& frame); void closed(uint16_t, const std::string&); void idleOut(); diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 66e1b9e40f..7b8cae943f 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -69,12 +69,14 @@ SessionImpl::SessionImpl(const std::string& name, } SessionImpl::~SessionImpl() { - Lock l(state); - if (state != DETACHED) { - QPID_LOG(warning, "Detaching deleted session"); - setState(DETACHED); - handleClosed(); - state.waitWaiters(); + { + Lock l(state); + if (state != DETACHED) { + QPID_LOG(warning, "Detaching deleted session"); + setState(DETACHED); + handleClosed(); + state.waitWaiters(); + } } connection->erase(channel); } diff --git a/qpid/cpp/src/qpid/sys/ExceptionHolder.h b/qpid/cpp/src/qpid/sys/ExceptionHolder.h new file mode 100644 index 0000000000..cfb971411e --- /dev/null +++ b/qpid/cpp/src/qpid/sys/ExceptionHolder.h @@ -0,0 +1,75 @@ +#ifndef QPID_EXCEPTIONHOLDER_H +#define QPID_EXCEPTIONHOLDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/memory.h" +#include + + +namespace qpid { +namespace sys { + +struct Raisable { + virtual ~Raisable() {}; + virtual void raise() const=0; + virtual std::string what() const=0; +}; + +/** + * Holder for exceptions. Allows the thread that notices an error condition to + * create an exception and store it to be thrown by another thread. + */ +class ExceptionHolder : public Raisable { + public: + ExceptionHolder() {} + // Use default copy & assign. + + /** Take ownership of ex */ + template ExceptionHolder(Ex* ex) { wrap(ex); } + template ExceptionHolder(const boost::shared_ptr& ex) { wrap(ex.release()); } + + template ExceptionHolder& operator=(Ex* ex) { wrap(ex); return *this; } + template ExceptionHolder& operator=(boost::shared_ptr ex) { wrap(ex.release()); return *this; } + + void raise() const { if (wrapper.get()) wrapper->raise() ; } + std::string what() const { return wrapper->what(); } + bool empty() const { return !wrapper.get(); } + operator bool() const { return !empty(); } + void reset() { wrapper.reset(); } + + private: + template struct Wrapper : public Raisable { + Wrapper(Ex* ptr) : exception(ptr) {} + void raise() const { throw *exception; } + std::string what() const { return exception->what(); } + boost::shared_ptr exception; + }; + template void wrap(Ex* ex) { wrapper.reset(new Wrapper(ex)); } + boost::shared_ptr wrapper; +}; + + +}} // namespace qpid::sys + + +#endif /*!QPID_EXCEPTIONHOLDER_H*/ diff --git a/qpid/cpp/src/qpid/sys/Waitable.h b/qpid/cpp/src/qpid/sys/Waitable.h index 37392ed761..61b7e7d82b 100644 --- a/qpid/cpp/src/qpid/sys/Waitable.h +++ b/qpid/cpp/src/qpid/sys/Waitable.h @@ -21,8 +21,8 @@ * */ -#include "Monitor.h" - +#include "qpid/sys/Monitor.h" +#include "qpid/sys/ExceptionHolder.h" #include namespace qpid { @@ -31,14 +31,18 @@ namespace sys { /** * A monitor that keeps track of waiting threads. Threads declare a * ScopedWait around wait() inside a ScopedLock to be considered - * waiters. + * waiters. + * + * Allows waiting threads to be interrupted by an exception. */ class Waitable : public Monitor { public: Waitable() : waiters(0) {} + ~Waitable() { assert(waiters == 0); } + /** Use this inside a scoped lock around the - * call to Monitor::wait to be counted as a waiter + * call to wait() to be counted as a waiter. */ struct ScopedWait { Waitable& w; @@ -46,22 +50,55 @@ class Waitable : public Monitor { ~ScopedWait() { if (--w.waiters==0) w.notifyAll(); } }; - /** Block till there are no more ScopedWaits. + /** Block till there are no more waiters in ScopedWaits. + * waitWaiters() does not raise an exception even if waiters + * were interrupted by one. *@pre Must be called inside a ScopedLock but NOT a ScopedWait. */ void waitWaiters() { while (waiters != 0) - wait(); + Monitor::wait(); } /** Returns the number of outstanding ScopedWaits. * Must be called with the lock held. */ - size_t hasWaiters() { return waiters; } - + size_t hasWaiters() const { + return waiters; + } + + /** Set an execption to interrupt waiters in ScopedWait. + * Must be called with the lock held. + */ + void setException(const ExceptionHolder& e) { + exception = e; + notifyAll(); + + } + + /** Throws an exception if one is set before or during the wait. */ + void wait() { + ExCheck e(exception); + Monitor::wait(); + } + + /** Throws an exception if one is set before or during the wait. */ + bool wait(const AbsTime& absoluteTime) { + ExCheck e(exception); + return Monitor::wait(absoluteTime); + } + + ExceptionHolder exception; + private: - friend struct ScopedWait; + struct ExCheck { + const ExceptionHolder& exception; + ExCheck(const ExceptionHolder& e) : exception(e) { e.raise(); } + ~ExCheck() { exception.raise(); } + }; + size_t waiters; + friend struct ScopedWait; }; }} // namespace qpid::sys -- cgit v1.2.1