From 013f077cade5451798b76c2912b12ec873b6177e Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 20 Dec 2006 17:54:05 +0000 Subject: Current state of event channel code. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/event-queue-2006-12-20@489159 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/configure.ac | 17 +- cpp/lib/broker/AutoDelete.cpp | 2 +- cpp/lib/broker/Broker.cpp | 2 +- cpp/lib/broker/BrokerChannel.cpp | 161 +++--- cpp/lib/broker/BrokerChannel.h | 4 +- cpp/lib/broker/BrokerMessage.cpp | 3 +- cpp/lib/broker/BrokerQueue.cpp | 3 +- cpp/lib/broker/TopicExchange.cpp | 5 +- cpp/lib/client/ResponseHandler.cpp | 19 +- cpp/lib/common/Exception.cpp | 21 +- cpp/lib/common/Exception.h | 64 ++- cpp/lib/common/ExceptionHolder.cpp | 32 -- cpp/lib/common/ExceptionHolder.h | 62 --- cpp/lib/common/Makefile.am | 32 +- cpp/lib/common/QpidError.cpp | 34 +- cpp/lib/common/QpidError.h | 36 +- cpp/lib/common/framing/AMQFrame.h | 4 +- cpp/lib/common/sys/Acceptor.h | 9 +- cpp/lib/common/sys/AtomicCount.h | 58 ++- cpp/lib/common/sys/Runnable.cpp | 4 + cpp/lib/common/sys/Runnable.h | 11 +- cpp/lib/common/sys/Socket.h | 5 +- cpp/lib/common/sys/Thread.h | 3 +- cpp/lib/common/sys/Time.h | 6 +- cpp/lib/common/sys/apr/APRAcceptor.cpp | 36 +- cpp/lib/common/sys/apr/LFProcessor.cpp | 2 - cpp/lib/common/sys/posix/EventChannel.cpp | 572 ++++++++++++++-------- cpp/lib/common/sys/posix/EventChannel.h | 201 +++++--- cpp/lib/common/sys/posix/EventChannelAcceptor.cpp | 4 +- cpp/lib/common/sys/posix/EventChannelThreads.cpp | 74 +-- cpp/lib/common/sys/posix/EventChannelThreads.h | 43 +- cpp/lib/common/sys/posix/PosixAcceptor.cpp | 48 -- cpp/lib/common/sys/posix/Socket.cpp | 11 +- cpp/lib/common/sys/posix/check.cpp | 4 +- cpp/lib/common/sys/posix/check.h | 22 +- cpp/tests/EventChannelTest.cpp | 109 +++-- cpp/tests/EventChannelThreadsTest.cpp | 56 ++- cpp/tests/Makefile.am | 22 +- cpp/tests/run-python-tests | 15 - cpp/tests/topic_publisher.cpp | 14 +- cpp/tests/topictest | 57 ++- 41 files changed, 1108 insertions(+), 779 deletions(-) diff --git a/cpp/configure.ac b/cpp/configure.ac index 0334b00fe9..b0947bd3b9 100644 --- a/cpp/configure.ac +++ b/cpp/configure.ac @@ -25,14 +25,6 @@ AC_USE_SYSTEM_EXTENSIONS AM_MISSING_PROG([HELP2MAN], [help2man]) -AC_ARG_ENABLE(warnings, -[ --enable-warnings turn on lots of compiler warnings (recommended)], -[case "${enableval}" in - yes|no) ;; - *) AC_MSG_ERROR([bad value ${enableval} for warnings option]) ;; - esac], - [enableval=yes]) - # Turn on this automake conditional if we are in a qpid # hierarchy (i.e. with gentools/ and specs/ sibling directories), # and if we have working java + javac. @@ -53,7 +45,12 @@ AM_CONDITIONAL([CAN_GENERATE_CODE], [test x$build = xyes]) # -Wunreachable-code -Wpadded -Winline # -Wshadow - warns about boost headers. -if test "${enableval}" = yes; then +AC_ARG_ENABLE(warnings, + [AS_HELP_STRING([--disable-warnings], + [Disable compiler warnings (default enabled)])], + [], [enable_warnings=yes]) + +if test "${enable_warnings}" = yes; then gl_COMPILER_FLAGS(-Werror) gl_COMPILER_FLAGS(-pedantic) gl_COMPILER_FLAGS(-Wall) @@ -106,7 +103,7 @@ AC_ARG_ENABLE(apr, yes|no) ;; *) AC_MSG_ERROR([invalid APR enable/disable value: $enable_APR]) ;; esac], -[enable_APR=yes]) +[enable_APR=no]) APR_MINIMUM_VERSION=1.2.2 AC_SUBST(APR_MINIMUM_VERSION) diff --git a/cpp/lib/broker/AutoDelete.cpp b/cpp/lib/broker/AutoDelete.cpp index ae48d10505..6d87c982b4 100644 --- a/cpp/lib/broker/AutoDelete.cpp +++ b/cpp/lib/broker/AutoDelete.cpp @@ -63,7 +63,7 @@ void AutoDelete::run(){ Monitor::ScopedLock l(monitor); while(!stopped){ process(); - monitor.wait(period*TIME_MSEC); + monitor.wait(now() + period*TIME_MSEC); } } diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp index 6c0d7a3f3f..e2265a292f 100644 --- a/cpp/lib/broker/Broker.cpp +++ b/cpp/lib/broker/Broker.cpp @@ -47,7 +47,7 @@ Broker::shared_ptr Broker::create(const Configuration& config) { } void Broker::run() { - acceptor->run(&factory); + acceptor->run(factory); } void Broker::shutdown() { diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 5d4f68a8af..979617b594 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -18,11 +18,15 @@ * under the License. * */ -#include -#include +#include + #include #include -#include + +#include + +#include "BrokerChannel.h" +#include "QpidError.h" using std::mem_fun_ref; using std::bind2nd; @@ -50,11 +54,18 @@ Channel::~Channel(){ } bool Channel::exists(const string& consumerTag){ + Mutex::ScopedLock l(lock); return consumers.find(consumerTag) != consumers.end(); } -void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*){ - if(tag.empty()) tag = tagGenerator.generate(); +void Channel::consume( + string& tag, Queue::shared_ptr queue, bool acks, + bool exclusive, ConnectionToken* const connection, const FieldTable*) +{ + Mutex::ScopedLock l(lock); + if(tag.empty()) tag = tagGenerator.generate(); + // TODO aconway 2006-12-13: enforce ownership of consumer + // with auto_ptr. ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); try{ queue->consume(c, exclusive);//may throw exception @@ -65,7 +76,8 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool excl } } -void Channel::cancel(consumer_iterator i){ +void Channel::cancel(consumer_iterator i) { + // Private, must be called with lock held. ConsumerImpl* c = i->second; consumers.erase(i); if(c){ @@ -75,6 +87,7 @@ void Channel::cancel(consumer_iterator i){ } void Channel::cancel(const string& tag){ + Mutex::ScopedLock l(lock); consumer_iterator i = consumers.find(tag); if(i != consumers.end()){ cancel(i); @@ -82,11 +95,14 @@ void Channel::cancel(const string& tag){ } void Channel::close(){ - //cancel all consumers - for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ - cancel(i); + { + Mutex::ScopedLock l(lock); + while(!consumers.empty()) { + cancel(consumers.begin()); + } } - //requeue: + // TODO aconway 2006-12-13: does recovery need to be atomic with + // cancelling all consumers? recover(true); } @@ -109,20 +125,21 @@ void Channel::rollback(){ } void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ - Mutex::ScopedLock locker(deliveryLock); - - u_int64_t deliveryTag = currentDeliveryTag++; - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); - outstanding.size += msg->contentSize(); - outstanding.count++; + u_int64_t deliveryTag; + { + Mutex::ScopedLock l(lock); + deliveryTag = currentDeliveryTag++; + if(ackExpected){ + unacked.push_back( + DeliveryRecord(msg, queue, consumerTag, deliveryTag)); + outstanding.size += msg->contentSize(); + outstanding.count++; + } } - //send deliver method, header and content(s) msg->deliver(out, id, consumerTag, deliveryTag, framesize); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ - Mutex::ScopedLock locker(deliveryLock); bool countOk = !prefetchCount || prefetchCount > unacked.size(); bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); return countOk && sizeOk; @@ -187,71 +204,99 @@ void Channel::complete(Message::shared_ptr& msg){ } exchange.reset(); }else{ - std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl; + std::cout << "Exchange not known in" << BOOST_CURRENT_FUNCTION + << std::endl; } } -void Channel::ack(u_int64_t deliveryTag, bool multiple){ +void Channel::ack(u_int64_t deliveryTag, bool multiple) { if(transactional){ + Mutex::ScopedLock locker(lock); accumulatedAck.update(deliveryTag, multiple); - //TODO: I think the outstanding prefetch size & count should be updated at this point... + //TODO: I think the outstanding prefetch size & count should + //be updated at this point... //TODO: ...this may then necessitate dispatching to consumers - }else{ - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag)); - if(i == unacked.end()){ - throw InvalidAckException(); - }else if(multiple){ - ack_iterator end = ++i; - for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard)); - unacked.erase(unacked.begin(), end); - - //recalculate the prefetch: - outstanding.reset(); - for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); - }else{ - i->discard(); - i->subtractFrom(&outstanding); - unacked.erase(i); - } + } + else { + { + Mutex::ScopedLock locker(lock); + ack_iterator i = find_if( + unacked.begin(), unacked.end(), + boost::bind(&DeliveryRecord::matches, _1, deliveryTag)); + if(i == unacked.end()) { + throw InvalidAckException(); + } + else if(multiple) { + ack_iterator end = ++i; + for_each(unacked.begin(), end, + mem_fun_ref(&DeliveryRecord::discard)); + unacked.erase(unacked.begin(), end); + //recalculate the prefetch: + outstanding.reset(); + for_each( + unacked.begin(), unacked.end(), + boost::bind(&DeliveryRecord::addTo, _1, &outstanding)); + } + else { + i->discard(); + i->subtractFrom(&outstanding); + unacked.erase(i); + } + } //if the prefetch limit had previously been reached, there may //be messages that can be now be delivered - for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ + + // TODO aconway 2006-12-13: Does this need to be atomic? + // If so we need a redesign, requestDispatch re-enters + // Channel::dispatch. + // + for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ j->second->requestDispatch(); } } } -void Channel::recover(bool requeue){ - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - if(requeue){ - outstanding.reset(); - std::list copy = unacked; - unacked.clear(); - for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); - }else{ - for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); +void Channel::recover(bool requeue) { + std::list copyUnacked; + boost::function1 recoverFn; + { + Mutex::ScopedLock l(lock); + if(requeue) { + outstanding.reset(); + copyUnacked.swap(unacked); + recoverFn = boost::bind(&DeliveryRecord::requeue, _1); + } + else { + copyUnacked = unacked; + recoverFn = boost::bind(&DeliveryRecord::redeliver, _1, this); + } } + // TODO aconway 2006-12-13: Does recovery of copyUnacked have to + // be atomic with extracting the list? + for_each(copyUnacked.begin(), copyUnacked.end(), recoverFn); } bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ + Mutex::ScopedLock l(lock); + // TODO aconway 2006-12-13: Nasty to have all these external calls + // inside a critical.section but none appear to have blocking potential. + // sendGetOk does non-blocking IO + // Message::shared_ptr msg = queue->dequeue(); - if(msg){ - Mutex::ScopedLock locker(deliveryLock); + if(msg) { u_int64_t myDeliveryTag = currentDeliveryTag++; - msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); + u_int32_t count = queue->getMessageCount(); + msg->sendGetOk(out, id, count + 1, myDeliveryTag, framesize); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } return true; - }else{ - return false; } + return false; } -void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){ +void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, + u_int64_t deliveryTag){ msg->deliver(out, id, consumerTag, deliveryTag, framesize); } diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index fa3912c78e..776862dd34 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -79,10 +79,10 @@ namespace qpid { u_int32_t prefetchSize; u_int16_t prefetchCount; Prefetch outstanding; - u_int32_t framesize; + const u_int32_t framesize; NameGenerator tagGenerator; std::list unacked; - qpid::sys::Mutex deliveryLock; + qpid::sys::Mutex lock; TxBuffer txBuffer; AccumulatedAck accumulatedAck; MessageStore* const store; diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 598de2d590..148dbefa78 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -143,7 +143,8 @@ void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) { u_int64_t expected = expectedContentSize(); if (expected != buffer.available()) { - std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl; + std::cout << "WARN: Expected " << expectedContentSize() + << " bytes, got " << buffer.available() << std::endl; throw Exception("Cannot decode content, buffer not large enough."); } diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp index 0e48d3b13d..deb7c38824 100644 --- a/cpp/lib/broker/BrokerQueue.cpp +++ b/cpp/lib/broker/BrokerQueue.cpp @@ -83,7 +83,8 @@ bool Queue::dispatch(Message::shared_ptr& msg){ return false; }else if(exclusive){ if(!exclusive->deliver(msg)){ - std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl; + std::cout << "WARNING: Dropping undeliverable message " + << "from queue with exclusive consumer." << std::endl; } return true; }else{ diff --git a/cpp/lib/broker/TopicExchange.cpp b/cpp/lib/broker/TopicExchange.cpp index 3ebb3c8c56..73a633859b 100644 --- a/cpp/lib/broker/TopicExchange.cpp +++ b/cpp/lib/broker/TopicExchange.cpp @@ -81,10 +81,7 @@ void TopicPattern::normalize() { namespace { -// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string. -// Need StringRef class that operates on a string in place witout copy. -// Should be applied everywhere strings are extracted from frames. -// + bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) { // Invariant: [pattern_begin..p) matches [target_begin..t) diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp index ac8b4a9ced..244831effe 100644 --- a/cpp/lib/client/ResponseHandler.cpp +++ b/cpp/lib/client/ResponseHandler.cpp @@ -28,32 +28,33 @@ qpid::client::ResponseHandler::ResponseHandler() : waiting(false){} qpid::client::ResponseHandler::~ResponseHandler(){} -bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){ +bool qpid::client::ResponseHandler::validate( + const qpid::framing::AMQMethodBody& expected) +{ return expected.match(response.get()); } void qpid::client::ResponseHandler::waitForResponse(){ Monitor::ScopedLock l(monitor); - if(waiting){ + while(waiting) monitor.wait(); - } } -void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){ - response = _response; +void qpid::client::ResponseHandler::signalResponse( + qpid::framing::AMQMethodBody::shared_ptr _response) +{ Monitor::ScopedLock l(monitor); + response = _response; waiting = false; monitor.notify(); } void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){ Monitor::ScopedLock l(monitor); - if(waiting){ + while(waiting) monitor.wait(); - } - if(!validate(expected)){ + if(!validate(expected)) THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error"); - } } void qpid::client::ResponseHandler::expect(){ diff --git a/cpp/lib/common/Exception.cpp b/cpp/lib/common/Exception.cpp index 0161518011..ef88c5cb74 100644 --- a/cpp/lib/common/Exception.cpp +++ b/cpp/lib/common/Exception.cpp @@ -20,6 +20,7 @@ */ #include +#include namespace qpid { @@ -29,14 +30,32 @@ Exception::Exception(const std::string& str) throw() : whatStr(str) {} Exception::Exception(const char* str) throw() : whatStr(str) {} +Exception::Exception(const std::exception& e) throw() : whatStr(e.what()) {} + Exception::~Exception() throw() {} const char* Exception::what() const throw() { return whatStr.c_str(); } std::string Exception::toString() const throw() { return whatStr; } -Exception* Exception::clone() const throw() { return new Exception(*this); } +Exception::auto_ptr Exception::clone() const throw() { + return Exception::auto_ptr(new Exception(*this)); +} void Exception::throwSelf() const { throw *this; } +const char* Exception::defaultMessage = "Unexpected exception"; + +void Exception::log(const char* what, const char* message) { + std::cout << message << ": " << what << std::endl; +} + +void Exception::log(const std::exception& e, const char* message) { + log(e.what(), message); +} + +void Exception::logUnknown(const char* message) { + log("unknown exception.", message); +} + } // namespace qpid diff --git a/cpp/lib/common/Exception.h b/cpp/lib/common/Exception.h index f35d427bb0..185c395283 100644 --- a/cpp/lib/common/Exception.h +++ b/cpp/lib/common/Exception.h @@ -26,6 +26,7 @@ #include #include #include +#include namespace qpid { @@ -38,6 +39,10 @@ class Exception : public std::exception std::string whatStr; public: + typedef boost::shared_ptr shared_ptr; + typedef boost::shared_ptr shared_ptr_const; + typedef std::auto_ptr auto_ptr; + Exception() throw(); Exception(const std::string& str) throw(); Exception(const char* str) throw(); @@ -48,14 +53,65 @@ class Exception : public std::exception virtual const char* what() const throw(); virtual std::string toString() const throw(); - virtual Exception* clone() const throw(); + virtual std::auto_ptr clone() const throw(); virtual void throwSelf() const; - typedef boost::shared_ptr shared_ptr; -}; + /** Default message: "Unknown exception" or something like it. */ + static const char* defaultMessage; + /** + * Log a message of the form "message: what" + *@param what Exception's what() message. + *@param message Prefix message. + */ + static void log(const char* what, const char* message = defaultMessage); -} + /** + * Log an exception. + *@param e Exception to log. + + */ + static void log( + const std::exception& e, const char* message = defaultMessage); + + + /** + * Log an unknown exception - use in catch(...) + *@param message Prefix message. + */ + static void logUnknown(const char* message = defaultMessage); + + /** + * Wrapper template function to call another function inside + * try/catch and log any exception. Use boost::bind to wrap + * member function calls or functions with arguments. + * + *@param f Function to call in try block. + *@param retrhow If true the exception is rethrown. + *@param message Prefix message. + */ + template + static T tryCatchLog(boost::function0 f, bool rethrow=true, + const char* message=defaultMessage) + { + try { + return f(); + } + catch (const std::exception& e) { + log(e, message); + if (rethrow) + throw; + } + catch (...) { + logUnknown(message); + if (rethrow) + throw; + } + } + +}; + +} // namespace qpid #endif /*!_Exception_*/ diff --git a/cpp/lib/common/ExceptionHolder.cpp b/cpp/lib/common/ExceptionHolder.cpp index de8d7b2487..e69de29bb2 100644 --- a/cpp/lib/common/ExceptionHolder.cpp +++ b/cpp/lib/common/ExceptionHolder.cpp @@ -1,32 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "ExceptionHolder.h" - -namespace qpid { - -ExceptionHolder::ExceptionHolder(const std::exception& e) { - const Exception* ex = dynamic_cast(&e); - if (ex) { - reset(ex->clone()); - } else { - reset(new Exception(e.what())); - } -} - -} diff --git a/cpp/lib/common/ExceptionHolder.h b/cpp/lib/common/ExceptionHolder.h index 83d0884be9..e69de29bb2 100644 --- a/cpp/lib/common/ExceptionHolder.h +++ b/cpp/lib/common/ExceptionHolder.h @@ -1,62 +0,0 @@ -#ifndef _qpid_ExceptionHolder_h -#define _qpid_ExceptionHolder_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include -#include - -namespace qpid { - -/** - * Holder for a heap-allocated exc eption that can be stack allocated - * and thrown safely. - * - * Basically this is a shared_ptr with the Exception functions added - * so the catcher need not be aware that it is a pointer rather than a - * reference. - * - * shared_ptr is chosen over auto_ptr because it has normal - * copy semantics. - */ -class ExceptionHolder : public Exception, public boost::shared_ptr -{ - public: - typedef boost::shared_ptr shared_ptr; - - ExceptionHolder() throw() {} - ExceptionHolder(Exception* p) throw() : shared_ptr(p) {} - ExceptionHolder(shared_ptr p) throw() : shared_ptr(p) {} - - ExceptionHolder(const Exception& e) throw() : shared_ptr(e.clone()) {} - ExceptionHolder(const std::exception& e); - - ~ExceptionHolder() throw() {} - - const char* what() const throw() { return (*this)->what(); } - std::string toString() const throw() { return (*this)->toString(); } - virtual Exception* clone() const throw() { return (*this)->clone(); } - virtual void throwSelf() const { (*this)->throwSelf(); } -}; - -} // namespace qpid - - - -#endif /*!_qpid_ExceptionHolder_h*/ diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index e1f7503282..997558f3da 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -24,21 +24,23 @@ apr_hdr = \ $(apr)/LFSessionContext.h posix = sys/posix -posix_src = \ - $(posix)/PosixAcceptor.cpp \ - $(posix)/Socket.cpp \ - $(posix)/Thread.cpp \ - $(posix)/check.cpp \ - $(posix)/EventChannel.cpp \ - $(posix)/EventChannelThreads.cpp -posix_hdr = \ - $(posix)/check.h \ - $(posix)/EventChannel.h \ - $(posix)/EventChannelThreads.h +posix_src = \ + $(posix)/EventChannelAcceptor.cpp \ + $(posix)/Socket.cpp \ + $(posix)/Thread.cpp \ + $(posix)/check.cpp \ + $(posix)/EventChannel.cpp \ + $(posix)/EventChannelThreads.cpp \ + $(posix)/EventChannelConnection.cpp +posix_hdr = \ + $(posix)/check.h \ + $(posix)/EventChannel.h \ + $(posix)/EventChannelThreads.h \ + $(posix)/EventChannelConnection.h -EXTRA_DIST=$(posix_src) $(posix_hdr) -platform_src = $(apr_src) -platform_hdr = $(apr_hdr) +EXTRA_DIST=$(apr_src) $(apr_hdr) +platform_src = $(posix_src) +platform_hdr = $(posix_hdr) framing = framing gen = $(srcdir)/../../gen @@ -76,7 +78,6 @@ libqpidcommon_la_SOURCES = \ $(gen)/AMQP_MethodVersionMap.cpp \ $(gen)/AMQP_ServerProxy.cpp \ Exception.cpp \ - ExceptionHolder.cpp \ QpidError.cpp \ sys/Runnable.cpp \ sys/Time.cpp @@ -107,7 +108,6 @@ nobase_pkginclude_HEADERS = \ $(framing)/amqp_types.h \ $(framing)/AMQP_HighestVersion.h \ Exception.h \ - ExceptionHolder.h \ QpidError.h \ SharedObject.h \ sys/Acceptor.h \ diff --git a/cpp/lib/common/QpidError.cpp b/cpp/lib/common/QpidError.cpp index 7f4f9e2f34..9cbd66c841 100644 --- a/cpp/lib/common/QpidError.cpp +++ b/cpp/lib/common/QpidError.cpp @@ -22,23 +22,41 @@ #include #include -using namespace qpid; +namespace qpid { QpidError::QpidError() : code(0) {} -QpidError::QpidError(int _code, const std::string& _msg, - const SrcLine& _loc) throw() +QpidError::QpidError( + int _code, const std::string& _msg, Location _loc) throw() : code(_code), msg(_msg), location(_loc) { - std::ostringstream os; - os << "Error [" << code << "] " << msg << " (" - << location.file << ":" << location.line << ")"; - whatStr = os.str(); + setWhat(); +} + +QpidError::QpidError( + int _code, const char* _msg, Location _loc) throw() + : code(_code), msg(_msg), location(_loc) +{ + setWhat(); } QpidError::~QpidError() throw() {} -Exception* QpidError::clone() const throw() { return new QpidError(*this); } +Exception::auto_ptr QpidError::clone() const throw() { + return Exception::auto_ptr(new QpidError(*this)); +} void QpidError::throwSelf() const { throw *this; } +void QpidError::setWhat() { + std::ostringstream os; + os << "Error [" << code << "] " << msg; + if (location.file) { + os << " (" ; + os << location.file << ":" << location.line; + os << ")"; + } + whatStr = os.str(); +} + +} // namespace qpid diff --git a/cpp/lib/common/QpidError.h b/cpp/lib/common/QpidError.h index 30d9d27076..9a47aa5e00 100644 --- a/cpp/lib/common/QpidError.h +++ b/cpp/lib/common/QpidError.h @@ -24,37 +24,45 @@ #include #include #include +#include namespace qpid { -struct SrcLine { - public: - SrcLine(const std::string& file_="", int line_=0) : - file(file_), line(line_) {} - - std::string file; - int line; -}; - class QpidError : public Exception { public: + // Use macro QPID_LOCATION to construct a location. + struct Location { + Location(const char* function_=0, const char* file_=0, int line_=0) : + function(function_), file(file_), line(line_) {} + const char* function; + const char* file; + int line; + }; + const int code; const std::string msg; - const SrcLine location; + const Location location; QpidError(); - QpidError(int _code, const std::string& _msg, const SrcLine& _loc) throw(); + QpidError(int _code, const char* _msg, const Location _loc) throw(); + QpidError(int _code, const std::string& _msg, const Location _loc) throw(); + ~QpidError() throw(); - Exception* clone() const throw(); + Exception::auto_ptr clone() const throw(); void throwSelf() const; + + private: + void setWhat(); }; } // namespace qpid -#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__) +#define QPID_ERROR_LOCATION \ + ::qpid::QpidError::Location(BOOST_CURRENT_FUNCTION, __FILE__, __LINE__) -#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE) +#define QPID_ERROR(CODE, MESSAGE) \ + ::qpid::QpidError((CODE), (MESSAGE), QPID_ERROR_LOCATION) #define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE) diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h index bec1946fb7..1ff3ff191f 100644 --- a/cpp/lib/common/framing/AMQFrame.h +++ b/cpp/lib/common/framing/AMQFrame.h @@ -40,9 +40,9 @@ namespace qpid { static AMQP_MethodVersionMap versionMap; u_int16_t channel; - u_int8_t type;//used if the body is decoded separately from the 'head' + u_int8_t type;//used if body decoded separately from 'head' AMQBody::shared_ptr body; - AMQBody::shared_ptr createMethodBody(Buffer& buffer); + AMQBody::shared_ptr createMethodBody(Buffer& buffer); public: AMQFrame(); diff --git a/cpp/lib/common/sys/Acceptor.h b/cpp/lib/common/sys/Acceptor.h index e6bc27a593..7aed068dd0 100644 --- a/cpp/lib/common/sys/Acceptor.h +++ b/cpp/lib/common/sys/Acceptor.h @@ -33,10 +33,11 @@ class SessionHandlerFactory; class Acceptor : public qpid::SharedObject { public: - static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false); - virtual ~Acceptor() = 0; - virtual int16_t getPort() const = 0; - virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0; + static Acceptor::shared_ptr create( + int16_t port, int backlog, int threads, bool trace = false); + virtual ~Acceptor(); + virtual int getPort() const = 0; + virtual void run(SessionHandlerFactory& factory) = 0; virtual void shutdown() = 0; }; diff --git a/cpp/lib/common/sys/AtomicCount.h b/cpp/lib/common/sys/AtomicCount.h index b625b2c9b0..7a9555480f 100644 --- a/cpp/lib/common/sys/AtomicCount.h +++ b/cpp/lib/common/sys/AtomicCount.h @@ -21,36 +21,52 @@ #include #include +#include namespace qpid { namespace sys { /** - * Atomic counter. + * Increment counter in constructor and decrement in destructor. + * Optionally call a function if the decremented counter value is 0. + * Note the function must not throw, it is called in the destructor. */ -class AtomicCount : boost::noncopyable { +template +class ScopedIncrement : boost::noncopyable { public: - class ScopedDecrement : boost::noncopyable { - public: - /** Decrement counter in constructor and increment in destructor. */ - ScopedDecrement(AtomicCount& c) : count(c) { value = --count; } - ~ScopedDecrement() { ++count; } - /** Return the value returned by the decrement. */ - operator long() { return value; } - private: - AtomicCount& count; - long value; - }; + ScopedIncrement(Count& c, boost::function0 f=0) + : count(c), callback(f) { ++count; } + ~ScopedIncrement() { if (--count == 0 && callback) callback(); } - class ScopedIncrement : boost::noncopyable { - public: - /** Increment counter in constructor and increment in destructor. */ - ScopedIncrement(AtomicCount& c) : count(c) { ++count; } - ~ScopedIncrement() { --count; } - private: - AtomicCount& count; - }; + private: + Count& count; + boost::function0 callback; +}; +/** Decrement counter in constructor and increment in destructor. */ +template +class ScopedDecrement : boost::noncopyable { + public: + ScopedDecrement(Count& c) : count(c) { value = --count; } + ~ScopedDecrement() { ++count; } + + /** Return the value after the decrement. */ + operator long() { return value; } + + private: + Count& count; + long value; +}; + + +/** + * Atomic counter. + */ +class AtomicCount : boost::noncopyable { + public: + typedef ScopedIncrement ScopedIncrement; + typedef ScopedDecrement ScopedDecrement; + AtomicCount(long value = 0) : count(value) {} void operator++() { ++count ; } diff --git a/cpp/lib/common/sys/Runnable.cpp b/cpp/lib/common/sys/Runnable.cpp index 30122c682f..5d4f48a373 100644 --- a/cpp/lib/common/sys/Runnable.cpp +++ b/cpp/lib/common/sys/Runnable.cpp @@ -29,4 +29,8 @@ Runnable::Functor Runnable::functor() return boost::bind(&Runnable::run, this); } +void FunctorRunnable::run() { + f(); +} + }} diff --git a/cpp/lib/common/sys/Runnable.h b/cpp/lib/common/sys/Runnable.h index fb3927c612..ef18897b09 100644 --- a/cpp/lib/common/sys/Runnable.h +++ b/cpp/lib/common/sys/Runnable.h @@ -44,7 +44,16 @@ class Runnable Functor functor(); }; -}} +/** Runnable wrapper for a functor */ +class FunctorRunnable : public Runnable { + public: + explicit FunctorRunnable(const Runnable::Functor& runMe) : f(runMe) {} + void run(); + private: + Runnable::Functor f; +}; + +}} // namespace qpid::sys #endif diff --git a/cpp/lib/common/sys/Socket.h b/cpp/lib/common/sys/Socket.h index d793a240c6..e35ed5b07c 100644 --- a/cpp/lib/common/sys/Socket.h +++ b/cpp/lib/common/sys/Socket.h @@ -70,8 +70,11 @@ class Socket */ int listen(int port = 0, int backlog = 10); + /** Accept a connection. This socket must be listening */ + Socket accept(); + /** Get file descriptor */ - int fd(); + int fd() const; private: #ifdef USE_APR diff --git a/cpp/lib/common/sys/Thread.h b/cpp/lib/common/sys/Thread.h index 47b95b6234..9647dc2414 100644 --- a/cpp/lib/common/sys/Thread.h +++ b/cpp/lib/common/sys/Thread.h @@ -116,7 +116,8 @@ Thread::Thread(Runnable& runnable) { } void Thread::join(){ - QPID_POSIX_THROW_IF(pthread_join(thread, 0)); + if (thread != 0) + QPID_POSIX_THROW_IF(pthread_join(thread, 0)); } long Thread::id() { diff --git a/cpp/lib/common/sys/Time.h b/cpp/lib/common/sys/Time.h index 3dd46741d8..4c6951b429 100644 --- a/cpp/lib/common/sys/Time.h +++ b/cpp/lib/common/sys/Time.h @@ -22,6 +22,7 @@ * */ +#include #include #ifdef USE_APR @@ -33,7 +34,7 @@ namespace qpid { namespace sys { -/** Time in nanoseconds */ +/** Time in nanoseconds. */ typedef int64_t Time; Time now(); @@ -47,6 +48,9 @@ const Time TIME_USEC = 1000; /** Nanoseconds per nanosecond. */ const Time TIME_NSEC = 1; +/** Value to represent an infinite timeout */ +const Time TIME_INFINITE = std::numeric_limits