diff options
author | Alan Conway <aconway@apache.org> | 2006-12-20 17:54:05 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-12-20 17:54:05 +0000 |
commit | 013f077cade5451798b76c2912b12ec873b6177e (patch) | |
tree | 1bcf4f12bde03057695943d2094f9d4312094435 | |
parent | 7c208cd735560336b3e87d24c7ba966288256067 (diff) | |
download | qpid-python-013f077cade5451798b76c2912b12ec873b6177e.tar.gz |
Current state of event channel code.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/event-queue-2006-12-20@489159 13f79535-47bb-0310-9956-ffa450edef68
41 files changed, 1108 insertions, 779 deletions
diff --git a/cpp/configure.ac b/cpp/configure.ac index 0334b00fe9..b0947bd3b9 100644 --- a/cpp/configure.ac +++ b/cpp/configure.ac @@ -25,14 +25,6 @@ AC_USE_SYSTEM_EXTENSIONS AM_MISSING_PROG([HELP2MAN], [help2man]) -AC_ARG_ENABLE(warnings, -[ --enable-warnings turn on lots of compiler warnings (recommended)], -[case "${enableval}" in - yes|no) ;; - *) AC_MSG_ERROR([bad value ${enableval} for warnings option]) ;; - esac], - [enableval=yes]) - # Turn on this automake conditional if we are in a qpid # hierarchy (i.e. with gentools/ and specs/ sibling directories), # and if we have working java + javac. @@ -53,7 +45,12 @@ AM_CONDITIONAL([CAN_GENERATE_CODE], [test x$build = xyes]) # -Wunreachable-code -Wpadded -Winline # -Wshadow - warns about boost headers. -if test "${enableval}" = yes; then +AC_ARG_ENABLE(warnings, + [AS_HELP_STRING([--disable-warnings], + [Disable compiler warnings (default enabled)])], + [], [enable_warnings=yes]) + +if test "${enable_warnings}" = yes; then gl_COMPILER_FLAGS(-Werror) gl_COMPILER_FLAGS(-pedantic) gl_COMPILER_FLAGS(-Wall) @@ -106,7 +103,7 @@ AC_ARG_ENABLE(apr, yes|no) ;; *) AC_MSG_ERROR([invalid APR enable/disable value: $enable_APR]) ;; esac], -[enable_APR=yes]) +[enable_APR=no]) APR_MINIMUM_VERSION=1.2.2 AC_SUBST(APR_MINIMUM_VERSION) diff --git a/cpp/lib/broker/AutoDelete.cpp b/cpp/lib/broker/AutoDelete.cpp index ae48d10505..6d87c982b4 100644 --- a/cpp/lib/broker/AutoDelete.cpp +++ b/cpp/lib/broker/AutoDelete.cpp @@ -63,7 +63,7 @@ void AutoDelete::run(){ Monitor::ScopedLock l(monitor); while(!stopped){ process(); - monitor.wait(period*TIME_MSEC); + monitor.wait(now() + period*TIME_MSEC); } } diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp index 6c0d7a3f3f..e2265a292f 100644 --- a/cpp/lib/broker/Broker.cpp +++ b/cpp/lib/broker/Broker.cpp @@ -47,7 +47,7 @@ Broker::shared_ptr Broker::create(const Configuration& config) { } void Broker::run() { - acceptor->run(&factory); + acceptor->run(factory); } void Broker::shutdown() { diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 5d4f68a8af..979617b594 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -18,11 +18,15 @@ * under the License. * */ -#include <BrokerChannel.h> -#include <QpidError.h> +#include <assert.h> + #include <iostream> #include <sstream> -#include <assert.h> + +#include <boost/bind.hpp> + +#include "BrokerChannel.h" +#include "QpidError.h" using std::mem_fun_ref; using std::bind2nd; @@ -50,11 +54,18 @@ Channel::~Channel(){ } bool Channel::exists(const string& consumerTag){ + Mutex::ScopedLock l(lock); return consumers.find(consumerTag) != consumers.end(); } -void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*){ - if(tag.empty()) tag = tagGenerator.generate(); +void Channel::consume( + string& tag, Queue::shared_ptr queue, bool acks, + bool exclusive, ConnectionToken* const connection, const FieldTable*) +{ + Mutex::ScopedLock l(lock); + if(tag.empty()) tag = tagGenerator.generate(); + // TODO aconway 2006-12-13: enforce ownership of consumer + // with auto_ptr. ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); try{ queue->consume(c, exclusive);//may throw exception @@ -65,7 +76,8 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool excl } } -void Channel::cancel(consumer_iterator i){ +void Channel::cancel(consumer_iterator i) { + // Private, must be called with lock held. ConsumerImpl* c = i->second; consumers.erase(i); if(c){ @@ -75,6 +87,7 @@ void Channel::cancel(consumer_iterator i){ } void Channel::cancel(const string& tag){ + Mutex::ScopedLock l(lock); consumer_iterator i = consumers.find(tag); if(i != consumers.end()){ cancel(i); @@ -82,11 +95,14 @@ void Channel::cancel(const string& tag){ } void Channel::close(){ - //cancel all consumers - for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ - cancel(i); + { + Mutex::ScopedLock l(lock); + while(!consumers.empty()) { + cancel(consumers.begin()); + } } - //requeue: + // TODO aconway 2006-12-13: does recovery need to be atomic with + // cancelling all consumers? recover(true); } @@ -109,20 +125,21 @@ void Channel::rollback(){ } void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ - Mutex::ScopedLock locker(deliveryLock); - - u_int64_t deliveryTag = currentDeliveryTag++; - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); - outstanding.size += msg->contentSize(); - outstanding.count++; + u_int64_t deliveryTag; + { + Mutex::ScopedLock l(lock); + deliveryTag = currentDeliveryTag++; + if(ackExpected){ + unacked.push_back( + DeliveryRecord(msg, queue, consumerTag, deliveryTag)); + outstanding.size += msg->contentSize(); + outstanding.count++; + } } - //send deliver method, header and content(s) msg->deliver(out, id, consumerTag, deliveryTag, framesize); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ - Mutex::ScopedLock locker(deliveryLock); bool countOk = !prefetchCount || prefetchCount > unacked.size(); bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); return countOk && sizeOk; @@ -187,71 +204,99 @@ void Channel::complete(Message::shared_ptr& msg){ } exchange.reset(); }else{ - std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl; + std::cout << "Exchange not known in" << BOOST_CURRENT_FUNCTION + << std::endl; } } -void Channel::ack(u_int64_t deliveryTag, bool multiple){ +void Channel::ack(u_int64_t deliveryTag, bool multiple) { if(transactional){ + Mutex::ScopedLock locker(lock); accumulatedAck.update(deliveryTag, multiple); - //TODO: I think the outstanding prefetch size & count should be updated at this point... + //TODO: I think the outstanding prefetch size & count should + //be updated at this point... //TODO: ...this may then necessitate dispatching to consumers - }else{ - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag)); - if(i == unacked.end()){ - throw InvalidAckException(); - }else if(multiple){ - ack_iterator end = ++i; - for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard)); - unacked.erase(unacked.begin(), end); - - //recalculate the prefetch: - outstanding.reset(); - for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); - }else{ - i->discard(); - i->subtractFrom(&outstanding); - unacked.erase(i); - } + } + else { + { + Mutex::ScopedLock locker(lock); + ack_iterator i = find_if( + unacked.begin(), unacked.end(), + boost::bind(&DeliveryRecord::matches, _1, deliveryTag)); + if(i == unacked.end()) { + throw InvalidAckException(); + } + else if(multiple) { + ack_iterator end = ++i; + for_each(unacked.begin(), end, + mem_fun_ref(&DeliveryRecord::discard)); + unacked.erase(unacked.begin(), end); + //recalculate the prefetch: + outstanding.reset(); + for_each( + unacked.begin(), unacked.end(), + boost::bind(&DeliveryRecord::addTo, _1, &outstanding)); + } + else { + i->discard(); + i->subtractFrom(&outstanding); + unacked.erase(i); + } + } //if the prefetch limit had previously been reached, there may //be messages that can be now be delivered - for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ + + // TODO aconway 2006-12-13: Does this need to be atomic? + // If so we need a redesign, requestDispatch re-enters + // Channel::dispatch. + // + for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ j->second->requestDispatch(); } } } -void Channel::recover(bool requeue){ - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - if(requeue){ - outstanding.reset(); - std::list<DeliveryRecord> copy = unacked; - unacked.clear(); - for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); - }else{ - for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); +void Channel::recover(bool requeue) { + std::list<DeliveryRecord> copyUnacked; + boost::function1<void, DeliveryRecord&> recoverFn; + { + Mutex::ScopedLock l(lock); + if(requeue) { + outstanding.reset(); + copyUnacked.swap(unacked); + recoverFn = boost::bind(&DeliveryRecord::requeue, _1); + } + else { + copyUnacked = unacked; + recoverFn = boost::bind(&DeliveryRecord::redeliver, _1, this); + } } + // TODO aconway 2006-12-13: Does recovery of copyUnacked have to + // be atomic with extracting the list? + for_each(copyUnacked.begin(), copyUnacked.end(), recoverFn); } bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ + Mutex::ScopedLock l(lock); + // TODO aconway 2006-12-13: Nasty to have all these external calls + // inside a critical.section but none appear to have blocking potential. + // sendGetOk does non-blocking IO + // Message::shared_ptr msg = queue->dequeue(); - if(msg){ - Mutex::ScopedLock locker(deliveryLock); + if(msg) { u_int64_t myDeliveryTag = currentDeliveryTag++; - msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); + u_int32_t count = queue->getMessageCount(); + msg->sendGetOk(out, id, count + 1, myDeliveryTag, framesize); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } return true; - }else{ - return false; } + return false; } -void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){ +void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, + u_int64_t deliveryTag){ msg->deliver(out, id, consumerTag, deliveryTag, framesize); } diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index fa3912c78e..776862dd34 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -79,10 +79,10 @@ namespace qpid { u_int32_t prefetchSize; u_int16_t prefetchCount; Prefetch outstanding; - u_int32_t framesize; + const u_int32_t framesize; NameGenerator tagGenerator; std::list<DeliveryRecord> unacked; - qpid::sys::Mutex deliveryLock; + qpid::sys::Mutex lock; TxBuffer txBuffer; AccumulatedAck accumulatedAck; MessageStore* const store; diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 598de2d590..148dbefa78 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -143,7 +143,8 @@ void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) { u_int64_t expected = expectedContentSize(); if (expected != buffer.available()) { - std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl; + std::cout << "WARN: Expected " << expectedContentSize() + << " bytes, got " << buffer.available() << std::endl; throw Exception("Cannot decode content, buffer not large enough."); } diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp index 0e48d3b13d..deb7c38824 100644 --- a/cpp/lib/broker/BrokerQueue.cpp +++ b/cpp/lib/broker/BrokerQueue.cpp @@ -83,7 +83,8 @@ bool Queue::dispatch(Message::shared_ptr& msg){ return false; }else if(exclusive){ if(!exclusive->deliver(msg)){ - std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl; + std::cout << "WARNING: Dropping undeliverable message " + << "from queue with exclusive consumer." << std::endl; } return true; }else{ diff --git a/cpp/lib/broker/TopicExchange.cpp b/cpp/lib/broker/TopicExchange.cpp index 3ebb3c8c56..73a633859b 100644 --- a/cpp/lib/broker/TopicExchange.cpp +++ b/cpp/lib/broker/TopicExchange.cpp @@ -81,10 +81,7 @@ void TopicPattern::normalize() { namespace { -// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string. -// Need StringRef class that operates on a string in place witout copy. -// Should be applied everywhere strings are extracted from frames. -// + bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) { // Invariant: [pattern_begin..p) matches [target_begin..t) diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp index ac8b4a9ced..244831effe 100644 --- a/cpp/lib/client/ResponseHandler.cpp +++ b/cpp/lib/client/ResponseHandler.cpp @@ -28,32 +28,33 @@ qpid::client::ResponseHandler::ResponseHandler() : waiting(false){} qpid::client::ResponseHandler::~ResponseHandler(){} -bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){ +bool qpid::client::ResponseHandler::validate( + const qpid::framing::AMQMethodBody& expected) +{ return expected.match(response.get()); } void qpid::client::ResponseHandler::waitForResponse(){ Monitor::ScopedLock l(monitor); - if(waiting){ + while(waiting) monitor.wait(); - } } -void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){ - response = _response; +void qpid::client::ResponseHandler::signalResponse( + qpid::framing::AMQMethodBody::shared_ptr _response) +{ Monitor::ScopedLock l(monitor); + response = _response; waiting = false; monitor.notify(); } void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){ Monitor::ScopedLock l(monitor); - if(waiting){ + while(waiting) monitor.wait(); - } - if(!validate(expected)){ + if(!validate(expected)) THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error"); - } } void qpid::client::ResponseHandler::expect(){ diff --git a/cpp/lib/common/Exception.cpp b/cpp/lib/common/Exception.cpp index 0161518011..ef88c5cb74 100644 --- a/cpp/lib/common/Exception.cpp +++ b/cpp/lib/common/Exception.cpp @@ -20,6 +20,7 @@ */ #include <Exception.h> +#include <iostream> namespace qpid { @@ -29,14 +30,32 @@ Exception::Exception(const std::string& str) throw() : whatStr(str) {} Exception::Exception(const char* str) throw() : whatStr(str) {} +Exception::Exception(const std::exception& e) throw() : whatStr(e.what()) {} + Exception::~Exception() throw() {} const char* Exception::what() const throw() { return whatStr.c_str(); } std::string Exception::toString() const throw() { return whatStr; } -Exception* Exception::clone() const throw() { return new Exception(*this); } +Exception::auto_ptr Exception::clone() const throw() { + return Exception::auto_ptr(new Exception(*this)); +} void Exception::throwSelf() const { throw *this; } +const char* Exception::defaultMessage = "Unexpected exception"; + +void Exception::log(const char* what, const char* message) { + std::cout << message << ": " << what << std::endl; +} + +void Exception::log(const std::exception& e, const char* message) { + log(e.what(), message); +} + +void Exception::logUnknown(const char* message) { + log("unknown exception.", message); +} + } // namespace qpid diff --git a/cpp/lib/common/Exception.h b/cpp/lib/common/Exception.h index f35d427bb0..185c395283 100644 --- a/cpp/lib/common/Exception.h +++ b/cpp/lib/common/Exception.h @@ -26,6 +26,7 @@ #include <string> #include <memory> #include <boost/shared_ptr.hpp> +#include <boost/function.hpp> namespace qpid { @@ -38,6 +39,10 @@ class Exception : public std::exception std::string whatStr; public: + typedef boost::shared_ptr<Exception> shared_ptr; + typedef boost::shared_ptr<const Exception> shared_ptr_const; + typedef std::auto_ptr<Exception> auto_ptr; + Exception() throw(); Exception(const std::string& str) throw(); Exception(const char* str) throw(); @@ -48,14 +53,65 @@ class Exception : public std::exception virtual const char* what() const throw(); virtual std::string toString() const throw(); - virtual Exception* clone() const throw(); + virtual std::auto_ptr<Exception> clone() const throw(); virtual void throwSelf() const; - typedef boost::shared_ptr<Exception> shared_ptr; -}; + /** Default message: "Unknown exception" or something like it. */ + static const char* defaultMessage; + /** + * Log a message of the form "message: what" + *@param what Exception's what() message. + *@param message Prefix message. + */ + static void log(const char* what, const char* message = defaultMessage); -} + /** + * Log an exception. + *@param e Exception to log. + + */ + static void log( + const std::exception& e, const char* message = defaultMessage); + + + /** + * Log an unknown exception - use in catch(...) + *@param message Prefix message. + */ + static void logUnknown(const char* message = defaultMessage); + + /** + * Wrapper template function to call another function inside + * try/catch and log any exception. Use boost::bind to wrap + * member function calls or functions with arguments. + * + *@param f Function to call in try block. + *@param retrhow If true the exception is rethrown. + *@param message Prefix message. + */ + template <class T> + static T tryCatchLog(boost::function0<T> f, bool rethrow=true, + const char* message=defaultMessage) + { + try { + return f(); + } + catch (const std::exception& e) { + log(e, message); + if (rethrow) + throw; + } + catch (...) { + logUnknown(message); + if (rethrow) + throw; + } + } + +}; + +} // namespace qpid #endif /*!_Exception_*/ diff --git a/cpp/lib/common/ExceptionHolder.cpp b/cpp/lib/common/ExceptionHolder.cpp index de8d7b2487..e69de29bb2 100644 --- a/cpp/lib/common/ExceptionHolder.cpp +++ b/cpp/lib/common/ExceptionHolder.cpp @@ -1,32 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "ExceptionHolder.h" - -namespace qpid { - -ExceptionHolder::ExceptionHolder(const std::exception& e) { - const Exception* ex = dynamic_cast<const Exception*>(&e); - if (ex) { - reset(ex->clone()); - } else { - reset(new Exception(e.what())); - } -} - -} diff --git a/cpp/lib/common/ExceptionHolder.h b/cpp/lib/common/ExceptionHolder.h index 83d0884be9..e69de29bb2 100644 --- a/cpp/lib/common/ExceptionHolder.h +++ b/cpp/lib/common/ExceptionHolder.h @@ -1,62 +0,0 @@ -#ifndef _qpid_ExceptionHolder_h -#define _qpid_ExceptionHolder_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <Exception.h> -#include <boost/shared_ptr.hpp> - -namespace qpid { - -/** - * Holder for a heap-allocated exc eption that can be stack allocated - * and thrown safely. - * - * Basically this is a shared_ptr with the Exception functions added - * so the catcher need not be aware that it is a pointer rather than a - * reference. - * - * shared_ptr is chosen over auto_ptr because it has normal - * copy semantics. - */ -class ExceptionHolder : public Exception, public boost::shared_ptr<Exception> -{ - public: - typedef boost::shared_ptr<Exception> shared_ptr; - - ExceptionHolder() throw() {} - ExceptionHolder(Exception* p) throw() : shared_ptr(p) {} - ExceptionHolder(shared_ptr p) throw() : shared_ptr(p) {} - - ExceptionHolder(const Exception& e) throw() : shared_ptr(e.clone()) {} - ExceptionHolder(const std::exception& e); - - ~ExceptionHolder() throw() {} - - const char* what() const throw() { return (*this)->what(); } - std::string toString() const throw() { return (*this)->toString(); } - virtual Exception* clone() const throw() { return (*this)->clone(); } - virtual void throwSelf() const { (*this)->throwSelf(); } -}; - -} // namespace qpid - - - -#endif /*!_qpid_ExceptionHolder_h*/ diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index e1f7503282..997558f3da 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -24,21 +24,23 @@ apr_hdr = \ $(apr)/LFSessionContext.h posix = sys/posix -posix_src = \ - $(posix)/PosixAcceptor.cpp \ - $(posix)/Socket.cpp \ - $(posix)/Thread.cpp \ - $(posix)/check.cpp \ - $(posix)/EventChannel.cpp \ - $(posix)/EventChannelThreads.cpp -posix_hdr = \ - $(posix)/check.h \ - $(posix)/EventChannel.h \ - $(posix)/EventChannelThreads.h +posix_src = \ + $(posix)/EventChannelAcceptor.cpp \ + $(posix)/Socket.cpp \ + $(posix)/Thread.cpp \ + $(posix)/check.cpp \ + $(posix)/EventChannel.cpp \ + $(posix)/EventChannelThreads.cpp \ + $(posix)/EventChannelConnection.cpp +posix_hdr = \ + $(posix)/check.h \ + $(posix)/EventChannel.h \ + $(posix)/EventChannelThreads.h \ + $(posix)/EventChannelConnection.h -EXTRA_DIST=$(posix_src) $(posix_hdr) -platform_src = $(apr_src) -platform_hdr = $(apr_hdr) +EXTRA_DIST=$(apr_src) $(apr_hdr) +platform_src = $(posix_src) +platform_hdr = $(posix_hdr) framing = framing gen = $(srcdir)/../../gen @@ -76,7 +78,6 @@ libqpidcommon_la_SOURCES = \ $(gen)/AMQP_MethodVersionMap.cpp \ $(gen)/AMQP_ServerProxy.cpp \ Exception.cpp \ - ExceptionHolder.cpp \ QpidError.cpp \ sys/Runnable.cpp \ sys/Time.cpp @@ -107,7 +108,6 @@ nobase_pkginclude_HEADERS = \ $(framing)/amqp_types.h \ $(framing)/AMQP_HighestVersion.h \ Exception.h \ - ExceptionHolder.h \ QpidError.h \ SharedObject.h \ sys/Acceptor.h \ diff --git a/cpp/lib/common/QpidError.cpp b/cpp/lib/common/QpidError.cpp index 7f4f9e2f34..9cbd66c841 100644 --- a/cpp/lib/common/QpidError.cpp +++ b/cpp/lib/common/QpidError.cpp @@ -22,23 +22,41 @@ #include <QpidError.h> #include <sstream> -using namespace qpid; +namespace qpid { QpidError::QpidError() : code(0) {} -QpidError::QpidError(int _code, const std::string& _msg, - const SrcLine& _loc) throw() +QpidError::QpidError( + int _code, const std::string& _msg, Location _loc) throw() : code(_code), msg(_msg), location(_loc) { - std::ostringstream os; - os << "Error [" << code << "] " << msg << " (" - << location.file << ":" << location.line << ")"; - whatStr = os.str(); + setWhat(); +} + +QpidError::QpidError( + int _code, const char* _msg, Location _loc) throw() + : code(_code), msg(_msg), location(_loc) +{ + setWhat(); } QpidError::~QpidError() throw() {} -Exception* QpidError::clone() const throw() { return new QpidError(*this); } +Exception::auto_ptr QpidError::clone() const throw() { + return Exception::auto_ptr(new QpidError(*this)); +} void QpidError::throwSelf() const { throw *this; } +void QpidError::setWhat() { + std::ostringstream os; + os << "Error [" << code << "] " << msg; + if (location.file) { + os << " (" ; + os << location.file << ":" << location.line; + os << ")"; + } + whatStr = os.str(); +} + +} // namespace qpid diff --git a/cpp/lib/common/QpidError.h b/cpp/lib/common/QpidError.h index 30d9d27076..9a47aa5e00 100644 --- a/cpp/lib/common/QpidError.h +++ b/cpp/lib/common/QpidError.h @@ -24,37 +24,45 @@ #include <memory> #include <ostream> #include <Exception.h> +#include <boost/current_function.hpp> namespace qpid { -struct SrcLine { - public: - SrcLine(const std::string& file_="", int line_=0) : - file(file_), line(line_) {} - - std::string file; - int line; -}; - class QpidError : public Exception { public: + // Use macro QPID_LOCATION to construct a location. + struct Location { + Location(const char* function_=0, const char* file_=0, int line_=0) : + function(function_), file(file_), line(line_) {} + const char* function; + const char* file; + int line; + }; + const int code; const std::string msg; - const SrcLine location; + const Location location; QpidError(); - QpidError(int _code, const std::string& _msg, const SrcLine& _loc) throw(); + QpidError(int _code, const char* _msg, const Location _loc) throw(); + QpidError(int _code, const std::string& _msg, const Location _loc) throw(); + ~QpidError() throw(); - Exception* clone() const throw(); + Exception::auto_ptr clone() const throw(); void throwSelf() const; + + private: + void setWhat(); }; } // namespace qpid -#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__) +#define QPID_ERROR_LOCATION \ + ::qpid::QpidError::Location(BOOST_CURRENT_FUNCTION, __FILE__, __LINE__) -#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE) +#define QPID_ERROR(CODE, MESSAGE) \ + ::qpid::QpidError((CODE), (MESSAGE), QPID_ERROR_LOCATION) #define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE) diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index bec1946fb7..1ff3ff191f 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -40,9 +40,9 @@ namespace qpid { static AMQP_MethodVersionMap versionMap; u_int16_t channel; - u_int8_t type;//used if the body is decoded separately from the 'head' + u_int8_t type;//used if body decoded separately from 'head' AMQBody::shared_ptr body; - AMQBody::shared_ptr createMethodBody(Buffer& buffer); + AMQBody::shared_ptr createMethodBody(Buffer& buffer); public: AMQFrame(); diff --git a/cpp/lib/common/sys/Acceptor.h b/cpp/lib/common/sys/Acceptor.h index e6bc27a593..7aed068dd0 100644 --- a/cpp/lib/common/sys/Acceptor.h +++ b/cpp/lib/common/sys/Acceptor.h @@ -33,10 +33,11 @@ class SessionHandlerFactory; class Acceptor : public qpid::SharedObject<Acceptor> { public: - static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false); - virtual ~Acceptor() = 0; - virtual int16_t getPort() const = 0; - virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0; + static Acceptor::shared_ptr create( + int16_t port, int backlog, int threads, bool trace = false); + virtual ~Acceptor(); + virtual int getPort() const = 0; + virtual void run(SessionHandlerFactory& factory) = 0; virtual void shutdown() = 0; }; diff --git a/cpp/lib/common/sys/AtomicCount.h b/cpp/lib/common/sys/AtomicCount.h index b625b2c9b0..7a9555480f 100644 --- a/cpp/lib/common/sys/AtomicCount.h +++ b/cpp/lib/common/sys/AtomicCount.h @@ -21,36 +21,52 @@ #include <boost/detail/atomic_count.hpp> #include <boost/noncopyable.hpp> +#include <boost/function.hpp> namespace qpid { namespace sys { /** - * Atomic counter. + * Increment counter in constructor and decrement in destructor. + * Optionally call a function if the decremented counter value is 0. + * Note the function must not throw, it is called in the destructor. */ -class AtomicCount : boost::noncopyable { +template <class Count> +class ScopedIncrement : boost::noncopyable { public: - class ScopedDecrement : boost::noncopyable { - public: - /** Decrement counter in constructor and increment in destructor. */ - ScopedDecrement(AtomicCount& c) : count(c) { value = --count; } - ~ScopedDecrement() { ++count; } - /** Return the value returned by the decrement. */ - operator long() { return value; } - private: - AtomicCount& count; - long value; - }; + ScopedIncrement(Count& c, boost::function0<void> f=0) + : count(c), callback(f) { ++count; } + ~ScopedIncrement() { if (--count == 0 && callback) callback(); } - class ScopedIncrement : boost::noncopyable { - public: - /** Increment counter in constructor and increment in destructor. */ - ScopedIncrement(AtomicCount& c) : count(c) { ++count; } - ~ScopedIncrement() { --count; } - private: - AtomicCount& count; - }; + private: + Count& count; + boost::function0<void> callback; +}; +/** Decrement counter in constructor and increment in destructor. */ +template <class Count> +class ScopedDecrement : boost::noncopyable { + public: + ScopedDecrement(Count& c) : count(c) { value = --count; } + ~ScopedDecrement() { ++count; } + + /** Return the value after the decrement. */ + operator long() { return value; } + + private: + Count& count; + long value; +}; + + +/** + * Atomic counter. + */ +class AtomicCount : boost::noncopyable { + public: + typedef ScopedIncrement<AtomicCount> ScopedIncrement; + typedef ScopedDecrement<AtomicCount> ScopedDecrement; + AtomicCount(long value = 0) : count(value) {} void operator++() { ++count ; } diff --git a/cpp/lib/common/sys/Runnable.cpp b/cpp/lib/common/sys/Runnable.cpp index 30122c682f..5d4f48a373 100644 --- a/cpp/lib/common/sys/Runnable.cpp +++ b/cpp/lib/common/sys/Runnable.cpp @@ -29,4 +29,8 @@ Runnable::Functor Runnable::functor() return boost::bind(&Runnable::run, this); } +void FunctorRunnable::run() { + f(); +} + }} diff --git a/cpp/lib/common/sys/Runnable.h b/cpp/lib/common/sys/Runnable.h index fb3927c612..ef18897b09 100644 --- a/cpp/lib/common/sys/Runnable.h +++ b/cpp/lib/common/sys/Runnable.h @@ -44,7 +44,16 @@ class Runnable Functor functor(); }; -}} +/** Runnable wrapper for a functor */ +class FunctorRunnable : public Runnable { + public: + explicit FunctorRunnable(const Runnable::Functor& runMe) : f(runMe) {} + void run(); + private: + Runnable::Functor f; +}; + +}} // namespace qpid::sys #endif diff --git a/cpp/lib/common/sys/Socket.h b/cpp/lib/common/sys/Socket.h index d793a240c6..e35ed5b07c 100644 --- a/cpp/lib/common/sys/Socket.h +++ b/cpp/lib/common/sys/Socket.h @@ -70,8 +70,11 @@ class Socket */ int listen(int port = 0, int backlog = 10); + /** Accept a connection. This socket must be listening */ + Socket accept(); + /** Get file descriptor */ - int fd(); + int fd() const; private: #ifdef USE_APR diff --git a/cpp/lib/common/sys/Thread.h b/cpp/lib/common/sys/Thread.h index 47b95b6234..9647dc2414 100644 --- a/cpp/lib/common/sys/Thread.h +++ b/cpp/lib/common/sys/Thread.h @@ -116,7 +116,8 @@ Thread::Thread(Runnable& runnable) { } void Thread::join(){ - QPID_POSIX_THROW_IF(pthread_join(thread, 0)); + if (thread != 0) + QPID_POSIX_THROW_IF(pthread_join(thread, 0)); } long Thread::id() { diff --git a/cpp/lib/common/sys/Time.h b/cpp/lib/common/sys/Time.h index 3dd46741d8..4c6951b429 100644 --- a/cpp/lib/common/sys/Time.h +++ b/cpp/lib/common/sys/Time.h @@ -22,6 +22,7 @@ * */ +#include <limits> #include <stdint.h> #ifdef USE_APR @@ -33,7 +34,7 @@ namespace qpid { namespace sys { -/** Time in nanoseconds */ +/** Time in nanoseconds. */ typedef int64_t Time; Time now(); @@ -47,6 +48,9 @@ const Time TIME_USEC = 1000; /** Nanoseconds per nanosecond. */ const Time TIME_NSEC = 1; +/** Value to represent an infinite timeout */ +const Time TIME_INFINITE = std::numeric_limits<Time>::max(); + #ifndef USE_APR struct timespec toTimespec(const Time& t); struct timespec& toTimespec(struct timespec& ts, const Time& t); diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp index 6853833797..1bd23819f4 100644 --- a/cpp/lib/common/sys/apr/APRAcceptor.cpp +++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp @@ -32,20 +32,16 @@ class APRAcceptor : public Acceptor { public: APRAcceptor(int16_t port, int backlog, int threads, bool trace); - virtual int16_t getPort() const; - virtual void run(qpid::sys::SessionHandlerFactory* factory); + virtual int getPort() const; + virtual void run(qpid::sys::SessionHandlerFactory& factory); virtual void shutdown(); private: - void shutdownImpl(); - - private: int16_t port; bool trace; LFProcessor processor; apr_socket_t* socket; volatile bool running; - Mutex shutdownLock; }; // Define generic Acceptor::create() to return APRAcceptor. @@ -69,13 +65,13 @@ Acceptor::~Acceptor() {} CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); } -int16_t APRAcceptor::getPort() const { +int APRAcceptor::getPort() const { apr_sockaddr_t* address; CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); return address->port; } -void APRAcceptor::run(SessionHandlerFactory* factory) { +void APRAcceptor::run(SessionHandlerFactory& factory) { running = true; processor.start(); std::cout << "Listening on port " << getPort() << "..." << std::endl; @@ -90,32 +86,24 @@ void APRAcceptor::run(SessionHandlerFactory* factory) { CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace); - session->init(factory->create(session)); + session->init(factory.create(session)); }else{ - Mutex::ScopedLock locker(shutdownLock); - if(running) { - if(status != APR_EINTR){ - std::cout << "ERROR: " << get_desc(status) << std::endl; - } - shutdownImpl(); + running = false; + if(status != APR_EINTR){ + std::cout << "ERROR: " << get_desc(status) << std::endl; } } } + shutdown(); } void APRAcceptor::shutdown() { - Mutex::ScopedLock locker(shutdownLock); if (running) { - shutdownImpl(); + running = false; + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); } } -void APRAcceptor::shutdownImpl() { - Mutex::ScopedLock locker(shutdownLock); - running = false; - processor.stop(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); -} - }} diff --git a/cpp/lib/common/sys/apr/LFProcessor.cpp b/cpp/lib/common/sys/apr/LFProcessor.cpp index 2b6fc92623..f5d59e31d7 100644 --- a/cpp/lib/common/sys/apr/LFProcessor.cpp +++ b/cpp/lib/common/sys/apr/LFProcessor.cpp @@ -27,8 +27,6 @@ using namespace qpid::sys; using qpid::QpidError; -// TODO aconway 2006-10-12: stopped is read outside locks. -// LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : size(_size), diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp index 16c7ec9c3f..860ecd6b07 100644 --- a/cpp/lib/common/sys/posix/EventChannel.cpp +++ b/cpp/lib/common/sys/posix/EventChannel.cpp @@ -1,4 +1,4 @@ -/* +/* * * Copyright (c) 2006 The Apache Software Foundation * @@ -16,6 +16,13 @@ * */ +// TODO aconway 2006-12-15: Locking review. + +// TODO aconway 2006-12-15: use Descriptor pointers everywhere, +// get them from channel, pass them to Event constructors. +// Eliminate lookup. + + #include <mqueue.h> #include <string.h> #include <iostream> @@ -29,10 +36,10 @@ #include <queue> #include <boost/ptr_container/ptr_map.hpp> -#include <boost/current_function.hpp> +#include <boost/noncopyable.hpp> +#include <boost/bind.hpp> #include <QpidError.h> -#include <sys/Monitor.h> #include "check.h" #include "EventChannel.h" @@ -40,127 +47,319 @@ using namespace std; -// Convenience template to zero out a struct. -template <class S> struct ZeroStruct : public S { - ZeroStruct() { memset(this, 0, sizeof(*this)); } -}; - namespace qpid { namespace sys { +// ================================================================ +// Private class declarations + +namespace { + +typedef enum { IN, OUT } Direction; +typedef std::pair<Event*, Event*> EventPair; +} // namespace + /** - * EventHandler wraps an epoll file descriptor. Acts as private - * interface between EventChannel and subclasses. - * - * Also implements Event interface for events that are not associated - * with a file descriptor and are passed via the message queue. - */ -class EventHandler : public Event, private Monitor + * Queue of events corresponding to one IO direction (IN or OUT). + * Each Descriptor contains two Queues. + */ +class EventChannel::Queue : private boost::noncopyable { public: - EventHandler(int epollSize = 256); - ~EventHandler(); + Queue(Descriptor& container, Direction dir); - int getEpollFd() { return epollFd; } - void epollAdd(int fd, uint32_t epollEvents, Event* event); - void epollMod(int fd, uint32_t epollEvents, Event* event); - void epollDel(int fd); + /** Called by Event classes in prepare() */ + void push(Event* e); - void mqPut(Event* event); - Event* mqGet(); - - protected: - // Should never be called, only complete. - void prepare(EventHandler&) { assert(0); } - Event* complete(EventHandler& eh); + /** Called when epoll wakes. + *@return The next completed event or 0. + */ + Event* wake(uint32_t epollFlags); + + void setBit(uint32_t &epollFlags); + + void shutdown(); private: + typedef std::deque<Event*> EventQ; + + inline bool isMyEvent(uint32_t flags) { return flags | myEvent; } + + Mutex& lock; // Shared with Descriptor. + Descriptor& descriptor; + uint32_t myEvent; // Epoll event flag. + EventQ queue; +}; + + +/** + * Manages a file descriptor in an epoll set. + * + * Can be shutdown and re-activated for the same file descriptor. + */ +class EventChannel::Descriptor : private boost::noncopyable { + public: + Descriptor() : epollFd(-1), myFd(-1), + inQueue(*this, IN), outQueue(*this, OUT) {} + + void activate(int epollFd_, int myFd_); + + /** Epoll woke up for this descriptor. */ + EventPair wake(uint32_t epollEvents); + + /** Shut down: close and remove file descriptor. + * May be re-activated if fd is reused. + */ + void shutdown(); + + // TODO aconway 2006-12-18: Nasty. Need to clean up interaction. + void shutdownUnsafe(); + + bool isShutdown() { return epollFd == -1; } + + Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; } + + private: + void update(); + void epollCtl(int op, uint32_t events); + + Mutex lock; int epollFd; - std::string mqName; - int mqFd; - std::queue<Event*> mqEvents; + int myFd; + Queue inQueue, outQueue; + + friend class Queue; }; -EventHandler::EventHandler(int epollSize) -{ - epollFd = epoll_create(epollSize); - if (epollFd < 0) throw QPID_POSIX_ERROR(errno); + +/** + * Holds the epoll fd, Descriptor map and dispatch queue. + * Most of the epoll work is done by the Descriptors. + */ +class EventChannel::Impl { + public: + Impl(int size = 256); - // Create a POSIX message queue for non-fd events. - // We write one byte and never read it is always ready for read - // when we add it to epoll. - // - ZeroStruct<struct mq_attr> attr; - attr.mq_maxmsg = 1; - attr.mq_msgsize = 1; - do { - char tmpnam[L_tmpnam]; - tmpnam_r(tmpnam); - mqName = tmpnam + 4; // Skip "tmp/" - mqFd = mq_open( - mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); - if (mqFd < 0) throw QPID_POSIX_ERROR(errno); - } while (mqFd == EEXIST); // Name already taken, try again. + ~Impl(); - static char zero = '\0'; - mq_send(mqFd, &zero, 1, 0); - epollAdd(mqFd, 0, this); + /** + * Registers fd if not already registered. + */ + Descriptor& getDescriptor(int fd); + + /** Wait for an event, return 0 on timeout */ + Event* wait(Time timeout); + + Queue& getDispatchQueue() { return *dispatchQueue; } + + private: + + typedef boost::ptr_map<int, Descriptor> DescriptorMap; + + Mutex lock; + int epollFd; + DescriptorMap descriptors; + int pipe[2]; + Queue* dispatchQueue; +}; + + + +// ================================================================ +// EventChannel::Queue::implementation. + +static const char* shutdownMsg = "Event queue shut down."; + +EventChannel::Queue::Queue(Descriptor& d, Direction dir) : lock(d.lock), descriptor(d), + myEvent(dir==IN ? EPOLLIN : EPOLLOUT) +{} + +void EventChannel::Queue::push(Event* e) { + Mutex::ScopedLock l(lock); + if (descriptor.isShutdown()) + THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg); + queue.push_back(e); + descriptor.update(); } -EventHandler::~EventHandler() { - mq_close(mqFd); - mq_unlink(mqName.c_str()); +void EventChannel::Queue::setBit(uint32_t &epollFlags) { + if (queue.empty()) + epollFlags &= ~myEvent; + else + epollFlags |= myEvent; } -void EventHandler::mqPut(Event* event) { - ScopedLock l(*this); - assert(event != 0); - mqEvents.push(event); - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); +Event* EventChannel::Queue::wake(uint32_t epollFlags) { + // Called with lock held. + if (!queue.empty() && (isMyEvent(epollFlags))) { + Event* e = queue.front()->complete(descriptor); + if (e) { + queue.pop_front(); + return e; + } + } + return 0; +} + +void EventChannel::Queue::shutdown() { + // Mark all pending events with a shutdown exception. + // The server threads will remove and dispatch the events. + // + qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, QPID_ERROR_LOCATION); + for_each(queue.begin(), queue.end(), + boost::bind(&Event::setException, _1, ex)); } -Event* EventHandler::mqGet() { - ScopedLock l(*this); - if (mqEvents.empty()) - return 0; - Event* event = mqEvents.front(); - mqEvents.pop(); - if(!mqEvents.empty()) - epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); - return event; + +// ================================================================ +// Descriptor + + +void EventChannel::Descriptor::activate(int epollFd_, int myFd_) { + Mutex::ScopedLock l(lock); + assert(myFd < 0 || (myFd == myFd_)); // Can't change fd. + if (epollFd < 0) { // Means we're not polling. + epollFd = epollFd_; + myFd = myFd_; + epollCtl(EPOLL_CTL_ADD, 0); + } } -void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) -{ - ZeroStruct<struct epoll_event> ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) +void EventChannel::Descriptor::shutdown() { + Mutex::ScopedLock l(lock); + shutdownUnsafe(); +} + +void EventChannel::Descriptor::shutdownUnsafe() { + // Caller holds lock. + ::close(myFd); + epollFd = -1; // Indicate we are not polling. + inQueue.shutdown(); + outQueue.shutdown(); + epollCtl(EPOLL_CTL_DEL, 0); +} + +void EventChannel::Descriptor::update() { + // Caller holds lock. + uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP; + inQueue.setBit(events); + outQueue.setBit(events); + epollCtl(EPOLL_CTL_MOD, events); +} + +void EventChannel::Descriptor::epollCtl(int op, uint32_t events) { + // Caller holds lock + assert(!isShutdown()); + struct epoll_event ee; + memset(&ee, 0, sizeof(ee)); + ee.data.ptr = this; + ee.events = events; + int status = ::epoll_ctl(epollFd, op, myFd, &ee); + if (status < 0) throw QPID_POSIX_ERROR(errno); + } +} + + +EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) { + Mutex::ScopedLock l(lock); + cout << "DEBUG: " << std::hex << epollEvents << std::dec << endl; + // If we have an error: + if (epollEvents & (EPOLLERR | EPOLLHUP)) { + shutdownUnsafe(); + // Complete both sides on error so the event can fail and + // mark itself with an exception. + epollEvents |= EPOLLIN | EPOLLOUT; + } + EventPair ready(inQueue.wake(epollEvents), outQueue.wake(epollEvents)); + update(); + return ready; } -void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) + +// ================================================================ +// EventChannel::Impl + + +EventChannel::Impl::Impl(int epollSize): + epollFd(-1), dispatchQueue(0) { - ZeroStruct<struct epoll_event> ee; - ee.data.ptr = event; - ee.events = epollEvents; - if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) - throw QPID_POSIX_ERROR(errno); + // Create the epoll file descriptor. + epollFd = epoll_create(epollSize); + QPID_POSIX_CHECK(epollFd); + + // Create a pipe and write a single byte. The byte is never + // read so the pipes read fd is always ready for read. + // We activate the FD when there are messages in the queue. + QPID_POSIX_CHECK(::pipe(pipe)); + static char zero = '\0'; + QPID_POSIX_CHECK(::write(pipe[1], &zero, 1)); + dispatchQueue = &getDescriptor(pipe[0]).getQueue(IN); } -void EventHandler::epollDel(int fd) { - if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) - throw QPID_POSIX_ERROR(errno); +EventChannel::Impl::~Impl() { + close(epollFd); + close(pipe[0]); + close(pipe[1]); } -Event* EventHandler::complete(EventHandler& eh) + +/** + * Wait for epoll to wake up, return the descriptor or 0 on timeout. + */ +Event* EventChannel::Impl::wait(Time timeoutNs) { - assert(&eh == this); - Event* event = mqGet(); - return event==0 ? 0 : event->complete(eh); + // No lock, all thread safe calls or local variables: + // + const long timeoutMs = + (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC; + struct epoll_event ee; + Event* event = 0; + bool doSwap = true; + + // Loop till we get a completed event. Some events may repost + // themselves and return 0, e.g. incomplete read or write events. + // + while (!event) { + int n = epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe. + if (n == 0) // Timeout + return 0; + if (n < 0 && errno != EINTR) // Interrupt, ignore it. + continue; + if (n < 0) + throw QPID_POSIX_ERROR(errno); + assert(n == 1); + Descriptor* ed = + reinterpret_cast<Descriptor*>(ee.data.ptr); + assert(ed); + EventPair ready = ed->wake(ee.events); + + // We can only return one event so if both completed push one + // onto the dispatch queue to be dispatched in another thread. + if (ready.first && ready.second) { + // Keep it fair: in & out take turns to be returned first. + if (doSwap) + swap(ready.first, ready.second); + doSwap = !doSwap; + event = ready.first; + dispatchQueue->push(ready.second); + } + else { + event = ready.first ? ready.first : ready.second; + } + } + return event; } - + +EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) { + Mutex::ScopedLock l(lock); + Descriptor& ed = descriptors[fd]; + ed.activate(epollFd, fd); + return ed; +} + + // ================================================================ // EventChannel @@ -168,157 +367,138 @@ EventChannel::shared_ptr EventChannel::create() { return shared_ptr(new EventChannel()); } -EventChannel::EventChannel() : handler(new EventHandler()) {} +EventChannel::EventChannel() : impl(new EventChannel::Impl()) {} EventChannel::~EventChannel() {} -void EventChannel::postEvent(Event& e) +void EventChannel::post(Event& e) { - e.prepare(*handler); + e.prepare(*impl); } -Event* EventChannel::getEvent() -{ - static const int infiniteTimeout = -1; - ZeroStruct<struct epoll_event> epollEvent; - - // Loop until we can complete the event. Some events may re-post - // themselves and return 0 from complete, e.g. partial reads. // - Event* event = 0; - while (event == 0) { - int eventCount = epoll_wait(handler->getEpollFd(), - &epollEvent, 1, infiniteTimeout); - if (eventCount < 0) { - if (errno != EINTR) { - // TODO aconway 2006-11-28: Proper handling/logging of errors. - cerr << BOOST_CURRENT_FUNCTION << " ignoring error " - << PosixError::getMessage(errno) << endl; - assert(0); - } - } - else if (eventCount == 1) { - event = reinterpret_cast<Event*>(epollEvent.data.ptr); - assert(event != 0); - try { - event = event->complete(*handler); - } - catch (const Exception& e) { - if (event) - event->setError(e); - } - catch (const std::exception& e) { - if (event) - event->setError(e); - } - } - } - return event; +void EventChannel::post(Event* e) { + assert(e); + post(*e); } -Event::~Event() {} - -void Event::prepare(EventHandler& handler) +Event* EventChannel::wait(Time timeoutNs) { - handler.mqPut(this); + return impl->wait(timeoutNs); } -bool Event::hasError() const { - return error; -} -void Event::throwIfError() throw (Exception) { - if (hasError()) - error.throwSelf(); +// ================================================================ +// Event and subclasses. + +Event::~Event() {} + +Exception::shared_ptr_const Event::getException() const { + return exception; } -Event* Event::complete(EventHandler&) -{ - return this; +void Event::throwIfException() { + if (getException()) + exception->throwSelf(); } void Event::dispatch() { + if (!callback.empty()) + callback(); +} + +void Event::setException(const std::exception& e) { + const Exception* ex = dynamic_cast<const Exception*>(&e); + if (ex) + exception.reset(ex->clone().release()); + else + exception.reset(new Exception(e)); +#ifndef NDEBUG + // Throw and re-catch the exception. Has no effect on the + // program but it triggers debuggers watching for throw. The + // context that sets the exception is more informative for + // debugging purposes than the one that ultimately throws it. + // try { - if (!callback.empty()) - callback(); - } catch (const std::exception&) { - throw; - } catch (...) { - throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); + throwIfException(); } + catch (...) { } // Ignored. +#endif } -void Event::setError(const ExceptionHolder& e) { - error = e; + +void ReadEvent::prepare(EventChannel::Impl& impl) { + impl.getDescriptor(descriptor).getQueue(IN).push(this); } -void ReadEvent::prepare(EventHandler& handler) +Event* ReadEvent::complete(EventChannel::Descriptor& ed) { - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); + ssize_t n = ::read(descriptor, + static_cast<char*>(buffer) + bytesRead, + size - bytesRead); + + if (n < 0 && errno != EAGAIN) { // Error + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. + } + else if (n == 0) { // End of file + // TODO aconway 2006-12-13: Don't treat EOF as exception + // unless we're partway thru a !noWait read. + setException(QPID_POSIX_ERROR(ENODATA)); + ed.shutdownUnsafe(); // Called with lock held. + } + else { + if (n > 0) // possible that n < 0 && errno == EAGAIN + bytesRead += n; + if (bytesRead < size && !noWait) { + // Continue reading, not enough data. + return 0; + } + } + return this; } -ssize_t ReadEvent::doRead() { - ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received, - size - received); - if (n > 0) received += n; - return n; + +void WriteEvent::prepare(EventChannel::Impl& impl) { + impl.getDescriptor(descriptor).getQueue(OUT).push(this); } -Event* ReadEvent::complete(EventHandler& handler) + +Event* WriteEvent::complete(EventChannel::Descriptor& ed) { - // Read as much as possible without blocking. - ssize_t n = doRead(); - while (n > 0 && received < size) doRead(); - - if (received == size) { - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - return this; - } - else if (n <0 && (errno == EAGAIN)) { - // Keep polling for more. - handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); + ssize_t n = ::write(descriptor, + static_cast<const char*>(buffer) + bytesWritten, + size - bytesWritten); + if(n < 0 && errno == EAGAIN && noWait) { return 0; } - else { - // Unexpected EOF or error. Throw ENODATA for EOF. - handler.epollDel(descriptor); - received = 0; // Reset for re-use. - throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); + if (n < 0 || (bytesWritten += n) < size) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. } + return this; } -void WriteEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); +void AcceptEvent::prepare(EventChannel::Impl& impl) { + impl.getDescriptor(descriptor).getQueue(IN).push(this); } -Event* WriteEvent::complete(EventHandler& handler) +Event* AcceptEvent::complete(EventChannel::Descriptor& ed) { - ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written, - size - written); - if (n < 0) throw QPID_POSIX_ERROR(errno); - written += n; - if(written < size) { - // Keep polling. - handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); - return 0; + accepted = ::accept(descriptor, 0, 0); + if (accepted < 0) { + setException(QPID_POSIX_ERROR(errno)); + ed.shutdownUnsafe(); // Called with lock held. } - written = 0; // Reset for re-use. - handler.epollDel(descriptor); return this; } -void AcceptEvent::prepare(EventHandler& handler) -{ - handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +void DispatchEvent::prepare(EventChannel::Impl& impl) { + impl.getDispatchQueue().push(this); } -Event* AcceptEvent::complete(EventHandler& handler) +Event* DispatchEvent::complete(EventChannel::Descriptor&) { - handler.epollDel(descriptor); - accepted = ::accept(descriptor, 0, 0); - if (accepted < 0) throw QPID_POSIX_ERROR(errno); return this; } diff --git a/cpp/lib/common/sys/posix/EventChannel.h b/cpp/lib/common/sys/posix/EventChannel.h index 49c7fce740..60c4026fbc 100644 --- a/cpp/lib/common/sys/posix/EventChannel.h +++ b/cpp/lib/common/sys/posix/EventChannel.h @@ -19,8 +19,11 @@ * */ -#include <SharedObject.h> -#include <ExceptionHolder.h> +#include "SharedObject.h" +#include "Exception.h" +#include "sys/Monitor.h" +#include "sys/Time.h" + #include <boost/function.hpp> #include <memory> @@ -28,11 +31,47 @@ namespace qpid { namespace sys { class Event; -class EventHandler; -class EventChannel; + +/** + * Channel to post and wait for events. + */ +class EventChannel : public qpid::SharedObject<EventChannel> +{ + public: + static shared_ptr create(); + + ~EventChannel(); + + /** Post an event to the channel. */ + void post(Event& event); + + /** Post an event to the channel. Must not be 0. */ + void post(Event* event); + + /** + * Wait for the next complete event, up to timeout. + *@return Pointer to event or 0 if timeout elapses. + */ + Event* wait(Time timeout = TIME_INFINITE); + + class Impl; + class Queue; + class Descriptor; + + private: + + EventChannel(); + + Mutex lock; + boost::shared_ptr<Impl> impl; +}; /** * Base class for all Events. + * Derived classes define events representing various async IO operations. + * When an event is complete, it is returned by the EventChannel to + * a thread calling wait. The thread will call Event::dispatch() to + * execute code associated with event completion. */ class Event { @@ -40,135 +79,137 @@ class Event /** Type for callback when event is dispatched */ typedef boost::function0<void> Callback; - /** - * Create an event with optional callback. - * Instances of Event are sent directly through the channel. - * Derived classes define additional waiting behaviour. - *@param cb A callback functor that is invoked when dispatch() is called. - */ - Event(Callback cb = 0) : callback(cb) {} - virtual ~Event(); /** Call the callback provided to the constructor, if any. */ void dispatch(); - /** True if there was an error processing this event */ - bool hasError() const; + /** + *If there was an exception processing this Event, return it. + *@return 0 if there was no exception. Caller must not delete. + */ + qpid::Exception::shared_ptr_const getException() const; + + /** If getException() throw the corresponding exception. */ + void throwIfException(); + + /** Set the dispatch callback. */ + void setCallback(Callback cb) { callback = cb; } - /** If hasError() throw the corresponding exception. */ - void throwIfError() throw(Exception); + /** Set the exception. */ + void setException(const std::exception& e); protected: - virtual void prepare(EventHandler&); - virtual Event* complete(EventHandler&); - void setError(const ExceptionHolder& e); + Event(Callback cb=0) : callback(cb) {} + + virtual void prepare(EventChannel::Impl&) = 0; + virtual Event* complete(EventChannel::Descriptor&) = 0; Callback callback; - ExceptionHolder error; + Exception::shared_ptr_const exception; friend class EventChannel; - friend class EventHandler; + friend class EventChannel::Queue; }; -template <class BufT> -class IOEvent : public Event { +/** + * An event that does not wait for anything, it is processed + * immediately by one of the channel threads. + */ +class DispatchEvent : public Event { public: - void getDescriptor() const { return descriptor; } - size_t getSize() const { return size; } - BufT getBuffer() const { return buffer; } - + DispatchEvent(Callback cb=0) : Event(cb) {} + protected: - IOEvent(int fd, Callback cb, size_t sz, BufT buf) : - Event(cb), descriptor(fd), buffer(buf), size(sz) {} + void prepare(EventChannel::Impl&); + Event* complete(EventChannel::Descriptor&); +}; + +// Utility base class. +class FDEvent : public Event { + public: + int getDescriptor() const { return descriptor; } + protected: + FDEvent(Callback cb = 0, int fd = 0) + : Event(cb), descriptor(fd) {} int descriptor; - BufT buffer; - size_t size; }; +// Utility base class +class IOEvent : public FDEvent { + public: + size_t getSize() const { return size; } + + protected: + IOEvent(Callback cb, int fd, size_t sz, bool noWait_) : + FDEvent(cb, fd), size(sz), noWait(noWait_) {} + + size_t size; + bool noWait; +}; + /** Asynchronous read event */ -class ReadEvent : public IOEvent<void*> +class ReadEvent : public IOEvent { public: - explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) : - IOEvent<void*>(fd, cb, sz, buf), received(0) {} + explicit ReadEvent( + int fd=-1, void* buf=0, size_t sz=0, + Callback cb=0, bool noWait=false + ) : IOEvent(cb, fd, sz, noWait), buffer(buf), bytesRead(0) {} + void* getBuffer() const { return buffer; } + size_t getBytesRead() const { return bytesRead; } + private: - void prepare(EventHandler&); - Event* complete(EventHandler&); + void prepare(EventChannel::Impl&); + Event* complete(EventChannel::Descriptor&); ssize_t doRead(); - size_t received; + void* buffer; + size_t bytesRead; }; /** Asynchronous write event */ -class WriteEvent : public IOEvent<const void*> +class WriteEvent : public IOEvent { public: explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0, Callback cb=0) : - IOEvent<const void*>(fd, cb, sz, buf), written(0) {} + IOEvent(cb, fd, sz, noWait), buffer(buf), bytesWritten(0) {} - protected: - void prepare(EventHandler&); - Event* complete(EventHandler&); + const void* getBuffer() const { return buffer; } + size_t getBytesWritten() const { return bytesWritten; } private: + void prepare(EventChannel::Impl&); + Event* complete(EventChannel::Descriptor&); ssize_t doWrite(); - size_t written; + + const void* buffer; + size_t bytesWritten; }; + /** Asynchronous socket accept event */ -class AcceptEvent : public Event +class AcceptEvent : public FDEvent { public: /** Accept a connection on fd. */ explicit AcceptEvent(int fd=-1, Callback cb=0) : - Event(cb), descriptor(fd), accepted(0) {} - - /** Get descriptor for server socket */ + FDEvent(cb, fd), accepted(0) {} + + /** Get descriptor for accepted server socket */ int getAcceptedDesscriptor() const { return accepted; } private: - void prepare(EventHandler&); - Event* complete(EventHandler&); + void prepare(EventChannel::Impl&); + Event* complete(EventChannel::Descriptor&); - int descriptor; int accepted; }; -class QueueSet; - -/** - * Channel to post and wait for events. - */ -class EventChannel : public qpid::SharedObject<EventChannel> -{ - public: - static shared_ptr create(); - - ~EventChannel(); - - /** Post an event to the channel. */ - void postEvent(Event& event); - - /** Post an event to the channel. Must not be 0. */ - void postEvent(Event* event) { postEvent(*event); } - - /** - * Wait for the next complete event. - *@return Pointer to event. Will never return 0. - */ - Event* getEvent(); - - private: - EventChannel(); - boost::shared_ptr<EventHandler> handler; -}; - - }} diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp index 7cd6f60902..28f9beb44e 100644 --- a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp +++ b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp @@ -139,11 +139,11 @@ void EventChannelAcceptor::accept() shutdown(); return; } - // TODO aconway 2006-11-29: Need to reap closed connections also. int fd = acceptEvent.getAcceptedDesscriptor(); + threads->post(acceptEvent); // Keep accepting. + // TODO aconway 2006-11-29: Need to reap closed connections also. connections.push_back( new EventChannelConnection(threads, *factory, fd, fd, isTrace)); - threads->post(acceptEvent); // Keep accepting. } }} // namespace qpid::sys diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.cpp b/cpp/lib/common/sys/posix/EventChannelThreads.cpp index 95e699e0b0..787da72ffa 100644 --- a/cpp/lib/common/sys/posix/EventChannelThreads.cpp +++ b/cpp/lib/common/sys/posix/EventChannelThreads.cpp @@ -16,26 +16,40 @@ * */ -#include "EventChannelThreads.h" -#include <sys/Runnable.h> #include <iostream> -using namespace std; +#include <limits> + #include <boost/bind.hpp> +#include <sys/Runnable.h> + +#include "EventChannelThreads.h" + namespace qpid { namespace sys { +const size_t EventChannelThreads::unlimited = + std::numeric_limits<size_t>::max(); + EventChannelThreads::shared_ptr EventChannelThreads::create( - EventChannel::shared_ptr ec) + EventChannel::shared_ptr ec, size_t min, size_t max +) { - return EventChannelThreads::shared_ptr(new EventChannelThreads(ec)); + return EventChannelThreads::shared_ptr( + new EventChannelThreads(ec, min, max)); } -EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) : - channel(ec), nWaiting(0), state(RUNNING) +EventChannelThreads::EventChannelThreads( + EventChannel::shared_ptr ec, size_t min, size_t max) : + minThreads(std::max(size_t(1), min)), + maxThreads(std::min(min, max)), + channel(ec), + nWaiting(0), + state(RUNNING) { - // TODO aconway 2006-11-15: Estimate initial threads based on CPUs. - addThread(); + Monitor::ScopedLock l(monitor); + while (workers.size() < minThreads) + workers.push_back(Thread(*this)); } EventChannelThreads::~EventChannelThreads() { @@ -43,34 +57,37 @@ EventChannelThreads::~EventChannelThreads() { join(); } +// Termination marker event. +static DispatchEvent terminate; + void EventChannelThreads::shutdown() { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); if (state != RUNNING) // Already shutting down. return; + state = TERMINATING; for (size_t i = 0; i < workers.size(); ++i) { - channel->postEvent(terminate); + channel->post(terminate); } - state = TERMINATE_SENT; - notify(); // Wake up one join() thread. + monitor.notify(); // Wake up one join() thread. } void EventChannelThreads::join() { { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); while (state == RUNNING) // Wait for shutdown to start. - wait(); + monitor.wait(); if (state == SHUTDOWN) // Shutdown is complete return; if (state == JOINING) { // Someone else is doing the join. while (state != SHUTDOWN) - wait(); + monitor.wait(); return; } // I'm the joining thread - assert(state == TERMINATE_SENT); + assert(state == TERMINATING); state = JOINING; } // Drop the lock. @@ -79,12 +96,13 @@ void EventChannelThreads::join() workers[i].join(); } state = SHUTDOWN; - notifyAll(); // Notify other join() threaeds. + monitor.notifyAll(); // Notify any other join() threads. } void EventChannelThreads::addThread() { - ScopedLock l(*this); - workers.push_back(Thread(*this)); + Monitor::ScopedLock l(monitor); + if (workers.size() < maxThreads) + workers.push_back(Thread(*this)); } void EventChannelThreads::run() @@ -93,26 +111,22 @@ void EventChannelThreads::run() AtomicCount::ScopedIncrement inc(nWaiting); try { while (true) { - Event* e = channel->getEvent(); + Event* e = channel->wait(); assert(e != 0); - if (e == &terminate) { + if (e == &terminate) return; - } AtomicCount::ScopedDecrement dec(nWaiting); - // I'm no longer waiting, make sure someone is. - if (dec == 0) + // Make sure there's at least one waiting thread. + if (dec == 0 && state == RUNNING) addThread(); e->dispatch(); } } catch (const std::exception& e) { - // TODO aconway 2006-11-15: need better logging across the board. - std::cerr << "EventChannelThreads::run() caught: " << e.what() - << std::endl; + Exception::log(e, "Exception in EventChannelThreads::run()"); } catch (...) { - std::cerr << "EventChannelThreads::run() caught unknown exception." - << std::endl; + Exception::logUnknown("Exception in EventChannelThreads::run()"); } } diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.h b/cpp/lib/common/sys/posix/EventChannelThreads.h index 98403c0869..721a5e9d24 100644 --- a/cpp/lib/common/sys/posix/EventChannelThreads.h +++ b/cpp/lib/common/sys/posix/EventChannelThreads.h @@ -20,11 +20,12 @@ */ #include <vector> -#include <Exception.h> -#include <sys/Time.h> -#include <sys/Monitor.h> -#include <sys/Thread.h> -#include <sys/AtomicCount.h> +#include "Exception.h" +#include "sys/AtomicCount.h" +#include "sys/Monitor.h" +#include "sys/Thread.h" +#include "sys/Time.h" + #include "EventChannel.h" namespace qpid { @@ -33,26 +34,36 @@ namespace sys { /** Dynamic thread pool serving an EventChannel. - Threads run a loop { e = getEvent(); e->dispatch(); } + Threads run a loop { e = wait(); e->dispatch(); } The size of the thread pool is automatically adjusted to optimal size. */ class EventChannelThreads : public qpid::SharedObject<EventChannelThreads>, - public sys::Monitor, private sys::Runnable + private sys::Runnable { public: - /** Create the thread pool and start initial threads. */ + /** Constant to represent an unlimited number of threads */ + static const size_t unlimited; + + /** + * Create the thread pool and start initial threads. + * @param minThreads Pool will initialy contain minThreads threads and + * will never shrink to less until shutdown. + * @param maxThreads Pool will never grow to more than maxThreads. + */ static EventChannelThreads::shared_ptr create( - EventChannel::shared_ptr channel + EventChannel::shared_ptr channel = EventChannel::create(), + size_t minThreads = 1, + size_t maxThreads = unlimited ); ~EventChannelThreads(); /** Post event to the underlying channel */ - void postEvent(Event& event) { channel->postEvent(event); } + void post(Event& event) { channel->post(event); } /** Post event to the underlying channel Must not be 0. */ - void postEvent(Event* event) { channel->postEvent(event); } + void post(Event* event) { channel->post(event); } /** * Terminate all threads. @@ -68,21 +79,25 @@ class EventChannelThreads : private: typedef std::vector<sys::Thread> Threads; typedef enum { - RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN + RUNNING, TERMINATING, JOINING, SHUTDOWN } State; - EventChannelThreads(EventChannel::shared_ptr underlyingChannel); + EventChannelThreads( + EventChannel::shared_ptr channel, size_t min, size_t max); + void addThread(); void run(); bool keepRunning(); void adjustThreads(); + Monitor monitor; + size_t minThreads; + size_t maxThreads; EventChannel::shared_ptr channel; Threads workers; sys::AtomicCount nWaiting; State state; - Event terminate; }; diff --git a/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/cpp/lib/common/sys/posix/PosixAcceptor.cpp index 842aa76f36..e69de29bb2 100644 --- a/cpp/lib/common/sys/posix/PosixAcceptor.cpp +++ b/cpp/lib/common/sys/posix/PosixAcceptor.cpp @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <sys/Acceptor.h> -#include <Exception.h> - -namespace qpid { -namespace sys { - -namespace { -void fail() { throw qpid::Exception("PosixAcceptor not implemented"); } -} - -class PosixAcceptor : public Acceptor { - public: - virtual int16_t getPort() const { fail(); return 0; } - virtual void run(qpid::sys::SessionHandlerFactory* ) { fail(); } - virtual void shutdown() { fail(); } -}; - -// Define generic Acceptor::create() to return APRAcceptor. - Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool) -{ - return Acceptor::shared_ptr(new PosixAcceptor()); -} - -// Must define Acceptor virtual dtor. -Acceptor::~Acceptor() {} - -}} diff --git a/cpp/lib/common/sys/posix/Socket.cpp b/cpp/lib/common/sys/posix/Socket.cpp index 5bd13742f6..fc82b4e7e5 100644 --- a/cpp/lib/common/sys/posix/Socket.cpp +++ b/cpp/lib/common/sys/posix/Socket.cpp @@ -96,6 +96,8 @@ Socket::recv(void* data, size_t size) int Socket::listen(int port, int backlog) { struct sockaddr_in name; + static const int ON = 1; + setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &ON, sizeof(ON)); name.sin_family = AF_INET; name.sin_port = htons(port); name.sin_addr.s_addr = 0; @@ -111,8 +113,15 @@ int Socket::listen(int port, int backlog) return ntohs(name.sin_port); } +Socket Socket::accept() { + int accepted = ::accept(socket, 0, 0); + if (accepted < 0) + throw (QPID_POSIX_ERROR(errno)); + return Socket(accepted); +} + -int Socket::fd() +int Socket::fd()const { return socket; } diff --git a/cpp/lib/common/sys/posix/check.cpp b/cpp/lib/common/sys/posix/check.cpp index 408679caa8..4ddacb3fbd 100644 --- a/cpp/lib/common/sys/posix/check.cpp +++ b/cpp/lib/common/sys/posix/check.cpp @@ -32,8 +32,8 @@ PosixError::getMessage(int errNo) return std::string(strerror_r(errNo, buf, sizeof(buf))); } -PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw() - : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc) +PosixError::PosixError(int errNo, const qpid::QpidError::Location& l) throw() + : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), l) { } }} diff --git a/cpp/lib/common/sys/posix/check.h b/cpp/lib/common/sys/posix/check.h index 5afbe8f5a8..052fb08580 100644 --- a/cpp/lib/common/sys/posix/check.h +++ b/cpp/lib/common/sys/posix/check.h @@ -37,13 +37,15 @@ class PosixError : public qpid::QpidError public: static std::string getMessage(int errNo); - PosixError(int errNo, const qpid::SrcLine& location) throw(); + PosixError(int errNo, const qpid::QpidError::Location& location) throw(); ~PosixError() throw() {} int getErrNo() { return errNo; } - Exception* clone() const throw() { return new PosixError(*this); } + Exception::auto_ptr clone() const throw() { + return Exception::auto_ptr(new PosixError(*this)); + } void throwSelf() { throw *this; } @@ -54,9 +56,17 @@ class PosixError : public qpid::QpidError }} /** Create a PosixError for the current file/line and errno. */ -#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE) +#define QPID_POSIX_ERROR(ERRNO) \ + ::qpid::sys::PosixError((ERRNO), QPID_ERROR_LOCATION) -/** Throw a posix error if errNo is non-zero */ -#define QPID_POSIX_THROW_IF(ERRNO) \ - if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO)) +/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */ +#define QPID_POSIX_CHECK(RESULT) \ + if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno)) + +/** Throw QPID_POSIX_ERROR(ERRNO) if ERRNO is non zero */ +#define QPID_POSIX_THROW_IF(ERRNO) \ + do { int e = (ERRNO); if (e) throw QPID_POSIX_ERROR(e); } while(0) + + + #endif /*!_posix_check_h*/ diff --git a/cpp/tests/EventChannelTest.cpp b/cpp/tests/EventChannelTest.cpp index 8e5c724a15..67b8b03ce2 100644 --- a/cpp/tests/EventChannelTest.cpp +++ b/cpp/tests/EventChannelTest.cpp @@ -47,8 +47,9 @@ struct RunMe : public Runnable class EventChannelTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(EventChannelTest); - CPPUNIT_TEST(testEvent); + CPPUNIT_TEST(testDispatch); CPPUNIT_TEST(testRead); + CPPUNIT_TEST(testPartialRead); CPPUNIT_TEST(testFailedRead); CPPUNIT_TEST(testWrite); CPPUNIT_TEST(testFailedWrite); @@ -72,26 +73,26 @@ class EventChannelTest : public CppUnit::TestCase signal(SIGPIPE, SIG_IGN); } - // Verify that calling getEvent returns event. + // Verify that calling wait returns event. template <class T> bool isNextEvent(T& event) { - return &event == dynamic_cast<T*>(ec->getEvent()); + return &event == dynamic_cast<T*>(ec->wait(5*TIME_SEC)); } template <class T> bool isNextEventOk(T& event) { - Event* next = ec->getEvent(); - if (next) next->throwIfError(); + Event* next = ec->wait(TIME_SEC); + if (next) next->throwIfException(); return &event == next; } - void testEvent() + void testDispatch() { RunMe runMe; CPPUNIT_ASSERT(!runMe.ran); // Instances of Event just pass thru the channel immediately. - Event e(runMe.functor()); - ec->postEvent(e); + DispatchEvent e(runMe.functor()); + ec->post(e); CPPUNIT_ASSERT(isNextEventOk(e)); e.dispatch(); CPPUNIT_ASSERT(runMe.ran); @@ -99,42 +100,64 @@ class EventChannelTest : public CppUnit::TestCase void testRead() { ReadEvent re(pipe[0], readBuf, size); - ec->postEvent(re); + ec->post(re); CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::write(pipe[1], hello, size)); CPPUNIT_ASSERT(isNextEventOk(re)); - CPPUNIT_ASSERT_EQUAL(size, re.getSize()); + CPPUNIT_ASSERT_EQUAL(size, re.getBytesRead()); CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); } + void testPartialRead() { + ReadEvent re(pipe[0], readBuf, size, 0, true); + ec->post(re); + CPPUNIT_ASSERT_EQUAL(ssize_t(size/2), ::write(pipe[1], hello, size/2)); + CPPUNIT_ASSERT(isNextEventOk(re)); + CPPUNIT_ASSERT_EQUAL(size/2, re.getBytesRead()); + CPPUNIT_ASSERT_EQUAL(std::string(hello, size/2), + std::string(readBuf, size/2)); + } + + void testFailedRead() { ReadEvent re(pipe[0], readBuf, size); - ec->postEvent(re); + ec->post(re); // EOF before all data read. ::close(pipe[1]); CPPUNIT_ASSERT(isNextEvent(re)); - CPPUNIT_ASSERT(re.hasError()); + CPPUNIT_ASSERT(re.getException()); try { - re.throwIfError(); + re.throwIfException(); CPPUNIT_FAIL("Expected QpidError."); } catch (const qpid::QpidError&) { } + + // Try to read from closed file descriptor. + try { + ec->post(re); + CPPUNIT_ASSERT(isNextEvent(re)); + re.throwIfException(); + CPPUNIT_FAIL("Expected an exception."); + } + catch (const qpid::QpidError&) {} + // Bad file descriptor. Note in this case we fail - // in postEvent and throw immediately. + // in post and throw immediately. try { - ReadEvent bad; - ec->postEvent(bad); + ReadEvent bad(-1, readBuf, size); + ec->post(bad); CPPUNIT_FAIL("Expected QpidError."); } - catch (const qpid::QpidError&) { } + catch (const qpid::QpidError&) {} } void testWrite() { WriteEvent wr(pipe[1], hello, size); - ec->postEvent(wr); + ec->post(wr); CPPUNIT_ASSERT(isNextEventOk(wr)); + CPPUNIT_ASSERT_EQUAL(size, wr.getBytesWritten()); CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::read(pipe[0], readBuf, size));; CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); } @@ -142,43 +165,55 @@ class EventChannelTest : public CppUnit::TestCase void testFailedWrite() { WriteEvent wr(pipe[1], hello, size); ::close(pipe[0]); - ec->postEvent(wr); + ec->post(wr); CPPUNIT_ASSERT(isNextEvent(wr)); - CPPUNIT_ASSERT(wr.hasError()); + CPPUNIT_ASSERT(wr.getException()); } void testReadWrite() { ReadEvent re(pipe[0], readBuf, size); WriteEvent wr(pipe[1], hello, size); - ec->postEvent(re); - ec->postEvent(wr); - ec->getEvent(); - ec->getEvent(); + ec->post(re); + ec->post(wr); + ec->wait(TIME_SEC); + ec->wait(TIME_SEC); CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); } - void testAccept() { - Socket s = Socket::createTcp(); - int port = s.listen(0, 10); - CPPUNIT_ASSERT(port != 0); - - AcceptEvent ae(s.fd()); - ec->postEvent(ae); - Socket client = Socket::createTcp(); + void connectSendRead(AcceptEvent& ae, int port, Socket client) + { + ec->post(ae); + // Connect a client, send some data, read the data. client.connect("localhost", port); CPPUNIT_ASSERT(isNextEvent(ae)); - ae.dispatch(); + ae.throwIfException(); - // Verify client writes are read by the accepted descriptor. char readBuf[size]; ReadEvent re(ae.getAcceptedDesscriptor(), readBuf, size); - ec->postEvent(re); - CPPUNIT_ASSERT_EQUAL(ssize_t(size), client.send(hello, sizeof(hello))); + ec->post(re); + CPPUNIT_ASSERT_EQUAL(ssize_t(size), + client.send(hello, sizeof(hello))); CPPUNIT_ASSERT(isNextEvent(re)); - re.dispatch(); + re.throwIfException(); CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); } + + void testAccept() { + Socket s = Socket::createTcp(); + int port = s.listen(0, 10); + CPPUNIT_ASSERT(port != 0); + + AcceptEvent ae(s.fd()); + Socket client = Socket::createTcp(); + connectSendRead(ae, port, client); + Socket client2 = Socket::createTcp(); + connectSendRead(ae, port, client2); + client.close(); + client2.close(); + Socket client3 = Socket::createTcp(); + connectSendRead(ae, port, client3); + } }; // Make this test suite a plugin. diff --git a/cpp/tests/EventChannelThreadsTest.cpp b/cpp/tests/EventChannelThreadsTest.cpp index 285ed29518..f8b4ad6f4f 100644 --- a/cpp/tests/EventChannelThreadsTest.cpp +++ b/cpp/tests/EventChannelThreadsTest.cpp @@ -42,7 +42,7 @@ const int totalEvents = nConnections+2*nConnections*nMessages; * We count the total number of events, and the * number of reads and writes for each message number. */ -class TestResults : public Monitor { +class TestResults { public: TestResults() : isShutdown(false), nEventsRemaining(totalEvents) {} @@ -62,20 +62,21 @@ class TestResults : public Monitor { } void shutdown(const std::string& exceptionMsg = std::string()) { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); exception = exceptionMsg; isShutdown = true; - notifyAll(); + monitor.notifyAll(); } void wait() { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); Time deadline = now() + 10*TIME_SEC; while (!isShutdown) { - CPPUNIT_ASSERT(Monitor::wait(deadline)); + CPPUNIT_ASSERT(monitor.wait(deadline)); } } + Monitor monitor; bool isShutdown; std::string exception; AtomicCount reads[nMessages]; @@ -113,30 +114,34 @@ class SafeCallback { }; /** Repost an event N times. */ -class Repost { +template <class T> +class Reposter { public: - Repost(int n) : count (n) {} - virtual ~Repost() {} + Reposter(T* event_, int n) : event(event_), original(*event_), count (n) {} + virtual ~Reposter() {} - void repost(Event* event) { + void repost() { if (--count==0) { delete event; } else { - threads->postEvent(event); + *event = original; + threads->post(event); } } private: + T* event; + T original; int count; }; /** Repeating read event. */ -class TestReadEvent : public ReadEvent, public Runnable, private Repost { +class TestReadEvent : public ReadEvent, public Runnable { public: explicit TestReadEvent(int fd=-1) : ReadEvent(fd, &value, sizeof(value), SafeCallback(*this)), - Repost(nMessages) + reposter(this, nMessages) {} void run() { @@ -144,47 +149,50 @@ class TestReadEvent : public ReadEvent, public Runnable, private Repost { CPPUNIT_ASSERT(0 <= value); CPPUNIT_ASSERT(value < nMessages); results.countRead(value); - repost(this); + reposter.repost(); } private: int value; - ReadEvent original; + Reposter<ReadEvent> reposter; }; /** Fire and forget write event */ -class TestWriteEvent : public WriteEvent, public Runnable, private Repost { +class TestWriteEvent : public WriteEvent, public Runnable { public: TestWriteEvent(int fd=-1) : WriteEvent(fd, &value, sizeof(value), SafeCallback(*this)), - Repost(nMessages), + reposter(this, nMessages), value(0) {} void run() { CPPUNIT_ASSERT_EQUAL(sizeof(int), getSize()); results.countWrite(value++); - repost(this); + reposter.repost(); } private: + Reposter<WriteEvent> reposter; int value; }; /** Fire-and-forget Accept event, posts reads on the accepted connection. */ -class TestAcceptEvent : public AcceptEvent, public Runnable, private Repost { +class TestAcceptEvent : public AcceptEvent, public Runnable { public: TestAcceptEvent(int fd=-1) : AcceptEvent(fd, SafeCallback(*this)), - Repost(nConnections) + reposter(this, nConnections) {} void run() { - threads->postEvent(new TestReadEvent(getAcceptedDesscriptor())); + threads->post(new TestReadEvent(getAcceptedDesscriptor())); results.countEvent(); - repost(this); + reposter.repost(); } + private: + Reposter<AcceptEvent> reposter; }; class EventChannelThreadsTest : public CppUnit::TestCase @@ -207,10 +215,10 @@ class EventChannelThreadsTest : public CppUnit::TestCase { Socket listener = Socket::createTcp(); int port = listener.listen(); - + // Post looping accept events, will repost nConnections times. // The accept event will automatically post read events. - threads->postEvent(new TestAcceptEvent(listener.fd())); + threads->post(new TestAcceptEvent(listener.fd())); // Make connections. Socket connections[nConnections]; @@ -221,7 +229,7 @@ class EventChannelThreadsTest : public CppUnit::TestCase // Post looping write events. for (int i = 0; i < nConnections; ++i) { - threads->postEvent(new TestWriteEvent(connections[i].fd())); + threads->post(new TestWriteEvent(connections[i].fd())); } // Wait for all events to be dispatched. diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am index 900bf47960..943afde228 100644 --- a/cpp/tests/Makefile.am +++ b/cpp/tests/Makefile.am @@ -16,9 +16,10 @@ EXTRA_DIST = \ topicall \ topictest \ qpid_test_plugin.h \ - APRBaseTest.cpp + MockSessionHandler.h -client_tests = \ + +client_exe_tests = \ client_test \ echo_service \ topic_listener \ @@ -41,7 +42,8 @@ broker_tests = \ TxAckTest \ TxBufferTest \ TxPublishTest \ - ValueTest + ValueTest \ + AcceptorTest framing_tests = \ BodyHandlerTest \ @@ -54,17 +56,21 @@ misc_tests = \ posix_tests = \ EventChannelTest \ - EventChannelThreadsTest + EventChannelThreadsTest \ + EventChannelConnectionTest + +apr_tests = APRBaseTest.cpp unit_tests = \ $(broker_tests) \ $(framing_tests) \ - $(misc_tests) + $(misc_tests) \ + $(posix_tests) -noinst_PROGRAMS = $(client_tests) +noinst_PROGRAMS = $(client_exe_tests) -TESTS = run-unit-tests run-python-tests +TESTS = run-unit-tests run-system-tests EXTRA_DIST += $(TESTS) include gen.mk @@ -77,7 +83,7 @@ lib_broker = $(abs_builddir)/../lib/broker/libqpidbroker.la gen.mk: Makefile.am ( \ - for i in $(client_tests); do \ + for i in $(client_exe_tests); do \ echo $${i}_SOURCES = $$i.cpp; \ echo $${i}_LDADD = '$$(lib_client) $$(lib_common) $$(extra_libs)'; \ done; \ diff --git a/cpp/tests/run-python-tests b/cpp/tests/run-python-tests index 57be07ec1c..e69de29bb2 100755 --- a/cpp/tests/run-python-tests +++ b/cpp/tests/run-python-tests @@ -1,15 +0,0 @@ -#!/bin/sh - -set -e -log=`pwd`/qpidd.log -# Start the daemon, recording its PID. -../src/qpidd > $log 2>&1 & pid=$! - -# Arrange to kill the daemon upon any type of termination. -trap 'status=$?; kill $pid; exit $status' 0 -trap '(exit $?); exit $?' 1 2 13 15 - -# Run the tests. -cd ../../python && ./run-tests -v -I cpp_failing.txt - -rm -f $log diff --git a/cpp/tests/topic_publisher.cpp b/cpp/tests/topic_publisher.cpp index b95abd9d66..6d17b7034f 100644 --- a/cpp/tests/topic_publisher.cpp +++ b/cpp/tests/topic_publisher.cpp @@ -138,13 +138,15 @@ int main(int argc, char** argv){ int64_t sum(0); for(int i = 0; i < batchSize; i++){ if(i > 0 && args.getDelay()) sleep(args.getDelay()); - Time time = publisher.publish( - args.getMessages(), args.getSubscribers(), args.getSize()); - if(!max || time > max) max = time; - if(!min || time < min) min = time; - sum += time; + int64_t msecs = + publisher.publish(args.getMessages(), + args.getSubscribers(), + args.getSize()) / TIME_MSEC; + if(!max || msecs > max) max = msecs; + if(!min || msecs < min) min = msecs; + sum += msecs; std::cout << "Completed " << (i+1) << " of " << batchSize - << " in " << time/TIME_MSEC << "ms" << std::endl; + << " in " << msecs << "ms" << std::endl; } publisher.terminate(); int64_t avg = sum / batchSize; diff --git a/cpp/tests/topictest b/cpp/tests/topictest index 792f063bea..da3a0c1f92 100755 --- a/cpp/tests/topictest +++ b/cpp/tests/topictest @@ -1,42 +1,41 @@ #!/bin/bash -# Run the c++ or java topic test +# Run the c++ or topic test -. `dirname $0`/env - -# Edit parameters here: - -# Big test: -# LISTENERS=10 -# MESSAGES=10000 -# BATCHES=20 - -LISTENERS=10 +# Defaults +SUBSCRIBERS=10 MESSAGES=2000 BATCHES=10 -cppcmds() { - LISTEN_CMD=topic_listener - PUBLISH_CMD="topic_publisher -messages $MESSAGES -batches $BATCHES -subscribers $LISTENERS" -} +while getopts "s:m:b:" opt ; do + case $opt in + s) SUBSCRIBERS=$OPTARG ;; + m) MESSAGES=$OPTARG ;; + b) BATCHES=$OPTARG ;; + ?) + echo "Usage: %0 [-l <subscribers>] [-m <messages.] [-b <batches>]" + exit 1 + ;; + esac +done -javacmds() { - DEF=-Damqj.logging.level="error" - LISTEN_CMD="qpid-run $DEF org.apache.qpid.topic.Listener" - PUBLISH_CMD="qpid-run $DEF org.apache.qpid.topic.Publisher -messages $MESSAGES -batch $BATCHES -clients $LISTENERS" +subscribe() { + ID=$1 + echo "subscriber $ID" + ./topic_listener > subscriber.$ID 2>&1 || { + echo "SUBSCRIBER %ID FAILED: " ; + cat subscriber.$ID + } + rm subscriber.$ID } -case $1 in - c) cppcmds ;; - j) javacmds ;; - *) cppcmds ;; -esac +publish() { + ./topic_publisher -messages $MESSAGES -batches $BATCHES -subscribers $SUBSCRIBERS +} -for ((i=$LISTENERS ; i--; )); do - $LISTEN_CMD > /dev/null 2>&1 & +for ((i=$SUBSCRIBERS ; i--; )); do + subscribe $i & done sleep 1 -echo $PUBLISH_CMD $OPTIONS - STATS=~/bin/topictest.times echo "---- topictest `date`" >> $STATS -$PUBLISH_CMD $OPTIONS | tee -a $STATS +publish | tee -a $STATS |