diff options
Diffstat (limited to 'cpp')
28 files changed, 336 insertions, 509 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 84b0768c27..e1c50c14fc 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -142,7 +142,7 @@ void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody void ConnectionHandler::fail(const std::string& message) { - QPID_LOG(error, message); + QPID_LOG(warning, message); setState(FAILED); } diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index e409f0f2a9..bb50495c06 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -59,7 +59,6 @@ class ConnectionHandler : private StateManager, void send(const framing::AMQBody& body); void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0); void error(uint16_t code, const std::string& message, framing::AMQBody* body); - void fail(const std::string& message); public: using InputHandler::handle; @@ -75,6 +74,7 @@ public: void waitForOpen(); void close(); + void fail(const std::string& message); CloseListener onClose; ErrorListener onError; diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index dd986deec4..b248de8744 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/log/Statement.h" #include "qpid/framing/constants.h" #include "qpid/framing/reply_exceptions.h" @@ -44,14 +45,18 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) connector->setShutdownHandler(this); } -ConnectionImpl::~ConnectionImpl() { close(); } +ConnectionImpl::~ConnectionImpl() { + // Important to close the connector first, to ensure the + // connector thread does not call on us while the destructor + // is running. + connector->close(); +} void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session) { Mutex::ScopedLock l(lock); boost::weak_ptr<SessionCore>& s = sessions[session->getChannel()]; - if (s.lock()) - throw ChannelBusyException(); + if (s.lock()) throw ChannelBusyException(); s = session; } @@ -81,31 +86,15 @@ void ConnectionImpl::open(const std::string& host, int port, handler.pwd = pwd; handler.vhost = vhost; + QPID_LOG(info, "Connecting to " << host << ":" << port); connector->connect(host, port); connector->init(); handler.waitForOpen(); } -bool ConnectionImpl::setClosing() -{ - Mutex::ScopedLock l(lock); - if (isClosing || isClosed) { - return false; - } - isClosing = true; - return true; -} - -void ConnectionImpl::close() -{ - if (setClosing()) { - handler.close(); - } -} - void ConnectionImpl::idleIn() { - connector->close(); + close(); } void ConnectionImpl::idleOut() @@ -114,35 +103,52 @@ void ConnectionImpl::idleOut() connector->send(frame); } -template <class F> -void ConnectionImpl::forChannels(F functor) { - for (SessionMap::iterator i = sessions.begin(); - i != sessions.end(); ++i) { - try { - boost::shared_ptr<SessionCore> s = i->second.lock(); - if (s) functor(*s); - } catch (...) { assert(0); } +void ConnectionImpl::close() +{ + Mutex::ScopedLock l(lock); + if (isClosing || isClosed) return; + isClosing = true; + { + Mutex::ScopedUnlock u(lock); + handler.close(); + } + closed(REPLY_SUCCESS, "Closed by client"); +} + +// Set closed flags and erase the sessions map, but keep the contents +// so sessions can be updated outside the lock. +ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) { + isClosed = true; + connector->close(); + SessionVector save; + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + boost::shared_ptr<SessionCore> s = i->second.lock(); + if (s) save.push_back(s); } + sessions.clear(); + return save; } -void ConnectionImpl::shutdown() +void ConnectionImpl::closed(uint16_t code, const std::string& text) { Mutex::ScopedLock l(lock); if (isClosed) return; - forChannels(boost::bind(&SessionCore::connectionBroke, _1, - INTERNAL_ERROR, "Unexpected socket closure.")); - sessions.clear(); - isClosed = true; + SessionVector save(closeInternal(l)); + Mutex::ScopedUnlock u(lock); + std::for_each(save.begin(), save.end(), boost::bind(&SessionCore::connectionClosed, _1, code, text)); } -void ConnectionImpl::closed(uint16_t code, const std::string& text) +static const std::string CONN_CLOSED("Connection closed by broker"); + +void ConnectionImpl::shutdown() { Mutex::ScopedLock l(lock); if (isClosed) return; - forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text)); - sessions.clear(); - isClosed = true; - connector->close(); + SessionVector save(closeInternal(l)); + handler.fail(CONN_CLOSED); + Mutex::ScopedUnlock u(lock); + std::for_each(save.begin(), save.end(), + boost::bind(&SessionCore::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED)); } void ConnectionImpl::erase(uint16_t ch) { diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index 1fe8ac4653..bf8226a776 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -43,6 +43,8 @@ class ConnectionImpl : public framing::FrameHandler, { typedef std::map<uint16_t, boost::weak_ptr<SessionCore> > SessionMap; + typedef std::vector<boost::shared_ptr<SessionCore> > SessionVector; + SessionMap sessions; ConnectionHandler handler; boost::shared_ptr<Connector> connector; @@ -51,6 +53,9 @@ class ConnectionImpl : public framing::FrameHandler, bool isClosed; bool isClosing; + template <class F> void detachAll(const F&); + + SessionVector closeInternal(const sys::Mutex::ScopedLock&); void incoming(framing::AMQFrame& frame); void closed(uint16_t, const std::string&); void idleOut(); @@ -58,8 +63,6 @@ class ConnectionImpl : public framing::FrameHandler, void shutdown(); bool setClosing(); - template <class F> void forChannels(F functor); - public: typedef boost::shared_ptr<ConnectionImpl> shared_ptr; diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 95314dcb40..4fb5aa6b4d 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -27,7 +27,7 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" - +#include "qpid/Msg.h" #include <boost/bind.hpp> namespace qpid { @@ -43,6 +43,7 @@ Connector::Connector( send_buffer_size(buffer_size), version(ver), closed(true), + joined(true), timeout(0), idleIn(0), idleOut(0), timeoutHandler(0), @@ -52,11 +53,11 @@ Connector::Connector( Connector::~Connector() { close(); - if (receiver.id() && receiver.id() != Thread::current().id()) - receiver.join(); } void Connector::connect(const std::string& host, int port){ + Mutex::ScopedLock l(closedLock); + assert(closed); socket.connect(host, port); closed = false; poller = Poller::shared_ptr(new Poller); @@ -71,20 +72,27 @@ void Connector::connect(const std::string& host, int port){ } void Connector::init(){ + Mutex::ScopedLock l(closedLock); + assert(joined); ProtocolInitiation init(version); - writeDataBlock(init); + joined = false; receiver = Thread(this); } bool Connector::closeInternal() { Mutex::ScopedLock l(closedLock); + bool ret = !closed; if (!closed) { - poller->shutdown(); closed = true; - return true; + poller->shutdown(); + } + if (!joined && receiver.id() != Thread::current().id()) { + joined = true; + Mutex::ScopedUnlock u(closedLock); + receiver.join(); } - return false; + return ret; } void Connector::close() { diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index aefd91f6f4..121a1c33aa 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -77,8 +77,9 @@ class Connector : public framing::OutputHandler, const int send_buffer_size; framing::ProtocolVersion version; - bool closed; sys::Mutex closedLock; + bool closed; + bool joined; sys::AbsTime lastIn; sys::AbsTime lastOut; @@ -112,6 +113,8 @@ class Connector : public framing::OutputHandler, void writebuff(qpid::sys::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); + + std::string identifier; friend class Channel; @@ -130,6 +133,7 @@ class Connector : public framing::OutputHandler, virtual void send(framing::AMQFrame& frame); virtual void setReadTimeout(uint16_t timeout); virtual void setWriteTimeout(uint16_t timeout); + const std::string& getIdentifier() const { return identifier; } }; }} diff --git a/cpp/src/qpid/client/Demux.cpp b/cpp/src/qpid/client/Demux.cpp index e61103981b..cb9372cee7 100644 --- a/cpp/src/qpid/client/Demux.cpp +++ b/cpp/src/qpid/client/Demux.cpp @@ -45,6 +45,10 @@ ScopedDivert::~ScopedDivert() demuxer.remove(dest); } +Demux::Demux() : defaultQueue(new Queue()) {} + +Demux::~Demux() { close(); } + Demux::QueuePtr ScopedDivert::getQueue() { return queue; diff --git a/cpp/src/qpid/client/Demux.h b/cpp/src/qpid/client/Demux.h index 234282a8d2..dce24223f2 100644 --- a/cpp/src/qpid/client/Demux.h +++ b/cpp/src/qpid/client/Demux.h @@ -47,7 +47,8 @@ public: typedef sys::BlockingQueue<framing::FrameSet::shared_ptr> Queue; typedef boost::shared_ptr<Queue> QueuePtr; - Demux() : defaultQueue(new Queue()) {} + Demux(); + ~Demux(); void handle(framing::FrameSet::shared_ptr); void close(); diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index f4a7ff54d8..0783d5bc55 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -62,13 +62,12 @@ void Dispatcher::start() } void Dispatcher::run() -{ +{ Mutex::ScopedLock l(lock); if (running) throw Exception("Dispatcher is already running."); boost::state_saver<bool> reset(running); // Reset to false on exit. running = true; - queue->open(); try { while (!queue->isClosed()) { Mutex::ScopedUnlock u(lock); diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 07a791bef3..5079c47b5e 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -125,7 +125,7 @@ void SessionCore::detach(int c, const std::string& t) { channel.next = 0; code=c; text=t; - l3.getDemux().close(); + l3.getDemux().close(); } void SessionCore::doClose(int code, const std::string& text) { @@ -270,7 +270,6 @@ void SessionCore::detached() { // network thread Lock l(state); check(state == SUSPENDING, COMMAND_INVALID, UNEXPECTED_SESSION_DETACHED); - connection->erase(channel); doSuspend(REPLY_SUCCESS, OK); } @@ -379,22 +378,28 @@ bool isCloseResponse(const AMQFrame& frame) { // Network thread. void SessionCore::handleIn(AMQFrame& frame) { + ConnectionImpl::shared_ptr save; { Lock l(state); + save=connection; // Ignore frames received while closing other than closed response. if (state==CLOSING && !isCloseResponse(frame)) return; } try { // Cast to expose private SessionHandler functions. - if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + // If we were detached by a session command, tell the connection. + if (!connection) save->erase(channel); + } + else { session->received(frame); l3.handle(frame); } } catch (const ChannelException& e) { QPID_LOG(error, "Channel exception:" << e.what()); doClose(e.code, e.what()); - } + } } void SessionCore::handleOut(AMQFrame& frame) diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp index fd9ab14c63..de3f8f5bee 100644 --- a/cpp/src/qpid/log/Logger.cpp +++ b/cpp/src/qpid/log/Logger.cpp @@ -114,7 +114,7 @@ void Logger::log(const Statement& s, const std::string& msg) { if (flags&LEVEL) os << LevelTraits::name(s.level) << " "; if (flags&THREAD) - os << "[" << hex << qpid::sys::Thread::logId() << "] "; + os << "[" << qpid::sys::Thread::logId() << "] "; if (flags&FILE) os << s.file << ":"; if (flags&LINE) diff --git a/cpp/src/qpid/log/Options.cpp b/cpp/src/qpid/log/Options.cpp index 41a15dcf9f..dd296f3a93 100644 --- a/cpp/src/qpid/log/Options.cpp +++ b/cpp/src/qpid/log/Options.cpp @@ -26,7 +26,7 @@ namespace log { using namespace std; Options::Options(const std::string& name) : qpid::Options(name), - time(true), level(true), thread(false), source(false), function(false), trace(false) + time(true), level(true), thread(false), source(false), function(false), trace(false) { outputs.push_back("stderr"); selectors.push_back("error+"); @@ -43,14 +43,14 @@ Options::Options(const std::string& name) : qpid::Options(name), ("trace,t", optValue(trace), "Enables all logging" ) ("log-enable", optValue(selectors, "RULE"), ("Enables logging for selected levels and components. " - "RULE is in the form 'LEVEL[+][:PATTERN]' " - "Levels are one of: \n\t "+levels.str()+"\n" - "For example:\n" - "\t'--log-enable warning+' " - "logs all warning, error and critical messages.\n" - "\t'--log-enable debug:framing' " + "RULE is in the form 'LEVEL[+][:PATTERN]' " + "Levels are one of: \n\t "+levels.str()+"\n" + "For example:\n" + "\t'--log-enable warning+' " + "logs all warning, error and critical messages.\n" + "\t'--log-enable debug:framing' " "logs debug messages from the framing namespace. " - "This option can be used multiple times").c_str()) + "This option can be used multiple times").c_str()) ("log-time", optValue(time, "yes|no"), "Include time in log messages") ("log-level", optValue(level,"yes|no"), diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index dd8fe957bd..485f8c20f4 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -110,6 +110,7 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { void init(AsynchIO* a, ConnectionInputHandler* h) { aio = a; inputHandler = h; + identifier = aio->getSocket().getPeerAddress(); } // Output side @@ -229,7 +230,6 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { }else{ framing::ProtocolInitiation protocolInit; if(protocolInit.decode(in)){ - identifier = aio->getSocket().getPeerAddress(); QPID_LOG(debug, "INIT [" << identifier << "]"); inputHandler->initiated(protocolInit); initiated = true; diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h index a7bb7745c9..2e9a389133 100644 --- a/cpp/src/qpid/sys/Socket.h +++ b/cpp/src/qpid/sys/Socket.h @@ -97,6 +97,8 @@ public: int read(void *buf, size_t count) const; int write(const void *buf, size_t count) const; + int toFd() const; + private: Socket(SocketPrivate*); }; diff --git a/cpp/src/qpid/sys/posix/Condition.h b/cpp/src/qpid/sys/posix/Condition.h index 1c8d1a80b1..86d6500ee9 100644 --- a/cpp/src/qpid/sys/posix/Condition.h +++ b/cpp/src/qpid/sys/posix/Condition.h @@ -52,15 +52,15 @@ class Condition }; Condition::Condition() { - QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0)); + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_init(&condition, 0)); } Condition::~Condition() { - QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition)); + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_destroy(&condition)); } void Condition::wait(Mutex& mutex) { - QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex)); + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex)); } bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){ @@ -75,11 +75,11 @@ bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){ } void Condition::notify(){ - QPID_POSIX_THROW_IF(pthread_cond_signal(&condition)); + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_signal(&condition)); } void Condition::notifyAll(){ - QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition)); + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_broadcast(&condition)); } }} diff --git a/cpp/src/qpid/sys/posix/Mutex.h b/cpp/src/qpid/sys/posix/Mutex.h index 4cf0c3a3b0..590fd14bd0 100644 --- a/cpp/src/qpid/sys/posix/Mutex.h +++ b/cpp/src/qpid/sys/posix/Mutex.h @@ -136,11 +136,11 @@ struct PODMutex #define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } void PODMutex::lock() { - QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_lock(&mutex)); } void PODMutex::unlock() { - QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_unlock(&mutex)); } bool PODMutex::trylock() { @@ -148,19 +148,19 @@ bool PODMutex::trylock() { } Mutex::Mutex() { - QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, recursiveMutexattr)); + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_init(&mutex, recursiveMutexattr)); } Mutex::~Mutex(){ - QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex)); + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_destroy(&mutex)); } void Mutex::lock() { - QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_lock(&mutex)); } void Mutex::unlock() { - QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_unlock(&mutex)); } bool Mutex::trylock() { @@ -169,31 +169,31 @@ bool Mutex::trylock() { RWlock::RWlock() { - QPID_POSIX_THROW_IF(pthread_rwlock_init(&rwlock, recursiveRWlockattr)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_init(&rwlock, recursiveRWlockattr)); } RWlock::~RWlock(){ - QPID_POSIX_THROW_IF(pthread_rwlock_destroy(&rwlock)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_destroy(&rwlock)); } void RWlock::wlock() { - QPID_POSIX_THROW_IF(pthread_rwlock_wrlock(&rwlock)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_wrlock(&rwlock)); } void RWlock::rlock() { - QPID_POSIX_THROW_IF(pthread_rwlock_rdlock(&rwlock)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_rdlock(&rwlock)); } void RWlock::unlock() { - QPID_POSIX_THROW_IF(pthread_rwlock_unlock(&rwlock)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_unlock(&rwlock)); } void RWlock::trywlock() { - QPID_POSIX_THROW_IF(pthread_rwlock_trywrlock(&rwlock)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_trywrlock(&rwlock)); } void RWlock::tryrlock() { - QPID_POSIX_THROW_IF(pthread_rwlock_tryrdlock(&rwlock)); + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_tryrdlock(&rwlock)); } diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index 6e872c4fbc..ccb07bdafc 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -231,6 +231,10 @@ std::string Socket::getPeerAddress() const return impl->getName(false, true); } +int Socket::toFd() const { + return impl->fd; +} + int toFd(const SocketPrivate* s) { return s->fd; diff --git a/cpp/src/qpid/sys/posix/Thread.h b/cpp/src/qpid/sys/posix/Thread.h index 631a5d5378..e199c0eef6 100644 --- a/cpp/src/qpid/sys/posix/Thread.h +++ b/cpp/src/qpid/sys/posix/Thread.h @@ -60,16 +60,16 @@ class Thread Thread::Thread() : thread(0) {} Thread::Thread(Runnable* runnable) { - QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable)); + QPID_POSIX_ASSERT_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable)); } Thread::Thread(Runnable& runnable) { - QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable)); + QPID_POSIX_ASSERT_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable)); } void Thread::join(){ if (thread != 0) - QPID_POSIX_THROW_IF(pthread_join(thread, 0)); + QPID_POSIX_ASSERT_THROW_IF(pthread_join(thread, 0)); } long Thread::id() { @@ -84,7 +84,7 @@ Thread Thread::current() { void Thread::yield() { - QPID_POSIX_THROW_IF(pthread_yield()); + QPID_POSIX_ASSERT_THROW_IF(pthread_yield()); } diff --git a/cpp/src/qpid/sys/posix/check.h b/cpp/src/qpid/sys/posix/check.h index 40aa0d4d27..5de8863345 100644 --- a/cpp/src/qpid/sys/posix/check.h +++ b/cpp/src/qpid/sys/posix/check.h @@ -23,16 +23,26 @@ */ #include "qpid/Exception.h" - #include <cerrno> +#include <assert.h> -#define QPID_POSIX_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::strError(ERRNO))); +#define QPID_POSIX_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::strError(ERRNO))) /** THROW QPID_POSIX_ERROR(errno) if RESULT is less than zero */ #define QPID_POSIX_CHECK(RESULT) \ if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno)) -/** Throw a posix error if errNo is non-zero */ +/** Throw a posix error if ERRNO is non-zero */ #define QPID_POSIX_THROW_IF(ERRNO) \ - if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO)) + do { int e=(ERRNO); if (e) throw QPID_POSIX_ERROR(e); } while(0) + +/** Same as _THROW_IF in a release build, but abort a debug build */ +#ifdef NDEBUG +#define QPID_POSIX_ASSERT_THROW_IF(ERRNO) qpid_posix \ + QPID_POSIX_THROW_IF(ERRNO) +#else +#define QPID_POSIX_ASSERT_THROW_IF(ERRNO) \ + do { int e=(ERRNO); if (e) { errno=e; perror(0); assert(0); } } while(0) +#endif + #endif /*!_posix_check_h*/ diff --git a/cpp/src/tests/APRBaseTest.cpp b/cpp/src/tests/APRBaseTest.cpp deleted file mode 100644 index 3ec18d658e..0000000000 --- a/cpp/src/tests/APRBaseTest.cpp +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/sys/apr/APRBase.h" -#include "qpid_test_plugin.h" -#include <iostream> - -using namespace qpid::sys; - -class APRBaseTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(APRBaseTest); - CPPUNIT_TEST(testMe); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testMe() - { - APRBase::increment(); - APRBase::increment(); - APRBase::decrement(); - APRBase::decrement(); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(APRBaseTest); - diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index c7222c03a2..76cf2e8761 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -22,6 +22,7 @@ * */ +#include "SocketProxy.h" #include "qpid/sys/Thread.h" #include "qpid/broker/Broker.h" #include "qpid/client/Connection.h" @@ -30,52 +31,74 @@ #include "qpid/client/SubscriptionManager.h" /** - * A fixture to create an in-process broker and connect to it for tests. + * A fixture with an in-process broker. */ -struct BrokerFixture { +struct BrokerFixture { typedef qpid::broker::Broker Broker; typedef boost::shared_ptr<Broker> BrokerPtr; - struct OpenConnection : public qpid::client::Connection { - OpenConnection(int port) { open("localhost", port); } - }; - BrokerPtr broker; qpid::sys::Thread brokerThread; - OpenConnection connection; - qpid::client::Session_0_10 session; - qpid::client::SubscriptionManager subs; - qpid::client::LocalQueue lq; - - BrokerPtr newBroker() { + + BrokerFixture() { Broker::Options opts; opts.port=0; opts.workerThreads=1; - BrokerPtr b=Broker::create(opts); - // TODO aconway 2007-12-05: Without the following line - // the test can hang in the connection ctor. This is - // a race condition that should be fixed. - b->getPort(); - return b; + broker = Broker::create(opts); + // TODO aconway 2007-12-05: At one point BrokerFixture + // tests could hang in Connection ctor if the following + // line is removed. This may not be an issue anymore. + broker->getPort(); + brokerThread = qpid::sys::Thread(*broker); }; - BrokerFixture() : broker(newBroker()), - brokerThread(*broker), - connection(broker->getPort()), - session(connection.newSession()), - subs(session) - {} - ~BrokerFixture() { - connection.close(); broker->shutdown(); brokerThread.join(); } - /** Open a connection to the local broker */ + /** Open a connection to the broker. */ void open(qpid::client::Connection& c) { c.open("localhost", broker->getPort()); } }; +struct LocalConnection : public qpid::client::Connection { + LocalConnection(uint16_t port) { open("localhost", port); } +}; + +/** A local client connection via a socket proxy. */ +struct ProxyConnection : public qpid::client::Connection { + SocketProxy proxy; + ProxyConnection(int brokerPort) : proxy(brokerPort) { + open("localhost", proxy.getPort()); + } + ~ProxyConnection() { close(); } +}; + +/** + * A BrokerFixture with open Connection, Session and + * SubscriptionManager and LocalQueue for convenience. + */ +template <class ConnectionType> +struct SessionFixtureT : BrokerFixture { + ConnectionType connection; + qpid::client::Session_0_10 session; + qpid::client::SubscriptionManager subs; + qpid::client::LocalQueue lq; + + SessionFixtureT() : connection(broker->getPort()), + session(connection.newSession()), + subs(session) + {} + + ~SessionFixtureT() { + connection.close(); + } +}; + +typedef SessionFixtureT<LocalConnection> SessionFixture; +typedef SessionFixtureT<ProxyConnection> ProxySessionFixture; + + #endif /*!TESTS_BROKERFIXTURE_H*/ diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp index bd8f5af6be..605d5e4885 100644 --- a/cpp/src/tests/ClientChannelTest.cpp +++ b/cpp/src/tests/ClientChannelTest.cpp @@ -44,7 +44,7 @@ const size_t FRAME_MAX = 256; * The test base defines the tests methods, derived classes * instantiate the channel in Basic or Message mode. */ -class ChannelTestBase : public CppUnit::TestCase, public BrokerFixture +class ChannelTestBase : public CppUnit::TestCase, public SessionFixture { struct Listener: public qpid::client::MessageListener { vector<Message> messages; diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 5f45e1f938..82db7b9545 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -20,7 +20,6 @@ */ #include "qpid_test_plugin.h" #include "BrokerFixture.h" -#include "SocketProxy.h" #include "qpid/client/Dispatcher.h" #include "qpid/client/Session_0_10.h" #include "qpid/framing/TransferContent.h" @@ -62,7 +61,7 @@ struct DummyListener : public MessageListener } }; -class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture +class ClientSessionTest : public CppUnit::TestCase, public ProxySessionFixture { CPPUNIT_TEST_SUITE(ClientSessionTest); CPPUNIT_TEST(testQueueQuery); @@ -71,7 +70,6 @@ class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture CPPUNIT_TEST(testResumeExpiredError); CPPUNIT_TEST(testUseSuspendedError); CPPUNIT_TEST(testSuspendResume); - CPPUNIT_TEST(testDisconnectResume); CPPUNIT_TEST_SUITE_END(); public: @@ -85,11 +83,6 @@ class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture session.messageFlow(destination=dest, unit=1, value=0xFFFFFFFF);//bytes } - bool queueExists(const std::string& q) { - TypedResult<QueueQueryResult> result = session.queueQuery(q); - return result.get().getQueue() == q; - } - void testQueueQuery() { session =connection.newSession(); @@ -166,26 +159,11 @@ class ClientSessionTest : public CppUnit::TestCase, public BrokerFixture declareSubscribe(); session.suspend(); // Make sure we are still subscribed after resume. - connection.resume(session); + connection.resume(session); session.messageTransfer(content=TransferContent("my-message", "my-queue")); FrameSet::shared_ptr msg = session.get(); CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent()); } - - void testDisconnectResume() { - // FIXME aconway 2007-12-11: Test hanging. -// ProxyConnection c(broker->getPort()); -// Session_0_10 s = c.session; -// s.queueDeclare(queue="before"); -// CPPUNIT_ASSERT(queueExists("before")); -// s.queueDeclare(queue=string("after")); -// c.proxy.client.close(); // Disconnect the client. -// Connection c2; -// open(c2); -// c2.resume(s); -// CPPUNIT_ASSERT(queueExists("after")); -// c2.close(); - } }; // Make this test suite a plugin. diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h deleted file mode 100644 index f014941743..0000000000 --- a/cpp/src/tests/InProcessBroker.h +++ /dev/null @@ -1,245 +0,0 @@ -#ifndef _tests_InProcessBroker_h -#define _tests_InProcessBroker_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" -#include "qpid/client/Connector.h" -#include "qpid/client/Connection.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/BlockingQueue.h" -#include "qpid/shared_ptr.h" - -#include <vector> -#include <iostream> -#include <algorithm> - - -namespace qpid { - -using qpid::sys::ConnectionInputHandler; - -/** - * A client::Connector that connects directly to an in-process broker. - * Also allows you to "snoop" on frames exchanged between client & broker. - * - * see FramingTest::testRequestResponseRoundtrip() for example of use. - */ -class InProcessConnector : - public client::Connector -{ - public: - typedef sys::Mutex Mutex; - typedef Mutex::ScopedLock Lock; - typedef framing::FrameHandler FrameHandler; - typedef framing::AMQFrame AMQFrame; - - enum Sender {CLIENT,BROKER}; - - struct Task { - AMQFrame frame; - bool doOutput; - - Task() : doOutput(true) {} - Task(AMQFrame& f) : frame(f), doOutput(false) {} - }; - - /** Simulate the network thread of a peer with a queue and a thread. - * With setInputHandler(0) drops frames simulating network packet loss. - */ - class NetworkQueue : public sys::Runnable - { - public: - NetworkQueue(const char* r) : inputHandler(0), connectionHandler(0), receiver(r) { - thread=sys::Thread(this); - } - - ~NetworkQueue() { - queue.close(); - thread.join(); - } - - void push(AMQFrame& f) { queue.push(f); } - void activateOutput() { queue.push(Task()); } - - void run() { - try { - while(true) { - Task t = queue.pop(); - if (t.doOutput) { - if (connectionHandler) { - while (connectionHandler->doOutput()); - } - } else { - if (inputHandler) { - QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << t.frame)); - inputHandler->handle(t.frame); - } - else - QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << t.frame)); - } - } - } - catch (const std::exception& e) { - QPID_LOG(debug, QPID_MSG(receiver << " Terminated: " << e.what())); - return; - } - } - - void setConnectionInputHandler(ConnectionInputHandler* h) { - Lock l(lock); - inputHandler = h; - connectionHandler = h; - } - - void setInputHandler(FrameHandler* h) { - Lock l(lock); - inputHandler = h; - connectionHandler = 0; - } - - private: - sys::Mutex lock; - sys::BlockingQueue<Task> queue; - sys::Thread thread; - FrameHandler* inputHandler; - ConnectionInputHandler* connectionHandler; - const char* const receiver; - }; - - struct InProcessHandler : public sys::ConnectionOutputHandler { - Sender from; - NetworkQueue queue; - const char* const sender; - NetworkQueue* reverseQueue; - - InProcessHandler(Sender s) - : from(s), - queue(from==CLIENT? "BROKER" : "CLIENT"), - sender(from==BROKER? "BROKER" : "CLIENT"), - reverseQueue(0) - {} - - ~InProcessHandler() { } - - void send(AMQFrame& f) { - QPID_LOG(debug, QPID_MSG(sender << " SENT: " << f)); - queue.push(f); - } - - void close() { - // Do not shut down the queue here, we may be in - // the queue's dispatch thread. - } - - void activateOutput() { - if (reverseQueue) reverseQueue->activateOutput(); - } - }; - - - InProcessConnector(shared_ptr<broker::Broker> b=broker::Broker::create(), - framing::ProtocolVersion v=framing::ProtocolVersion()) : - Connector(v), - protocolInit(v), - broker(b), - brokerOut(BROKER), - brokerConnection(&brokerOut, *broker), - clientOut(CLIENT), - isClosed(false) - { - clientOut.queue.setConnectionInputHandler(&brokerConnection); - brokerOut.reverseQueue = &clientOut.queue; - clientOut.reverseQueue = &brokerOut.queue; - } - - ~InProcessConnector() { - close(); - - } - - void connect(const std::string& /*host*/, int /*port*/) {} - - void init() { brokerConnection.initiated(protocolInit); } - - void close() { - if (!isClosed) { - isClosed = true; - brokerOut.close(); - clientOut.close(); - brokerConnection.closed(); - } - } - - /** Client's input handler. */ - void setInputHandler(framing::InputHandler* handler) { - brokerOut.queue.setInputHandler(handler); - } - - /** Called by client to send a frame */ - void send(framing::AMQFrame& frame) { - clientOut.handle(frame); - } - - /** Sliently discard frames sent by either party, lost network traffic. */ - void discard() { - brokerOut.queue.setInputHandler(0); - clientOut.queue.setConnectionInputHandler(0); - } - - shared_ptr<broker::Broker> getBroker() { return broker; } - - private: - sys::Mutex lock; - framing::ProtocolInitiation protocolInit; - shared_ptr<broker::Broker> broker; - InProcessHandler brokerOut; - broker::Connection brokerConnection; - InProcessHandler clientOut; - bool isClosed; -}; - -struct InProcessConnection : public client::Connection { - /** Connect to an existing broker */ - InProcessConnection(shared_ptr<broker::Broker> b=broker::Broker::create()) - : client::Connection( - shared_ptr<client::Connector>(new InProcessConnector(b))) - { open(""); } - - InProcessConnector& getConnector() { - return static_cast<InProcessConnector&>(*impl->getConnector()); - } - - /** Simulate disconnected network connection. */ - void disconnect() { getConnector().close(); } - - /** Discard frames, simulates lost network traffic. */ - void discard() { getConnector().discard(); } - - shared_ptr<broker::Broker> getBroker() { - return getConnector().getBroker(); - } -}; - -} // namespace qpid - -#endif // _tests_InProcessBroker_h diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 0dc4b47a84..a25c46b5b0 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -122,7 +122,6 @@ EXTRA_DIST += \ topictest \ .valgrind.supp \ .valgrindrc \ - InProcessBroker.h \ MessageUtils.h \ MockChannel.h \ MockConnectionInputHandler.h \ diff --git a/cpp/src/tests/SocketProxy.h b/cpp/src/tests/SocketProxy.h index b985ded175..a37c1f2c3e 100644 --- a/cpp/src/tests/SocketProxy.h +++ b/cpp/src/tests/SocketProxy.h @@ -24,59 +24,141 @@ #include "qpid/sys/Socket.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/Mutex.h" +#include "qpid/client/Connection.h" +#include "qpid/log/Statement.h" + +#include <algorithm> /** - * A simple socket proxy that forwards to another socket. Used between - * client & broker to simulate network failures. + * A simple socket proxy that forwards to another socket. + * Used between client & local broker to simulate network failures. */ -struct SocketProxy : public qpid::sys::Runnable +class SocketProxy : private qpid::sys::Runnable { - int port; // Port bound to server socket. - qpid::sys::Socket client, server; // Client & server sockets. + public: + /** Connect to connectPort on host, start a forwarding thread. + * Listen for connection on getPort(). + */ + SocketProxy(int connectPort, const std::string host="localhost") + : closed(false), port(listener.listen()) + { + int r=::pipe(closePipe); + if (r<0) throwErrno(QPID_MSG("::pipe returned " << r)); + client.connect(host, connectPort); + thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this)); + } + + ~SocketProxy() { close(); } - SocketProxy(const std::string& host, int port) { init(host,port); } - SocketProxy(int port) { init("localhost",port); } + /** Simulate a network disconnect. */ + void close() { + { + qpid::sys::Mutex::ScopedLock l(lock); + if (closed) return; + closed=true; + } + write(closePipe[1], this, 1); // Random byte to closePipe + thread.join(); + client.close(); + ::close(closePipe[0]); + ::close(closePipe[1]); + } - ~SocketProxy() { client.close(); server.close(); thread.join(); } + bool isClosed() const { + qpid::sys::Mutex::ScopedLock l(lock); + return closed; + } + + uint16_t getPort() const { return port; } private: - - void init(const std::string& host, int connectPort) { - client.connect(host, connectPort); - port = server.listen(); - thread=qpid::sys::Thread(this); + static void throwErrno(const std::string& msg) { + throw qpid::Exception(msg+":"+qpid::strError(errno)); } - - void run() { - try { - do { - ssize_t recv = server.recv(buffer, sizeof(buffer)); - if (recv <= 0) return; - ssize_t sent=client.send(buffer, recv); - if (sent < 0) return; - assert(sent == recv); // Assumes we can send as we receive. - } while (true); - } catch(...) {} + static void throwIf(bool condition, const std::string& msg) { + if (condition) throw qpid::Exception(msg); } + + struct FdSet : fd_set { + FdSet() : maxFd(0) { clear(); } + void clear() { FD_ZERO(this); } + void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); } + bool isSet(int fd) const { return FD_ISSET(fd, this); } + bool operator[](int fd) const { return isSet(fd); } - qpid::sys::Thread thread; - char buffer[64*1024]; -}; + int maxFd; + }; -/** A local client connection via a socket proxy. */ -struct ProxyConnection : public qpid::client::Connection { - SocketProxy proxy; - qpid::client::Session_0_10 session; + enum { RD=1, WR=2, ER=4 }; - ProxyConnection(const std::string& host, int port) : proxy(port) { - open(host, proxy.port); - session=newSession(); - } + struct Selector { + FdSet rd, wr, er; + + void set(int fd, int sets) { + if (sets & RD) rd.set(fd); + if (sets & WR) wr.set(fd); + if (sets & ER) er.set(fd); + } + + int select() { + for (;;) { + int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd)); + int r = ::select(maxFd + 1, &rd, &wr, &er, NULL); + if (r == -1 && errno == EINTR) continue; + if (r < 0) throwErrno(QPID_MSG("select returned " <<r)); + return r; + } + } + }; - ProxyConnection(int port) : proxy(port) { - open("localhost", proxy.port); - session=newSession(); + void run() { + std::auto_ptr<qpid::sys::Socket> server; + try { + // Accept incoming connections, watch closePipe. + Selector accept; + accept.set(listener.toFd(), RD|ER); + accept.set(closePipe[0], RD|ER); + accept.select(); + throwIf(accept.rd[closePipe[0]], "Closed by close()"); + throwIf(!accept.rd[listener.toFd()],"Accept failed"); + server.reset(listener.accept(0, 0)); + + // Pump data between client & server sockets, watch closePipe. + char buffer[1024]; + for (;;) { + Selector select; + select.set(server->toFd(), RD|ER); + select.set(client.toFd(), RD|ER); + select.set(closePipe[0], RD|ER); + select.select(); + throwIf(select.rd[closePipe[0]], "Closed by close()"); + // Read even if fd is in error to throw a useful exception. + bool gotData=false; + if (select.rd[server->toFd()] || select.er[server->toFd()]) { + client.write(buffer, server->read(buffer, sizeof(buffer))); + gotData=true; + } + if (select.rd[client.toFd()] || select.er[client.toFd()]) { + server->write(buffer, client.read(buffer, sizeof(buffer))); + gotData=true; + } + throwIf(!gotData, "No data from select()"); + } + } + catch (const std::exception& e) { + QPID_LOG(debug, "SocketProxy::run exiting: " << e.what()); + } + if (server.get()) server->close(); + close(); } + + mutable qpid::sys::Mutex lock; + bool closed; + qpid::sys::Socket client, listener; + uint16_t port; + int closePipe[2]; + qpid::sys::Thread thread; }; #endif diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index d19307a5c0..700aeef47c 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -21,7 +21,6 @@ #include "unit_test.h" #include "BrokerFixture.h" -#include "SocketProxy.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" @@ -52,7 +51,7 @@ struct Catcher : public Runnable { try { f(); } catch(const Ex& e) { caught=true; - BOOST_MESSAGE(e.what()); + BOOST_MESSAGE(string("Caught expected exception: ")+e.what()); } catch(const std::exception& e) { BOOST_ERROR(string("Bad exception: ")+e.what()); @@ -71,37 +70,29 @@ struct Catcher : public Runnable { } }; -// FIXME aconway 2007-12-11: Disabled hanging tests. -// BOOST_FIXTURE_TEST_CASE(DisconnectedGet, BrokerFixture) { -// ProxyConnection c(broker->getPort()); -// Catcher<ClosedException> get(bind(&Session_0_10::get, c.session)); -// c.proxy.client.close(); // Close the client side. -// BOOST_CHECK(get.join()); -// } - -// BOOST_FIXTURE_TEST_CASE(DisconnectedPop, BrokerFixture) { -// ProxyConnection c(broker->getPort()); -// c.session.queueDeclare(arg::queue="q"); -// subs.subscribe(lq, "q"); -// Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq))); -// c.proxy.client.close(); -// BOOST_CHECK(pop.join()); -// } +BOOST_FIXTURE_TEST_CASE(DisconnectedPop, ProxySessionFixture) { + ProxyConnection c(broker->getPort()); + session.queueDeclare(arg::queue="q"); + subs.subscribe(lq, "q"); + Catcher<ClosedException> pop(bind(&LocalQueue::pop, boost::ref(lq))); + connection.proxy.close(); + BOOST_CHECK(pop.join()); +} -// BOOST_FIXTURE_TEST_CASE(DisconnectedListen, BrokerFixture) { -// struct NullListener : public MessageListener { -// void received(Message&) { BOOST_FAIL("Unexpected message"); } -// } l; -// ProxyConnection c(broker->getPort()); -// c.session.queueDeclare(arg::queue="q"); -// subs.subscribe(l, "q"); -// Thread t(subs); -// c.proxy.client.close(); -// t.join(); -// BOOST_CHECK_THROW(c.session.close(), InternalErrorException); -// } +BOOST_FIXTURE_TEST_CASE(DisconnectedListen, ProxySessionFixture) { + struct NullListener : public MessageListener { + void received(Message&) { BOOST_FAIL("Unexpected message"); } + } l; + ProxyConnection c(broker->getPort()); + session.queueDeclare(arg::queue="q"); + subs.subscribe(l, "q"); + Thread t(subs); + connection.proxy.close(); + t.join(); + BOOST_CHECK_THROW(session.close(), InternalErrorException); +} -BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, BrokerFixture) { +BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, SessionFixture) { BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException); } diff --git a/cpp/src/tests/quick_perftest b/cpp/src/tests/quick_perftest index 5522b7fee4..676436fdc7 100755 --- a/cpp/src/tests/quick_perftest +++ b/cpp/src/tests/quick_perftest @@ -1,2 +1,2 @@ #!/bin/sh -exec `dirname $0`/run_test ./perftest --summary --count 1000 +exec `dirname $0`/run_test ./perftest --summary --count 100 |