diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 84 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 22 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/Demux.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/Demux.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/log/Logger.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/log/Options.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Socket.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Condition.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Mutex.h | 26 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Socket.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/Thread.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/check.h | 18 |
19 files changed, 140 insertions, 94 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 84b0768c27..e1c50c14fc 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -142,7 +142,7 @@ void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody void ConnectionHandler::fail(const std::string& message) { - QPID_LOG(error, message); + QPID_LOG(warning, message); setState(FAILED); } diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index e409f0f2a9..bb50495c06 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -59,7 +59,6 @@ class ConnectionHandler : private StateManager, void send(const framing::AMQBody& body); void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0); void error(uint16_t code, const std::string& message, framing::AMQBody* body); - void fail(const std::string& message); public: using InputHandler::handle; @@ -75,6 +74,7 @@ public: void waitForOpen(); void close(); + void fail(const std::string& message); CloseListener onClose; ErrorListener onError; diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index dd986deec4..b248de8744 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/log/Statement.h" #include "qpid/framing/constants.h" #include "qpid/framing/reply_exceptions.h" @@ -44,14 +45,18 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) connector->setShutdownHandler(this); } -ConnectionImpl::~ConnectionImpl() { close(); } +ConnectionImpl::~ConnectionImpl() { + // Important to close the connector first, to ensure the + // connector thread does not call on us while the destructor + // is running. + connector->close(); +} void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session) { Mutex::ScopedLock l(lock); boost::weak_ptr<SessionCore>& s = sessions[session->getChannel()]; - if (s.lock()) - throw ChannelBusyException(); + if (s.lock()) throw ChannelBusyException(); s = session; } @@ -81,31 +86,15 @@ void ConnectionImpl::open(const std::string& host, int port, handler.pwd = pwd; handler.vhost = vhost; + QPID_LOG(info, "Connecting to " << host << ":" << port); connector->connect(host, port); connector->init(); handler.waitForOpen(); } -bool ConnectionImpl::setClosing() -{ - Mutex::ScopedLock l(lock); - if (isClosing || isClosed) { - return false; - } - isClosing = true; - return true; -} - -void ConnectionImpl::close() -{ - if (setClosing()) { - handler.close(); - } -} - void ConnectionImpl::idleIn() { - connector->close(); + close(); } void ConnectionImpl::idleOut() @@ -114,35 +103,52 @@ void ConnectionImpl::idleOut() connector->send(frame); } -template <class F> -void ConnectionImpl::forChannels(F functor) { - for (SessionMap::iterator i = sessions.begin(); - i != sessions.end(); ++i) { - try { - boost::shared_ptr<SessionCore> s = i->second.lock(); - if (s) functor(*s); - } catch (...) { assert(0); } +void ConnectionImpl::close() +{ + Mutex::ScopedLock l(lock); + if (isClosing || isClosed) return; + isClosing = true; + { + Mutex::ScopedUnlock u(lock); + handler.close(); + } + closed(REPLY_SUCCESS, "Closed by client"); +} + +// Set closed flags and erase the sessions map, but keep the contents +// so sessions can be updated outside the lock. +ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) { + isClosed = true; + connector->close(); + SessionVector save; + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + boost::shared_ptr<SessionCore> s = i->second.lock(); + if (s) save.push_back(s); } + sessions.clear(); + return save; } -void ConnectionImpl::shutdown() +void ConnectionImpl::closed(uint16_t code, const std::string& text) { Mutex::ScopedLock l(lock); if (isClosed) return; - forChannels(boost::bind(&SessionCore::connectionBroke, _1, - INTERNAL_ERROR, "Unexpected socket closure.")); - sessions.clear(); - isClosed = true; + SessionVector save(closeInternal(l)); + Mutex::ScopedUnlock u(lock); + std::for_each(save.begin(), save.end(), boost::bind(&SessionCore::connectionClosed, _1, code, text)); } -void ConnectionImpl::closed(uint16_t code, const std::string& text) +static const std::string CONN_CLOSED("Connection closed by broker"); + +void ConnectionImpl::shutdown() { Mutex::ScopedLock l(lock); if (isClosed) return; - forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text)); - sessions.clear(); - isClosed = true; - connector->close(); + SessionVector save(closeInternal(l)); + handler.fail(CONN_CLOSED); + Mutex::ScopedUnlock u(lock); + std::for_each(save.begin(), save.end(), + boost::bind(&SessionCore::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED)); } void ConnectionImpl::erase(uint16_t ch) { diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index 1fe8ac4653..bf8226a776 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -43,6 +43,8 @@ class ConnectionImpl : public framing::FrameHandler, { typedef std::map<uint16_t, boost::weak_ptr<SessionCore> > SessionMap; + typedef std::vector<boost::shared_ptr<SessionCore> > SessionVector; + SessionMap sessions; ConnectionHandler handler; boost::shared_ptr<Connector> connector; @@ -51,6 +53,9 @@ class ConnectionImpl : public framing::FrameHandler, bool isClosed; bool isClosing; + template <class F> void detachAll(const F&); + + SessionVector closeInternal(const sys::Mutex::ScopedLock&); void incoming(framing::AMQFrame& frame); void closed(uint16_t, const std::string&); void idleOut(); @@ -58,8 +63,6 @@ class ConnectionImpl : public framing::FrameHandler, void shutdown(); bool setClosing(); - template <class F> void forChannels(F functor); - public: typedef boost::shared_ptr<ConnectionImpl> shared_ptr; diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 95314dcb40..4fb5aa6b4d 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -27,7 +27,7 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" - +#include "qpid/Msg.h" #include <boost/bind.hpp> namespace qpid { @@ -43,6 +43,7 @@ Connector::Connector( send_buffer_size(buffer_size), version(ver), closed(true), + joined(true), timeout(0), idleIn(0), idleOut(0), timeoutHandler(0), @@ -52,11 +53,11 @@ Connector::Connector( Connector::~Connector() { close(); - if (receiver.id() && receiver.id() != Thread::current().id()) - receiver.join(); } void Connector::connect(const std::string& host, int port){ + Mutex::ScopedLock l(closedLock); + assert(closed); socket.connect(host, port); closed = false; poller = Poller::shared_ptr(new Poller); @@ -71,20 +72,27 @@ void Connector::connect(const std::string& host, int port){ } void Connector::init(){ + Mutex::ScopedLock l(closedLock); + assert(joined); ProtocolInitiation init(version); - writeDataBlock(init); + joined = false; receiver = Thread(this); } bool Connector::closeInternal() { Mutex::ScopedLock l(closedLock); + bool ret = !closed; if (!closed) { - poller->shutdown(); closed = true; - return true; + poller->shutdown(); + } + if (!joined && receiver.id() != Thread::current().id()) { + joined = true; + Mutex::ScopedUnlock u(closedLock); + receiver.join(); } - return false; + return ret; } void Connector::close() { diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index aefd91f6f4..121a1c33aa 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -77,8 +77,9 @@ class Connector : public framing::OutputHandler, const int send_buffer_size; framing::ProtocolVersion version; - bool closed; sys::Mutex closedLock; + bool closed; + bool joined; sys::AbsTime lastIn; sys::AbsTime lastOut; @@ -112,6 +113,8 @@ class Connector : public framing::OutputHandler, void writebuff(qpid::sys::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); + + std::string identifier; friend class Channel; @@ -130,6 +133,7 @@ class Connector : public framing::OutputHandler, virtual void send(framing::AMQFrame& frame); virtual void setReadTimeout(uint16_t timeout); virtual void setWriteTimeout(uint16_t timeout); + const std::string& getIdentifier() const { return identifier; } }; }} diff --git a/cpp/src/qpid/client/Demux.cpp b/cpp/src/qpid/client/Demux.cpp index e61103981b..cb9372cee7 100644 --- a/cpp/src/qpid/client/Demux.cpp +++ b/cpp/src/qpid/client/Demux.cpp @@ -45,6 +45,10 @@ ScopedDivert::~ScopedDivert() demuxer.remove(dest); } +Demux::Demux() : defaultQueue(new Queue()) {} + +Demux::~Demux() { close(); } + Demux::QueuePtr ScopedDivert::getQueue() { return queue; diff --git a/cpp/src/qpid/client/Demux.h b/cpp/src/qpid/client/Demux.h index 234282a8d2..dce24223f2 100644 --- a/cpp/src/qpid/client/Demux.h +++ b/cpp/src/qpid/client/Demux.h @@ -47,7 +47,8 @@ public: typedef sys::BlockingQueue<framing::FrameSet::shared_ptr> Queue; typedef boost::shared_ptr<Queue> QueuePtr; - Demux() : defaultQueue(new Queue()) {} + Demux(); + ~Demux(); void handle(framing::FrameSet::shared_ptr); void close(); diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index f4a7ff54d8..0783d5bc55 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -62,13 +62,12 @@ void Dispatcher::start() } void Dispatcher::run() -{ +{ Mutex::ScopedLock l(lock); if (running) throw Exception("Dispatcher is already running."); boost::state_saver<bool> reset(running); // Reset to false on exit. running = true; - queue->open(); try { while (!queue->isClosed()) { Mutex::ScopedUnlock u(lock); diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 07a791bef3..5079c47b5e 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -125,7 +125,7 @@ void SessionCore::detach(int c, const std::string& t) { channel.next = 0; code=c; text=t; - l3.getDemux().close(); + l3.getDemux().close(); } void SessionCore::doClose(int code, const std::string& text) { @@ -270,7 +270,6 @@ void SessionCore::detached() { // network thread Lock l(state); check(state == SUSPENDING, COMMAND_INVALID, UNEXPECTED_SESSION_DETACHED); - connection->erase(channel); doSuspend(REPLY_SUCCESS, OK); } @@ -379,22 +378,28 @@ bool isCloseResponse(const AMQFrame& frame) { // Network thread. void SessionCore::handleIn(AMQFrame& frame) { + ConnectionImpl::shared_ptr save; { Lock l(state); + save=connection; // Ignore frames received while closing other than closed response. if (state==CLOSING && !isCloseResponse(frame)) return; } try { // Cast to expose private SessionHandler functions. - if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + // If we were detached by a session command, tell the connection. + if (!connection) save->erase(channel); + } + else { session->received(frame); l3.handle(frame); } } catch (const ChannelException& e) { QPID_LOG(error, "Channel exception:" << e.what()); doClose(e.code, e.what()); - } + } } void SessionCore::handleOut(AMQFrame& frame) diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp index fd9ab14c63..de3f8f5bee 100644 --- a/cpp/src/qpid/log/Logger.cpp +++ b/cpp/src/qpid/log/Logger.cpp @@ -114,7 +114,7 @@ void Logger::log(const Statement& s, const std::string& msg) { if (flags&LEVEL) os << LevelTraits::name(s.level) << " "; if (flags&THREAD) - os << "[" << hex << qpid::sys::Thread::logId() << "] "; + os << "[" << qpid::sys::Thread::logId() << "] "; if (flags&FILE) os << s.file << ":"; if (flags&LINE) diff --git a/cpp/src/qpid/log/Options.cpp b/cpp/src/qpid/log/Options.cpp index 41a15dcf9f..dd296f3a93 100644 --- a/cpp/src/qpid/log/Options.cpp +++ b/cpp/src/qpid/log/Options.cpp @@ -26,7 +26,7 @@ namespace log { using namespace std; Options::Options(const std::string& name) : qpid::Options(name), - time(true), level(true), thread(false), source(false), function(false), trace(false) + time(true), level(true), thread(false), source(false), function(false), trace(false) { outputs.push_back("stderr"); selectors.push_back("error+"); @@ -43,14 +43,14 @@ Options::Options(const std::string& name) : qpid::Options(name), ("trace,t", optValue(trace), "Enables all logging" ) ("log-enable", optValue(selectors, "RULE"), ("Enables logging for selected levels and components. " - "RULE is in the form 'LEVEL[+][:PATTERN]' " - "Levels are one of: \n\t "+levels.str()+"\n" - "For example:\n" - "\t'--log-enable warning+' " - "logs all warning, error and critical messages.\n" - "\t'--log-enable debug:framing' " + "RULE is in the form 'LEVEL[+][:PATTERN]' " + "Levels are one of: \n\t "+levels.str()+"\n" + "For example:\n" + "\t'--log-enable warning+' " + "logs all warning, error and critical messages.\n" + "\t'--log-enable debug:framing' " "logs debug messages from the framing namespace. " - "This option can be used multiple times").c_str()) + "This option can be used multiple times").c_str()) ("log-time", optValue(time, "yes|no"), "Include time in log messages") ("log-level", optValue(level,"yes|no"), diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index dd8fe957bd..485f8c20f4 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -110,6 +110,7 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { void init(AsynchIO* a, ConnectionInputHandler* h) { aio = a; inputHandler = h; + identifier = aio->getSocket().getPeerAddress(); } // Output side @@ -229,7 +230,6 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { }else{ framing::ProtocolInitiation protocolInit; if(protocolInit.decode(in)){ - identifier = aio->getSocket().getPeerAddress(); QPID_LOG(debug, "INIT [" << identifier << "]"); inputHandler->initiated(protocolInit); initiated = true; diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h index a7bb7745c9..2e9a389133 100644 --- a/cpp/src/qpid/sys/Socket.h +++ b/cpp/src/qpid/sys/Socket.h @@ -97,6 +97,8 @@ public: int read(void *buf, size_t count) const; int write(const void *buf, size_t count) const; + int toFd() const; + private: Socket(SocketPrivate*); }; diff --git a/cpp/src/qpid/sys/posix/Condition.h b/cpp/src/qpid/sys/posix/Condition.h index 1c8d1a80b1..86d6500ee9 100644 --- a/cpp/src/qpid/sys/posix/Condition.h +++ b/cpp/src/qpid/sys/posix/Condition.h @@ -52,15 +52,15 @@ class Condition }; Condition::Condition() { - QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0)); + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_init(&condition, 0)); } Condition::~Condition() { - QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition)); + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_destroy(&condition)); } void Condition::wait(Mutex& mutex) { - QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex)); + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex)); } bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){ @@ -75,11 +75,11 @@ bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){ } void Condition::notify(){ - QPID_POSIX_THROW_IF(pthread_cond_signal(&condition)); + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_signal(&condition)); } void Condition::notifyAll(){ - QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition)); + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_broadcast(&condition)); } }} diff --git a/cpp/src/qpid/sys/posix/Mutex.h b/cpp/src/qpid/sys/posix/Mutex.h index 4cf0c3a3b0..590fd14bd0 100644 --- a/cpp/src/qpid/sys/posix/Mutex.h +++ b/cpp/src/qpid/sys/posix/Mutex.h @@ -136,11 +136,11 @@ struct PODMutex #define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } void PODMutex::lock() { - QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_lock(&mutex)); } void PODMutex::unlock() { - QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_unlock(&mutex)); } bool PODMutex::trylock() { @@ -148,19 +148,19 @@ bool PODMutex::trylock() { } Mutex::Mutex() { - QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, recursiveMutexattr)); + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_init(&mutex, recursiveMutexattr)); } Mutex::~Mutex(){ - QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex)); + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_destroy(&mutex)); } void Mutex::lock() { - QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_lock(&mutex)); } void Mutex::unlock() { - QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_unlock(&mutex)); } bool Mutex::trylock() { @@ -169,31 +169,31 @@ bool Mutex::trylock() { RWlock::RWlock() { - QPID_POSIX_THROW_IF(pthread_rwlock_init(&rwlock, recursiveRWlockattr)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_init(&rwlock, recursiveRWlockattr)); } RWlock::~RWlock(){ - QPID_POSIX_THROW_IF(pthread_rwlock_destroy(&rwlock)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_destroy(&rwlock)); } void RWlock::wlock() { - QPID_POSIX_THROW_IF(pthread_rwlock_wrlock(&rwlock)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_wrlock(&rwlock)); } void RWlock::rlock() { - QPID_POSIX_THROW_IF(pthread_rwlock_rdlock(&rwlock)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_rdlock(&rwlock)); } void RWlock::unlock() { - QPID_POSIX_THROW_IF(pthread_rwlock_unlock(&rwlock)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_unlock(&rwlock)); } void RWlock::trywlock() { - QPID_POSIX_THROW_IF(pthread_rwlock_trywrlock(&rwlock)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_trywrlock(&rwlock)); } void RWlock::tryrlock() { - QPID_POSIX_THROW_IF(pthread_rwlock_tryrdlock(&rwlock)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_tryrdlock(&rwlock)); } diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index 6e872c4fbc..ccb07bdafc 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -231,6 +231,10 @@ std::string Socket::getPeerAddress() const return impl->getName(false, true); } +int Socket::toFd() const { + return impl->fd; +} + int toFd(const SocketPrivate* s) { return s->fd; diff --git a/cpp/src/qpid/sys/posix/Thread.h b/cpp/src/qpid/sys/posix/Thread.h index 631a5d5378..e199c0eef6 100644 --- a/cpp/src/qpid/sys/posix/Thread.h +++ b/cpp/src/qpid/sys/posix/Thread.h @@ -60,16 +60,16 @@ class Thread Thread::Thread() : thread(0) {} Thread::Thread(Runnable* runnable) { - QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable)); + QPID_POSIX_ASSERT_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable)); } Thread::Thread(Runnable& runnable) { - QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable)); + QPID_POSIX_ASSERT_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable)); } void Thread::join(){ if (thread != 0) - QPID_POSIX_THROW_IF(pthread_join(thread, 0)); + QPID_POSIX_ASSERT_THROW_IF(pthread_join(thread, 0)); } long Thread::id() { @@ -84,7 +84,7 @@ Thread Thread::current() { void Thread::yield() { - QPID_POSIX_THROW_IF(pthread_yield()); + QPID_POSIX_ASSERT_THROW_IF(pthread_yield()); } diff --git a/cpp/src/qpid/sys/posix/check.h b/cpp/src/qpid/sys/posix/check.h index 40aa0d4d27..5de8863345 100644 --- a/cpp/src/qpid/sys/posix/check.h +++ b/cpp/src/qpid/sys/posix/check.h @@ -23,16 +23,26 @@ */ #include "qpid/Exception.h" - #include <cerrno> +#include <assert.h> -#define QPID_POSIX_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::strError(ERRNO))); +#define QPID_POSIX_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::strError(ERRNO))) /** THROW QPID_POSIX_ERROR(errno) if RESULT is less than zero */ #define QPID_POSIX_CHECK(RESULT) \ if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno)) -/** Throw a posix error if errNo is non-zero */ +/** Throw a posix error if ERRNO is non-zero */ #define QPID_POSIX_THROW_IF(ERRNO) \ - if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO)) + do { int e=(ERRNO); if (e) throw QPID_POSIX_ERROR(e); } while(0) + +/** Same as _THROW_IF in a release build, but abort a debug build */ +#ifdef NDEBUG +#define QPID_POSIX_ASSERT_THROW_IF(ERRNO) qpid_posix \ + QPID_POSIX_THROW_IF(ERRNO) +#else +#define QPID_POSIX_ASSERT_THROW_IF(ERRNO) \ + do { int e=(ERRNO); if (e) { errno=e; perror(0); assert(0); } } while(0) +#endif + #endif /*!_posix_check_h*/ |