diff options
author | Alan Conway <aconway@apache.org> | 2006-11-29 14:36:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-11-29 14:36:08 +0000 |
commit | b13e1a24fcca8797b7be5a242f164afbe17ec4f6 (patch) | |
tree | ef0362e52c125bc75b07ef3e374dabfa52254e98 | |
parent | 16d818e749462daf5e0e43079b2e48991646c619 (diff) | |
download | qpid-python-b13e1a24fcca8797b7be5a242f164afbe17ec4f6.tar.gz |
Posix EventChannel implementation using epoll. Placeholder for kevents.
Dynamic thread pool EventChannelThreads to serve EventChannel.
Misc cleanup/enhancements.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480582 13f79535-47bb-0310-9956-ffa450edef68
43 files changed, 1903 insertions, 661 deletions
diff --git a/cpp/Makefile b/cpp/Makefile index dd7551a648..8f7b6795b2 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -72,6 +72,7 @@ $(BUILDDIRS): ## Library rules +## DONT COMMIT LIB_common := $(call LIBFILE,common,1.0) DIRS_common := qpid qpid/framing qpid/sys qpid/$(PLATFORM) $(LIB_common): $(call OBJECTS, $(DIRS_common)) @@ -79,13 +80,13 @@ $(LIB_common): $(call OBJECTS, $(DIRS_common)) LIB_client := $(call LIBFILE,client,1.0) DIRS_client := qpid/client -$(LIB_client): $(call OBJECTS,$(DIRS_client)) $(LIB_common) - $(LIB_COMMAND) +$(LIB_client): $(call OBJECTS,$(DIRS_client)) + $(LIB_COMMAND) $(LIB_common) LIB_broker := $(call LIBFILE,broker,1.0) DIRS_broker := qpid/broker -$(LIB_broker): $(call OBJECTS,$(DIRS_broker)) $(LIB_common) - $(LIB_COMMAND) +$(LIB_broker): $(call OBJECTS,$(DIRS_broker)) + $(LIB_COMMAND) $(LIB_common) ## Daemon executable $(BINDIR)/qpidd: $(OBJDIR)/qpidd.o $(LIB_common) $(LIB_broker) @@ -94,12 +95,20 @@ $(BINDIR)/qpidd: $(OBJDIR)/qpidd.o $(LIB_common) $(LIB_broker) all-nogen: $(BINDIR)/qpidd ## Unit tests. -UNITTEST_SRC:=$(shell find test/unit -name *Test.cpp) -UNITTEST_SRC:=$(filter-out test/unit/qpid/$(IGNORE)/%,$(UNITTEST_SRC)) -UNITTESTS:=$(UNITTEST_SRC:test/unit/%.cpp=$(TESTDIR)/%.so) +define UNITTEST_GROUP +UNITTEST_SRC_$1 := $(wildcard $(DIRS_$1:%=test/unit/%/*Test.cpp)) +UNITTEST_SO_$1 := $$(UNITTEST_SRC_$1:test/unit/%.cpp=$(TESTDIR)/%.so) +$$(UNITTEST_SO_$1): $(LIB_$1) +UNITTESTS := $$(UNITTESTS) $$(UNITTEST_SO_$1) +endef -unittest: all +$(eval $(call UNITTEST_GROUP,common)) +$(eval $(call UNITTEST_GROUP,broker)) +$(eval $(call UNITTEST_GROUP,client)) + +unittest: $(UNITTESTS) DllPlugInTester -c -b $(UNITTESTS:.cpp=.so) + all-nogen: $(UNITTESTS) ## Run python tests @@ -143,7 +152,7 @@ all-nogen: $(CLIENT_TEST_EXE) client: $(CLIENT_TEST_EXE) ## include dependencies -DEPFILES:=$(wildcard $(OBJDIR)/*.d $(OBJDIR)/*/*.d $(OBJDIR)/*/*/*.d) +DEPFILES:=$(shell find $(OBJDIR) $(TESTDIR) -name "*.d") ifdef DEPFILES -include $(DEPFILES) endif diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp index 1263ba8472..44aad4cb4f 100644 --- a/cpp/src/qpid/Exception.cpp +++ b/cpp/src/qpid/Exception.cpp @@ -21,4 +21,22 @@ #include <qpid/Exception.h> -qpid::Exception::~Exception() throw() {} +namespace qpid { + +Exception::Exception() throw() {} + +Exception::Exception(const std::string& str) throw() : whatStr(str) {} + +Exception::Exception(const char* str) throw() : whatStr(str) {} + +Exception::~Exception() throw() {} + +const char* Exception::what() const throw() { return whatStr.c_str(); } + +std::string Exception::toString() const throw() { return whatStr; } + +Exception* Exception::clone() const throw() { return new Exception(*this); } + +void Exception::throwSelf() const { throw *this; } + +} // namespace qpid diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h index 2aba43586d..f35d427bb0 100644 --- a/cpp/src/qpid/Exception.h +++ b/cpp/src/qpid/Exception.h @@ -25,6 +25,7 @@ #include <exception> #include <string> #include <memory> +#include <boost/shared_ptr.hpp> namespace qpid { @@ -37,42 +38,24 @@ class Exception : public std::exception std::string whatStr; public: - Exception() throw() {} - Exception(const std::string& str) throw() : whatStr(str) {} - Exception(const char* str) throw() : whatStr(str) {} + Exception() throw(); + Exception(const std::string& str) throw(); + Exception(const char* str) throw(); + Exception(const std::exception&) throw(); + virtual ~Exception() throw(); - virtual const char* what() const throw() { return whatStr.c_str(); } - virtual std::string toString() const throw() { return whatStr; } + virtual const char* what() const throw(); + virtual std::string toString() const throw(); + + virtual Exception* clone() const throw(); + virtual void throwSelf() const; + + typedef boost::shared_ptr<Exception> shared_ptr; }; -/** - * Wrapper for heap-allocated exceptions. Use like this: - * <code> - * std::auto_ptr<Exception> ex = new SomeEx(...) - * HeapException hex(ex); // Takes ownership - * throw hex; // Auto-deletes ex - * </code> - */ -class HeapException : public Exception, public std::auto_ptr<Exception> -{ - public: - HeapException() {} - HeapException(std::auto_ptr<Exception> e) : std::auto_ptr<Exception>(e) {} - HeapException& operator=(std::auto_ptr<Exception>& e) { - std::auto_ptr<Exception>::operator=(e); - return *this; - } - ~HeapException() throw() {} - - virtual const char* what() const throw() { return (*this)->what(); } - virtual std::string toString() const throw() { - return (*this)->toString(); - } -}; - } #endif /*!_Exception_*/ diff --git a/cpp/src/qpid/ExceptionHolder.cpp b/cpp/src/qpid/ExceptionHolder.cpp new file mode 100644 index 0000000000..de8d7b2487 --- /dev/null +++ b/cpp/src/qpid/ExceptionHolder.cpp @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "ExceptionHolder.h" + +namespace qpid { + +ExceptionHolder::ExceptionHolder(const std::exception& e) { + const Exception* ex = dynamic_cast<const Exception*>(&e); + if (ex) { + reset(ex->clone()); + } else { + reset(new Exception(e.what())); + } +} + +} diff --git a/cpp/src/qpid/ExceptionHolder.h b/cpp/src/qpid/ExceptionHolder.h new file mode 100644 index 0000000000..c2deca803e --- /dev/null +++ b/cpp/src/qpid/ExceptionHolder.h @@ -0,0 +1,60 @@ +#ifndef _qpid_ExceptionHolder_h +#define _qpid_ExceptionHolder_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <qpid/Exception.h> +#include <boost/shared_ptr.hpp> + +namespace qpid { + +/** + * Holder for a heap-allocated exc eption that can be stack allocated + * and thrown safely. + * + * Basically this is a shared_ptr with the Exception functions added + * so the catcher need not be aware that it is a pointer rather than a + * reference. + * + * shared_ptr is chosen over auto_ptr because it has normal + * copy semantics. + */ +class ExceptionHolder : public Exception, public boost::shared_ptr<Exception> +{ + public: + typedef boost::shared_ptr<Exception> shared_ptr; + + ExceptionHolder() throw() {} + ExceptionHolder(Exception* p) throw() : shared_ptr(p) {} + ExceptionHolder(shared_ptr p) throw() : shared_ptr(p) {} + + ExceptionHolder(const Exception& e) throw() : shared_ptr(e.clone()) {} + ExceptionHolder(const std::exception& e); + + const char* what() const throw() { return (*this)->what(); } + std::string toString() const throw() { return (*this)->toString(); } + virtual Exception* clone() const throw() { return (*this)->clone(); } + virtual void throwSelf() const { (*this)->throwSelf(); } +}; + +} // namespace qpid + + + +#endif /*!_qpid_ExceptionHolder_h*/ diff --git a/cpp/src/qpid/QpidError.cpp b/cpp/src/qpid/QpidError.cpp index 0b5135269b..70fff6550a 100644 --- a/cpp/src/qpid/QpidError.cpp +++ b/cpp/src/qpid/QpidError.cpp @@ -24,12 +24,21 @@ using namespace qpid; -QpidError::QpidError(int _code, const std::string& _msg, const std::string& _file, int _line) throw() - : code(_code), msg(_msg), file(_file), line(_line) +QpidError::QpidError() : code(0) {} + +QpidError::QpidError(int _code, const std::string& _msg, + const SrcLine& _loc) throw() + : code(_code), msg(_msg), location(_loc) { std::ostringstream os; - os << "QpidError(" << code << ") " << msg << " (" << file << ":" << line << ")"; + os << "Error [" << code << "] " << msg << " (" + << location.file << ":" << location.line << ")"; whatStr = os.str(); } QpidError::~QpidError() throw() {} + +Exception* QpidError::clone() const throw() { return new QpidError(*this); } + +void QpidError::throwSelf() const { throw *this; } + diff --git a/cpp/src/qpid/QpidError.h b/cpp/src/qpid/QpidError.h index 79ccd2f579..5d7aa93674 100644 --- a/cpp/src/qpid/QpidError.h +++ b/cpp/src/qpid/QpidError.h @@ -21,29 +21,47 @@ * */ #include <string> +#include <memory> +#include <ostream> #include <qpid/Exception.h> namespace qpid { - class QpidError : public Exception { - public: - const int code; - const std::string msg; - const std::string file; - const int line; +struct SrcLine { + public: + SrcLine(const std::string& file_="", int line_=0) : + file(file_), line(line_) {} - QpidError(int _code, const std::string& _msg, const std::string& _file, int _line) throw(); - ~QpidError() throw(); - }; + std::string file; + int line; +}; + +class QpidError : public Exception { + public: + const int code; + const std::string msg; + const SrcLine location; -#define THROW_QPID_ERROR(A, B) throw QpidError(A, B, __FILE__, __LINE__) + QpidError(); + QpidError(int _code, const std::string& _msg, const SrcLine& _loc) throw(); + ~QpidError() throw(); + Exception* clone() const throw(); + void throwSelf() const; +}; -} -#define PROTOCOL_ERROR 10000 -#define APR_ERROR 20000 -#define FRAMING_ERROR 30000 -#define CLIENT_ERROR 40000 -#define INTERNAL_ERROR 50000 +} // namespace qpid + +#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__) + +#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE) + +#define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE) + +const int PROTOCOL_ERROR = 10000; +const int APR_ERROR = 20000; +const int FRAMING_ERROR = 30000; +const int CLIENT_ERROR = 40000; +const int INTERNAL_ERROR = 50000; #endif diff --git a/cpp/src/qpid/apr/APRBase.cpp b/cpp/src/qpid/apr/APRBase.cpp index c3dbda3df0..7a35f098ec 100644 --- a/cpp/src/qpid/apr/APRBase.cpp +++ b/cpp/src/qpid/apr/APRBase.cpp @@ -87,7 +87,8 @@ void qpid::sys::check(apr_status_t status, const std::string& file, const int li const int size = 50; char tmp[size]; std::string msg(apr_strerror(status, tmp, size)); - throw QpidError(APR_ERROR + ((int) status), msg, file, line); + throw QpidError(APR_ERROR + ((int) status), msg, + qpid::SrcLine(file, line)); } } diff --git a/cpp/src/qpid/apr/LFProcessor.cpp b/cpp/src/qpid/apr/LFProcessor.cpp index c8a583a34c..0187beab10 100644 --- a/cpp/src/qpid/apr/LFProcessor.cpp +++ b/cpp/src/qpid/apr/LFProcessor.cpp @@ -134,8 +134,8 @@ void LFProcessor::run(){ session->stopProcessing(); } } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + }catch(std::exception e){ + std::cout << e.what() << std::endl; } } diff --git a/cpp/src/qpid/apr/Socket.cpp b/cpp/src/qpid/apr/Socket.cpp index 9ef6baeb88..3cf510872f 100644 --- a/cpp/src/qpid/apr/Socket.cpp +++ b/cpp/src/qpid/apr/Socket.cpp @@ -27,21 +27,24 @@ using namespace qpid::sys; -Socket::Socket() -{ +Socket Socket::createTcp() { + Socket s; CHECK_APR_SUCCESS( apr_socket_create( - &socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, + &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); + return s; } -void Socket::setTimeout(long msecs) -{ - apr_socket_timeout_set(socket, msecs*1000); +Socket::Socket(apr_socket_t* s) { + socket = s; } -void Socket::connect(const std::string& host, int port) -{ +void Socket::setTimeout(Time interval) { + apr_socket_timeout_set(socket, interval/TIME_USEC); +} + +void Socket::connect(const std::string& host, int port) { apr_sockaddr_t* address; CHECK_APR_SUCCESS( apr_sockaddr_info_get( @@ -50,27 +53,28 @@ void Socket::connect(const std::string& host, int port) CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); } -void Socket::close() -{ +void Socket::close() { if (socket == 0) return; CHECK_APR_SUCCESS(apr_socket_close(socket)); socket = 0; } -ssize_t Socket::send(const char* data, size_t size) +ssize_t Socket::send(const void* data, size_t size) { apr_size_t sent = size; - apr_status_t status = apr_socket_send(socket, data, &sent); + apr_status_t status = + apr_socket_send(socket, reinterpret_cast<const char*>(data), &sent); if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; if (APR_STATUS_IS_EOF(status)) return SOCKET_EOF; CHECK_APR_SUCCESS(status); return sent; } -ssize_t Socket::recv(char* data, size_t size) +ssize_t Socket::recv(void* data, size_t size) { apr_size_t received = size; - apr_status_t status = apr_socket_recv(socket, data, &received); + apr_status_t status = + apr_socket_recv(socket, reinterpret_cast<char*>(data), &received); if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; CHECK_APR_SUCCESS(status); return received; diff --git a/cpp/src/qpid/broker/AutoDelete.cpp b/cpp/src/qpid/broker/AutoDelete.cpp index 6c7ad432db..45232f154c 100644 --- a/cpp/src/qpid/broker/AutoDelete.cpp +++ b/cpp/src/qpid/broker/AutoDelete.cpp @@ -63,7 +63,7 @@ void AutoDelete::run(){ Monitor::ScopedLock l(monitor); while(!stopped){ process(); - monitor.wait(period * Time::NSEC_PER_MSEC); + monitor.wait(period*TIME_MSEC); } } diff --git a/cpp/src/qpid/broker/AutoDelete.h b/cpp/src/qpid/broker/AutoDelete.h index b706246900..a49014314d 100644 --- a/cpp/src/qpid/broker/AutoDelete.h +++ b/cpp/src/qpid/broker/AutoDelete.h @@ -1,3 +1,5 @@ +#ifndef _AutoDelete_ +#define _AutoDelete_ /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,9 +20,6 @@ * under the License. * */ -#ifndef _AutoDelete_ -#define _AutoDelete_ - #include <iostream> #include <queue> #include <qpid/sys/Monitor.h> diff --git a/cpp/src/qpid/broker/Configuration.cpp b/cpp/src/qpid/broker/Configuration.cpp index 39f5c23ee6..1fce1acf4c 100644 --- a/cpp/src/qpid/broker/Configuration.cpp +++ b/cpp/src/qpid/broker/Configuration.cpp @@ -52,7 +52,7 @@ void Configuration::parse(int argc, char** argv){ matched = (*i)->parse(position, argv, argc); } if(!matched){ - std::cout << "Warning: skipping unrecognised option " << argv[position] << std::endl; + std::cout<< "Warning: skipping unrecognised option " << argv[position] << std::endl; position++; } } diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 0f3223d3a6..932038c4da 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -43,7 +43,8 @@ void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); if (i != bindings.end()) { bindings.erase(i); - // TODO aconway 2006-09-14: What about the ExchangeBinding object? Don't we have to verify routingKey/args match? + // TODO aconway 2006-09-14: What about the ExchangeBinding object? + // Don't we have to verify routingKey/args match? } } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 1150d055cb..00b0a844ab 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -42,7 +42,7 @@ Queue::Queue(const string& _name, u_int32_t _autodelete, exclusive(0), persistenceId(0) { - if(autodelete) lastUsed = Time::now().msecs(); + if(autodelete) lastUsed = now()/TIME_MSEC; } Queue::~Queue(){ @@ -137,7 +137,7 @@ void Queue::consume(Consumer* c, bool requestExclusive){ void Queue::cancel(Consumer* c){ Mutex::ScopedLock locker(lock); consumers.erase(find(consumers.begin(), consumers.end(), c)); - if(autodelete && consumers.empty()) lastUsed = Time::now().msecs(); + if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC; if(exclusive == c) exclusive = 0; } @@ -170,7 +170,7 @@ u_int32_t Queue::getConsumerCount() const{ bool Queue::canAutoDelete() const{ Mutex::ScopedLock locker(lock); - return lastUsed && (Time::now().msecs() - lastUsed > autodelete); + return lastUsed && (now()*TIME_MSEC - lastUsed > autodelete); } void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 34fb25781e..938548d091 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -82,7 +82,8 @@ void TopicPattern::normalize() { namespace { // TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string. -// Need more efficient Tokens impl that can operate on a string in place. +// Need StringRef class that operates on a string in place witout copy. +// Should be applied everywhere strings are extracted from frames. // bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) { diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index de324fdab4..0b520d169d 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -216,7 +216,7 @@ void Connection::error(int code, const string& msg, int classid, int methodid){ } void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){ - std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.file << ":" << e.line << ")" << std::endl; + std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl; int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500; string msg = e.msg; if(method == 0){ diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 86fbdc062c..116ea74193 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -44,6 +44,7 @@ Connector::Connector(bool _debug, u_int32_t buffer_size) : Connector::~Connector(){ } void Connector::connect(const std::string& host, int port){ + socket = Socket::createTcp(); socket.connect(host, port); closed = false; receiver = Thread(this); @@ -92,7 +93,7 @@ void Connector::writeToSocket(char* data, size_t available){ while(written < available && !closed){ ssize_t sent = socket.send(data + written, available-written); if(sent > 0) { - lastOut = Time::now().msecs(); + lastOut = now() * TIME_MSEC; written += sent; } } @@ -106,17 +107,17 @@ void Connector::handleClosed(){ void Connector::checkIdle(ssize_t status){ if(timeoutHandler){ - int64_t now = Time::now().msecs(); + Time t = now() * TIME_MSEC; if(status == Socket::SOCKET_TIMEOUT) { - if(idleIn && (now - lastIn > idleIn)){ + if(idleIn && (t - lastIn > idleIn)){ timeoutHandler->idleIn(); } }else if(status == Socket::SOCKET_EOF){ handleClosed(); }else{ - lastIn = now; + lastIn = t; } - if(idleOut && (now - lastOut > idleOut)){ + if(idleOut && (t - lastOut > idleOut)){ timeoutHandler->idleOut(); } } @@ -140,7 +141,7 @@ void Connector::setWriteTimeout(u_int16_t t){ } void Connector::setSocketTimeout(){ - socket.setTimeout(timeout); + socket.setTimeout(timeout*TIME_MSEC); } void Connector::setTimeoutHandler(TimeoutHandler* handler){ @@ -171,7 +172,9 @@ void Connector::run(){ } } }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + std::cout << "Error [" << error.code << "] " << error.msg + << " (" << error.location.file << ":" << error.location.line + << ")" << std::endl; handleClosed(); } } diff --git a/cpp/src/qpid/posix/EpollEventChannel.cpp b/cpp/src/qpid/posix/EpollEventChannel.cpp deleted file mode 100644 index 7dce4bc58c..0000000000 --- a/cpp/src/qpid/posix/EpollEventChannel.cpp +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <qpid/sys/EventChannel.h> -#include <sys/epoll.h> -#include "EpollEventChannel.h" - -namespace qpid { -namespace sys { - -EventChannel::shared_ptr EventChannel::create() -{ - return EventChannel::shared_ptr(new EpollEventChannel()); -} - -EpollEventChannel::EpollEventChannel() -{ - // TODO aconway 2006-11-13: How to choose size parameter? - static const size_t estimatedFdsForEpoll = 1000; - epollFd = epoll_create(estimatedFdsForEpoll); -} - -EpollEventChannel::~EpollEventChannel() { } - -void -EpollEventChannel::post(ReadEvent& /*event*/) -{ -} - -void -EpollEventChannel::post(WriteEvent& /*event*/) -{ -} - -void -EpollEventChannel::post(AcceptEvent& /*event*/) -{ -} - -void -EpollEventChannel::post(NotifyEvent& /*event*/) -{ -} - -inline void -EpollEventChannel::post(Event& /*event*/) -{ -} - -Event* -EpollEventChannel::getEvent() -{ - return 0; -} - -void -EpollEventChannel::dispose(void* /*buffer*/, size_t) -{ -} - -}} diff --git a/cpp/src/qpid/posix/EventChannel.cpp b/cpp/src/qpid/posix/EventChannel.cpp new file mode 100644 index 0000000000..9d0819f206 --- /dev/null +++ b/cpp/src/qpid/posix/EventChannel.cpp @@ -0,0 +1,325 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <mqueue.h> +#include <string.h> +#include <iostream> + +#include <sys/errno.h> +#include <sys/socket.h> +#include <sys/epoll.h> + +#include <typeinfo> +#include <iostream> +#include <queue> + +#include <boost/ptr_container/ptr_map.hpp> +#include <boost/current_function.hpp> + +#include <qpid/QpidError.h> +#include <qpid/sys/Monitor.h> + +#include "check.h" +#include "EventChannel.h" + +using namespace std; + + +// Convenience template to zero out a struct. +template <class S> struct ZeroStruct : public S { + ZeroStruct() { memset(this, 0, sizeof(*this)); } +}; + +namespace qpid { +namespace sys { + + +/** + * EventHandler wraps an epoll file descriptor. Acts as private + * interface between EventChannel and subclasses. + * + * Also implements Event interface for events that are not associated + * with a file descriptor and are passed via the message queue. + */ +class EventHandler : public Event, private Monitor +{ + public: + EventHandler(int epollSize = 256); + ~EventHandler(); + + int getEpollFd() { return epollFd; } + void epollAdd(int fd, uint32_t epollEvents, Event* event); + void epollMod(int fd, uint32_t epollEvents, Event* event); + void epollDel(int fd); + + void mqPut(Event* event); + Event* mqGet(); + + protected: + // Should never be called, only complete. + void prepare(EventHandler&) { assert(0); } + Event* complete(EventHandler& eh); + + private: + int epollFd; + std::string mqName; + int mqFd; + std::queue<Event*> mqEvents; +}; + +EventHandler::EventHandler(int epollSize) +{ + epollFd = epoll_create(epollSize); + if (epollFd < 0) throw QPID_POSIX_ERROR(errno); + + // Create a POSIX message queue for non-fd events. + // We write one byte and never read it is always ready for read + // when we add it to epoll. + // + ZeroStruct<struct mq_attr> attr; + attr.mq_maxmsg = 1; + attr.mq_msgsize = 1; + do { + char tmpnam[L_tmpnam]; + tmpnam_r(tmpnam); + mqName = tmpnam + 4; // Skip "tmp/" + mqFd = mq_open( + mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); + if (mqFd < 0) throw QPID_POSIX_ERROR(errno); + } while (mqFd == EEXIST); // Name already taken, try again. + + static char zero = '\0'; + mq_send(mqFd, &zero, 1, 0); + epollAdd(mqFd, 0, this); +} + +EventHandler::~EventHandler() { + mq_close(mqFd); + mq_unlink(mqName.c_str()); +} + +void EventHandler::mqPut(Event* event) { + ScopedLock l(*this); + assert(event != 0); + mqEvents.push(event); + epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); +} + +Event* EventHandler::mqGet() { + ScopedLock l(*this); + if (mqEvents.empty()) + return 0; + Event* event = mqEvents.front(); + mqEvents.pop(); + if(!mqEvents.empty()) + epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); + return event; +} + +void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) +{ + ZeroStruct<struct epoll_event> ee; + ee.data.ptr = event; + ee.events = epollEvents; + if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) + throw QPID_POSIX_ERROR(errno); +} + +void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) +{ + ZeroStruct<struct epoll_event> ee; + ee.data.ptr = event; + ee.events = epollEvents; + if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) + throw QPID_POSIX_ERROR(errno); +} + +void EventHandler::epollDel(int fd) { + if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) + throw QPID_POSIX_ERROR(errno); +} + +Event* EventHandler::complete(EventHandler& eh) +{ + assert(&eh == this); + Event* event = mqGet(); + return event==0 ? 0 : event->complete(eh); +} + +// ================================================================ +// EventChannel + +EventChannel::shared_ptr EventChannel::create() { + return shared_ptr(new EventChannel()); +} + +EventChannel::EventChannel() : handler(new EventHandler()) {} + +EventChannel::~EventChannel() {} + +void EventChannel::postEvent(Event& e) +{ + e.prepare(*handler); +} + +Event* EventChannel::getEvent() +{ + static const int infiniteTimeout = -1; + ZeroStruct<struct epoll_event> epollEvent; + + // Loop until we can complete the event. Some events may re-post + // themselves and return 0 from complete, e.g. partial reads. // + Event* event = 0; + while (event == 0) { + int eventCount = epoll_wait(handler->getEpollFd(), + &epollEvent, 1, infiniteTimeout); + if (eventCount < 0) { + if (errno != EINTR) { + // TODO aconway 2006-11-28: Proper handling/logging of errors. + cerr << BOOST_CURRENT_FUNCTION << " ignoring error " + << PosixError::getMessage(errno) << endl; + assert(0); + } + } + else if (eventCount == 1) { + event = reinterpret_cast<Event*>(epollEvent.data.ptr); + assert(event != 0); + try { + event = event->complete(*handler); + } + catch (const Exception& e) { + if (event) + event->setError(e); + } + catch (const std::exception& e) { + if (event) + event->setError(e); + } + } + } + return event; +} + +Event::~Event() {} + +void Event::prepare(EventHandler& handler) +{ + handler.mqPut(this); +} + +bool Event::hasError() const { + return error; +} + +void Event::throwIfError() throw (Exception) { + if (hasError()) + error.throwSelf(); +} + +Event* Event::complete(EventHandler&) +{ + return this; +} + +void Event::dispatch() +{ + try { + if (!callback.empty()) + callback(); + } catch (const std::exception&) { + throw; + } catch (...) { + throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); + } +} + +void Event::setError(const ExceptionHolder& e) { + error = e; +} + +void ReadEvent::prepare(EventHandler& handler) +{ + handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +} + +ssize_t ReadEvent::doRead() { + ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received, + size - received); + if (n > 0) received += n; + return n; +} + +Event* ReadEvent::complete(EventHandler& handler) +{ + // Read as much as possible without blocking. + ssize_t n = doRead(); + while (n > 0 && received < size) doRead(); + + if (received == size) { + handler.epollDel(descriptor); + received = 0; // Reset for re-use. + return this; + } + else if (n <0 && (errno == EAGAIN)) { + // Keep polling for more. + handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); + return 0; + } + else { + // Unexpected EOF or error. Throw ENODATA for EOF. + handler.epollDel(descriptor); + received = 0; // Reset for re-use. + throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); + } +} + +void WriteEvent::prepare(EventHandler& handler) +{ + handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); +} + +Event* WriteEvent::complete(EventHandler& handler) +{ + ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written, + size - written); + if (n < 0) throw QPID_POSIX_ERROR(errno); + written += n; + if(written < size) { + // Keep polling. + handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); + return 0; + } + written = 0; // Reset for re-use. + handler.epollDel(descriptor); + return this; +} + +void AcceptEvent::prepare(EventHandler& handler) +{ + handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); +} + +Event* AcceptEvent::complete(EventHandler& handler) +{ + handler.epollDel(descriptor); + accepted = ::accept(descriptor, 0, 0); + if (accepted < 0) throw QPID_POSIX_ERROR(errno); + return this; +} + +}} diff --git a/cpp/src/qpid/posix/EventChannel.h b/cpp/src/qpid/posix/EventChannel.h new file mode 100644 index 0000000000..55fd2ad135 --- /dev/null +++ b/cpp/src/qpid/posix/EventChannel.h @@ -0,0 +1,176 @@ +#ifndef _sys_EventChannel_h +#define _sys_EventChannel_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <qpid/SharedObject.h> +#include <qpid/ExceptionHolder.h> +#include <boost/function.hpp> +#include <memory> + +namespace qpid { +namespace sys { + +class Event; +class EventHandler; +class EventChannel; + +/** + * Base class for all Events. + */ +class Event +{ + public: + /** Type for callback when event is dispatched */ + typedef boost::function0<void> Callback; + + /** + * Create an event with optional callback. + * Instances of Event are sent directly through the channel. + * Derived classes define additional waiting behaviour. + *@param cb A callback functor that is invoked when dispatch() is called. + */ + Event(Callback cb = 0) : callback(cb) {} + + virtual ~Event(); + + /** Call the callback provided to the constructor, if any. */ + void dispatch(); + + /** True if there was an error processing this event */ + bool hasError() const; + + /** If hasError() throw the corresponding exception. */ + void throwIfError() throw(Exception); + + protected: + virtual void prepare(EventHandler&); + virtual Event* complete(EventHandler&); + void setError(const ExceptionHolder& e); + + Callback callback; + ExceptionHolder error; + + friend class EventChannel; + friend class EventHandler; +}; + +template <class BufT> +class IOEvent : public Event { + public: + void getDescriptor() const { return descriptor; } + size_t getSize() const { return size; } + BufT getBuffer() const { return buffer; } + + protected: + IOEvent(int fd, Callback cb, size_t sz, BufT buf) : + Event(cb), descriptor(fd), buffer(buf), size(sz) {} + + int descriptor; + BufT buffer; + size_t size; +}; + +/** Asynchronous read event */ +class ReadEvent : public IOEvent<void*> +{ + public: + explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) : + IOEvent<void*>(fd, cb, sz, buf), received(0) {} + + private: + void prepare(EventHandler&); + Event* complete(EventHandler&); + ssize_t doRead(); + + size_t received; +}; + +/** Asynchronous write event */ +class WriteEvent : public IOEvent<const void*> +{ + public: + explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0, + Callback cb=0) : + IOEvent<const void*>(fd, cb, sz, buf), written(0) {} + + protected: + void prepare(EventHandler&); + Event* complete(EventHandler&); + + private: + ssize_t doWrite(); + size_t written; +}; + +/** Asynchronous socket accept event */ +class AcceptEvent : public Event +{ + public: + /** Accept a connection on fd. */ + explicit AcceptEvent(int fd=-1, Callback cb=0) : + Event(cb), descriptor(fd), accepted(0) {} + + /** Get descriptor for server socket */ + int getAcceptedDesscriptor() const { return accepted; } + + private: + void prepare(EventHandler&); + Event* complete(EventHandler&); + + int descriptor; + int accepted; +}; + + +class QueueSet; + +/** + * Channel to post and wait for events. + */ +class EventChannel : public qpid::SharedObject<EventChannel> +{ + public: + static shared_ptr create(); + + ~EventChannel(); + + /** Post an event to the channel. */ + void postEvent(Event& event); + + /** Post an event to the channel. Must not be 0. */ + void postEvent(Event* event) { postEvent(*event); } + + /** + * Wait for the next complete event. + *@return Pointer to event. Will never return 0. + */ + Event* getEvent(); + + private: + EventChannel(); + boost::shared_ptr<EventHandler> handler; +}; + + +}} + + + +#endif /*!_sys_EventChannel_h*/ diff --git a/cpp/src/qpid/posix/EventChannelThreads.cpp b/cpp/src/qpid/posix/EventChannelThreads.cpp new file mode 100644 index 0000000000..f97efd17e8 --- /dev/null +++ b/cpp/src/qpid/posix/EventChannelThreads.cpp @@ -0,0 +1,119 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "EventChannelThreads.h" +#include <qpid/sys/Runnable.h> +#include <iostream> +using namespace std; +#include <boost/bind.hpp> + +namespace qpid { +namespace sys { + +EventChannelThreads::shared_ptr EventChannelThreads::create( + EventChannel::shared_ptr ec) +{ + return EventChannelThreads::shared_ptr(new EventChannelThreads(ec)); +} + +EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) : + channel(ec), nWaiting(0), state(RUNNING) +{ + // TODO aconway 2006-11-15: Estimate initial threads based on CPUs. + addThread(); +} + +EventChannelThreads::~EventChannelThreads() { + shutdown(); + join(); +} + +void EventChannelThreads::shutdown() +{ + ScopedLock lock(*this); + if (state != RUNNING) // Already shutting down. + return; + for (size_t i = 0; i < workers.size(); ++i) { + channel->postEvent(terminate); + } + state = TERMINATE_SENT; + notify(); // Wake up one join() thread. +} + +void EventChannelThreads::join() +{ + { + ScopedLock lock(*this); + while (state == RUNNING) // Wait for shutdown to start. + wait(); + if (state == SHUTDOWN) // Shutdown is complete + return; + if (state == JOINING) { + // Someone else is doing the join. + while (state != SHUTDOWN) + wait(); + return; + } + // I'm the joining thread + assert(state == TERMINATE_SENT); + state = JOINING; + } // Drop the lock. + + for (size_t i = 0; i < workers.size(); ++i) { + assert(state == JOINING); // Only this thread can change JOINING. + workers[i].join(); + } + state = SHUTDOWN; + notifyAll(); // Notify other join() threaeds. +} + +void EventChannelThreads::addThread() { + ScopedLock l(*this); + workers.push_back(Thread(*this)); +} + +void EventChannelThreads::run() +{ + // Start life waiting. Decrement on exit. + AtomicCount::ScopedIncrement inc(nWaiting); + try { + while (true) { + Event* e = channel->getEvent(); + assert(e != 0); + if (e == &terminate) { + return; + } + AtomicCount::ScopedDecrement dec(nWaiting); + // I'm no longer waiting, make sure someone is. + if (dec == 0) + addThread(); + e->dispatch(); + } + } + catch (const std::exception& e) { + // TODO aconway 2006-11-15: need better logging across the board. + std::cerr << "EventChannelThreads::run() caught: " << e.what() + << std::endl; + } + catch (...) { + std::cerr << "EventChannelThreads::run() caught unknown exception." + << std::endl; + } +} + +}} diff --git a/cpp/src/qpid/posix/EventChannelThreads.h b/cpp/src/qpid/posix/EventChannelThreads.h new file mode 100644 index 0000000000..ae172ae752 --- /dev/null +++ b/cpp/src/qpid/posix/EventChannelThreads.h @@ -0,0 +1,92 @@ +#ifndef _posix_EventChannelThreads_h +#define _sys_EventChannelThreads_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <vector> + +#include <qpid/Exception.h> +#include <qpid/sys/Time.h> +#include <qpid/sys/Monitor.h> +#include <qpid/sys/Thread.h> +#include <qpid/sys/AtomicCount.h> +#include "EventChannel.h" + +namespace qpid { +namespace sys { + +/** + Dynamic thread pool serving an EventChannel. + + Threads run a loop { e = getEvent(); e->dispatch(); } + The size of the thread pool is automatically adjusted to optimal size. +*/ +class EventChannelThreads : + public qpid::SharedObject<EventChannelThreads>, + public sys::Monitor, private sys::Runnable +{ + public: + /** Create the thread pool and start initial threads. */ + static EventChannelThreads::shared_ptr create( + EventChannel::shared_ptr channel + ); + + ~EventChannelThreads(); + + /** Post event to the underlying channel */ + void postEvent(Event& event) { channel->postEvent(event); } + + /** Post event to the underlying channel Must not be 0. */ + void postEvent(Event* event) { channel->postEvent(event); } + + /** + * Terminate all threads. + * + * Returns immediately, use join() to wait till all threads are + * shut down. + */ + void shutdown(); + + /** Wait for all threads to terminate. */ + void join(); + + private: + typedef std::vector<sys::Thread> Threads; + typedef enum { + RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN + } State; + + EventChannelThreads(EventChannel::shared_ptr underlyingChannel); + void addThread(); + + void run(); + bool keepRunning(); + void adjustThreads(); + + EventChannel::shared_ptr channel; + Threads workers; + sys::AtomicCount nWaiting; + State state; + Event terminate; +}; + + +}} + + +#endif /*!_sys_EventChannelThreads_h*/ diff --git a/cpp/src/qpid/posix/Socket.cpp b/cpp/src/qpid/posix/Socket.cpp index 1321ae6b0d..6d47c64b32 100644 --- a/cpp/src/qpid/posix/Socket.cpp +++ b/cpp/src/qpid/posix/Socket.cpp @@ -20,6 +20,7 @@ */ #include <sys/socket.h> +#include <sys/errno.h> #include <netinet/in.h> #include <netdb.h> @@ -31,60 +32,87 @@ using namespace qpid::sys; -Socket::Socket() : socket(::socket (PF_INET, SOCK_STREAM, 0)) +Socket Socket::createTcp() { - CHECKNN(socket == 0); + int s = ::socket (PF_INET, SOCK_STREAM, 0); + if (s < 0) throw QPID_POSIX_ERROR(errno); + return s; } -void -Socket::setTimeout(long msecs) +Socket::Socket(int descriptor) : socket(descriptor) {} + +void Socket::setTimeout(Time interval) { struct timeval tv; - tv.tv_sec = msecs / 1000; - tv.tv_usec = (msecs % 1000)*1000; + tv.tv_sec = interval/TIME_SEC; + tv.tv_usec = (interval%TIME_SEC)/TIME_USEC; setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); } -void -Socket::connect(const std::string& host, int port) +void Socket::connect(const std::string& host, int port) { struct sockaddr_in name; name.sin_family = AF_INET; name.sin_port = htons(port); struct hostent* hp = gethostbyname ( host.c_str() ); - if (hp == 0) CHECK0(-1); // TODO aconway 2006-11-09: error message? + if (hp == 0) throw QPID_POSIX_ERROR(errno); memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length); - CHECK0(::connect(socket, (struct sockaddr*)(&name), sizeof(name))); + if (::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0) + throw QPID_POSIX_ERROR(errno); } void Socket::close() { if (socket == 0) return; - CHECK0(::close(socket)); + if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno); socket = 0; } ssize_t -Socket::send(const char* data, size_t size) +Socket::send(const void* data, size_t size) { ssize_t sent = ::send(socket, data, size, 0); if (sent < 0) { if (errno == ECONNRESET) return SOCKET_EOF; if (errno == ETIMEDOUT) return SOCKET_TIMEOUT; - CHECK0(sent); + throw QPID_POSIX_ERROR(errno); } return sent; } ssize_t -Socket::recv(char* data, size_t size) +Socket::recv(void* data, size_t size) { ssize_t received = ::recv(socket, data, size, 0); if (received < 0) { if (errno == ETIMEDOUT) return SOCKET_TIMEOUT; - CHECK0(received); + throw QPID_POSIX_ERROR(errno); } return received; } + +int Socket::listen(int port, int backlog) +{ + struct sockaddr_in name; + name.sin_family = AF_INET; + name.sin_port = htons(port); + name.sin_addr.s_addr = 0; + if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0) + throw QPID_POSIX_ERROR(errno); + if (::listen(socket, backlog) < 0) + throw QPID_POSIX_ERROR(errno); + + socklen_t namelen = sizeof(name); + if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) + throw QPID_POSIX_ERROR(errno); + + return ntohs(name.sin_port); +} + + +int Socket::fd() +{ + return socket; +} diff --git a/cpp/src/qpid/posix/check.cpp b/cpp/src/qpid/posix/check.cpp index 2ef52f68b7..408679caa8 100644 --- a/cpp/src/qpid/posix/check.cpp +++ b/cpp/src/qpid/posix/check.cpp @@ -19,15 +19,21 @@ * */ -#include <qpid/QpidError.h> +#include <cerrno> #include "check.h" namespace qpid { namespace sys { -std::string errnoToString() { +std::string +PosixError::getMessage(int errNo) +{ char buf[512]; - return strerror_r(errno, buf, sizeof(buf)); + return std::string(strerror_r(errNo, buf, sizeof(buf))); } +PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw() + : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc) +{ } + }} diff --git a/cpp/src/qpid/posix/check.h b/cpp/src/qpid/posix/check.h index 666637b1c2..2d3a8d30e3 100644 --- a/cpp/src/qpid/posix/check.h +++ b/cpp/src/qpid/posix/check.h @@ -22,18 +22,41 @@ * */ -#include <errno.h> +#include <cerrno> #include <string> #include <qpid/QpidError.h> namespace qpid { namespace sys { -std::string errnoToString(); +/** + * Exception with message from errno. + */ +class PosixError : public qpid::QpidError +{ + public: + static std::string getMessage(int errNo); + + PosixError(int errNo, const qpid::SrcLine& location) throw(); + + ~PosixError() throw() {} + + int getErrNo() { return errNo; } + + Exception* clone() const throw() { return new PosixError(*this); } + + void throwSelf() { throw *this; } + + private: + int errNo; +}; -#define CHECK0(N) if ((N)!=0) THROW_QPID_ERROR(INTERNAL_ERROR, errnoToString()) -#define CHECKNN(N) if ((N)<0) THROW_QPID_ERROR(INTERNAL_ERROR, errnoToString()) }} +/** Create a PosixError for the current file/line and errno. */ +#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE) +/** Throw a posix error if errNo is non-zero */ +#define QPID_POSIX_THROW_IF(ERRNO) \ + if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO)) #endif /*!_posix_check_h*/ diff --git a/cpp/src/qpid/sys/AtomicCount.h b/cpp/src/qpid/sys/AtomicCount.h new file mode 100644 index 0000000000..b625b2c9b0 --- /dev/null +++ b/cpp/src/qpid/sys/AtomicCount.h @@ -0,0 +1,71 @@ +#ifndef _posix_AtomicCount_h +#define _posix_AtomicCount_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/detail/atomic_count.hpp> +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +/** + * Atomic counter. + */ +class AtomicCount : boost::noncopyable { + public: + class ScopedDecrement : boost::noncopyable { + public: + /** Decrement counter in constructor and increment in destructor. */ + ScopedDecrement(AtomicCount& c) : count(c) { value = --count; } + ~ScopedDecrement() { ++count; } + /** Return the value returned by the decrement. */ + operator long() { return value; } + private: + AtomicCount& count; + long value; + }; + + class ScopedIncrement : boost::noncopyable { + public: + /** Increment counter in constructor and increment in destructor. */ + ScopedIncrement(AtomicCount& c) : count(c) { ++count; } + ~ScopedIncrement() { --count; } + private: + AtomicCount& count; + }; + + AtomicCount(long value = 0) : count(value) {} + + void operator++() { ++count ; } + + long operator--() { return --count; } + + operator long() const { return count; } + + + private: + boost::detail::atomic_count count; +}; + + +}} + + +#endif // _posix_AtomicCount_h diff --git a/cpp/src/qpid/sys/EventChannel.h b/cpp/src/qpid/sys/EventChannel.h deleted file mode 100644 index dd857c02c7..0000000000 --- a/cpp/src/qpid/sys/EventChannel.h +++ /dev/null @@ -1,239 +0,0 @@ -#ifndef _sys_EventChannel_h -#define _sys_EventChannel_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <qpid/SharedObject.h> -#include <qpid/Exception.h> -#include <qpid/sys/Time.h> - -namespace qpid { -namespace sys { - -class EventChannel; - -class Event; -class ReadEvent; -class WriteEvent; -class AcceptEvent; -class NotifyEvent; - -/** - Active event channel. Events represent async IO requests or - inter-task synchronization. Posting an Event registers interest in - the IO or sync event. When it occurs the posted Event is - corresponding IO or sync event occurs they are returned to one - of the threads waiting on the channel. For more details see - the Event hierarchy. -*/ -class EventChannel : public qpid::SharedObject<EventChannel> -{ - public: - static EventChannel::shared_ptr create(); - - virtual ~EventChannel() {} - - virtual void post(ReadEvent& event) = 0; - virtual void post(WriteEvent& event) = 0; - virtual void post(AcceptEvent& event) = 0; - virtual void post(NotifyEvent& event) = 0; - - inline void post(Event& event); - - /** - * Wait for the next completed event. - * @return An Event or 0 to indicate the calling thread should shut down. - */ - virtual Event* getEvent() = 0; - - /** Dispose of a system-allocated buffer. Called by ReadEvent */ - virtual void dispose(void* buffer, size_t size) = 0; - - protected: - EventChannel() {} -}; - - -/** - * Base class for all events. There are two possible styles of use: - * - * Task style: the event is allocated as a local variable on the initiating - * task, which blocks in wait(). Event::dispatch() resumes that task - * with the event data available. - * - * Proactor style: Threads post events but do not - * wait. Event::dispatch() processes the event in the dispatching - * thread and then deletes itself. - * - * Tasks give less kernel context switching and blocking AND simpler - * coding. Tasks can call any number of pseudo-blocking opereations - * that are actually event post/wait pairs. At each such point the - * current thread can continue with the task or switch to another task - * to minimise blocking. - * - * With Proactor style dispatch() is an atomic unit of work as far as - * the EventChannel is concerned. To avoid such blocking the - * application has to be written as a collection of non-blocking - * dispatch() callbacks, which is more complex than tasks that can - * call pseudo-blocking operations. - */ -class Event : private boost::noncopyable -{ - public: - virtual ~Event() {} - - /** Post this event to the channel */ - virtual void post(EventChannel& channel) = 0; - - /** - * Block till the event is delivered. - * At most one task can wait on an event. - */ - virtual void wait() const = 0; - - /** - * Dispatch the event. Runs some event-specific code, may switch - * context to resume a waiting task. - */ - virtual void dispatch() = 0; -}; - - -/** - * Base class for asynchronous request events, provides exception - * handling. - */ -class RequestEvent : public Event -{ - public: - /** True if the async request failed */ - bool hasException() const { return ex.get(); } - - const qpid::Exception& getException() const { return *ex; } - - void setException(std::auto_ptr<qpid::Exception>& e) { ex = e; } - - /** If the event has an exception throw it, else do nothing */ - void verify() const { if (ex.get()) throw *ex; } - - void post(EventChannel& channel) { channel.post(*this); } - - private: - qpid::HeapException ex; -}; - - -/** An asynchronous read event. */ -class ReadEvent : public RequestEvent { - public: - /** - * Read data from fd. - */ - ReadEvent(int fileDescriptor, void* buffer, size_t bytesToRead) : - fd(fileDescriptor), data(buffer), size(bytesToRead) {} - - /** Number of bytes read. */ - size_t getBytesRead() const { verify(); return size; } - - /** - * If the system supports direct access to DMA buffers then - * it may provide a direct pointer to such a buffer to avoid - * a copy into the user buffer. - * @return true if getData() is returning a system-supplied buffer. - */ - bool isSystemData() const { verify(); return channel != 0; } - - /** - * Pointer to data read. Note if isSystemData() is true then this - * is NOT the same buffer that was supplied to the constructor. - * The user buffer is untouched. See dispose(). - */ - void* getData() const { verify(); return data; } - - /** Called by the event channel on completion. */ - void complete(EventChannel::shared_ptr ec, void* _data, size_t _size) { - if (data != _data) channel = ec; data = _data; size = _size; - } - - /** - * Dispose of system-provided data buffer, if any. This is - * automatically called by the destructor. - */ - void dispose() { if(channel && data) channel->dispose(data,size); data=0; } - - ~ReadEvent() { dispose(); } - - void post(EventChannel& channel) { channel.post(*this); } - - private: - int fd; - void* data; - size_t size; - EventChannel::shared_ptr channel; -}; - -/** Asynchronous write event */ -class WriteEvent : public RequestEvent { - public: - WriteEvent(int fileDescriptor, void* buffer, size_t bytesToWrite) : - fd(fileDescriptor), data(buffer), size(bytesToWrite) {} - - /** Number of bytes written */ - size_t getBytesWritten() const { verify(); return size; } - - void post(EventChannel& channel) { channel.post(*this); } - - private: - int fd; - void* data; - size_t size; -}; - -/** Asynchronous socket accept event */ -class AcceptEvent : public RequestEvent { - public: - /** Accept a connection on listeningFd */ - AcceptEvent(int listeningFd) : listen(listeningFd) {} - - /** Get accepted file descriptor */ - int getAcceptedFd() const { verify(); return accepted; } - - void post(EventChannel& channel) { channel.post(*this); } - - private: - int listen; - int accepted; -}; - -/** - * NotifyEvent is delievered immediately to be dispatched by an - * EventChannel thread. - */ -class NotifyEvent : public RequestEvent { - public: - void post(EventChannel& channel) { channel.post(*this); } -}; - - -inline void EventChannel::post(Event& event) { event.post(*this); } - -}} - - -#endif /*!_sys_EventChannel_h*/ diff --git a/cpp/src/qpid/sys/Monitor.h b/cpp/src/qpid/sys/Monitor.h index 59e1e74b57..bbe126cecb 100644 --- a/cpp/src/qpid/sys/Monitor.h +++ b/cpp/src/qpid/sys/Monitor.h @@ -22,60 +22,28 @@ * */ +#include <sys/errno.h> #include <boost/noncopyable.hpp> +#include <qpid/sys/Mutex.h> +#include <qpid/sys/Time.h> #ifdef USE_APR -# include <apr_thread_mutex.h> -# include <apr_thread_cond.h> -# include <qpid/apr/APRBase.h> -# include <qpid/apr/APRPool.h> -#else -# include <pthread.h> -# include <qpid/sys/Time.h> -# include <qpid/posix/check.h> +# include <apr-1/apr_thread_cond.h> #endif namespace qpid { namespace sys { -template <class L> -class ScopedLock -{ - public: - ScopedLock(L& l) : mutex(l) { l.lock(); } - ~ScopedLock() { mutex.unlock(); } - private: - L& mutex; -}; - - -class Mutex : private boost::noncopyable -{ - public: - typedef ScopedLock<Mutex> ScopedLock; - - inline Mutex(); - inline ~Mutex(); - inline void lock(); - inline void unlock(); - inline void trylock(); - - protected: -#ifdef USE_APR - apr_thread_mutex_t* mutex; -#else - pthread_mutex_t mutex; -#endif -}; - -/** A condition variable and a mutex */ +/** + * A monitor is a condition variable and a mutex + */ class Monitor : public Mutex { public: inline Monitor(); inline ~Monitor(); inline void wait(); - inline bool wait(int64_t nsecs); + inline bool wait(const Time& absoluteTime); inline void notify(); inline void notifyAll(); @@ -91,25 +59,6 @@ class Monitor : public Mutex // APR ================================================================ #ifdef USE_APR -Mutex::Mutex() { - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); -} - -Mutex::~Mutex(){ - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); -} - -void Mutex::lock() { - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); -} -void Mutex::unlock() { - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); -} - -void Mutex::trylock() { - CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); -} - Monitor::Monitor() { CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); } @@ -122,10 +71,10 @@ void Monitor::wait() { CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); } -bool Monitor::wait(int64_t nsecs){ +bool Monitor::wait(const Time& absoluteTime){ // APR uses microseconds. - apr_status_t status = apr_thread_cond_timedwait( - condition, mutex, nsecs/1000); + apr_status_t status = + apr_thread_cond_timedwait(condition, mutex, absoluteTime/TIME_USEC); if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status); return status == 0; } @@ -138,93 +87,41 @@ void Monitor::notifyAll(){ CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); } - -}} - - -// POSIX ================================================================ #else -/** - * PODMutex is a POD, can be static-initialized with - * PODMutex m = QPID_PODMUTEX_INITIALIZER - */ -struct PODMutex -{ - typedef ScopedLock<PODMutex> ScopedLock; - - inline void lock(); - inline void unlock(); - inline void trylock(); - - // Must be public to be a POD: - pthread_mutex_t mutex; -}; - -#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } - - -void PODMutex::lock() { - CHECK0(pthread_mutex_lock(&mutex)); -} -void PODMutex::unlock() { - CHECK0(pthread_mutex_unlock(&mutex)); -} - -void PODMutex::trylock() { - CHECK0(pthread_mutex_trylock(&mutex)); -} - - -Mutex::Mutex() { - CHECK0(pthread_mutex_init(&mutex, 0)); -} - -Mutex::~Mutex(){ - CHECK0(pthread_mutex_destroy(&mutex)); -} - -void Mutex::lock() { - CHECK0(pthread_mutex_lock(&mutex)); -} -void Mutex::unlock() { - CHECK0(pthread_mutex_unlock(&mutex)); -} - -void Mutex::trylock() { - CHECK0(pthread_mutex_trylock(&mutex)); -} +// POSIX ================================================================ Monitor::Monitor() { - CHECK0(pthread_cond_init(&condition, 0)); + QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0)); } Monitor::~Monitor() { - CHECK0(pthread_cond_destroy(&condition)); + QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition)); } void Monitor::wait() { - CHECK0(pthread_cond_wait(&condition, &mutex)); + QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex)); } -bool Monitor::wait(int64_t nsecs){ - Time t(nsecs); - int status = pthread_cond_timedwait(&condition, &mutex, &t.getTimespec()); - if(status != 0) { - if (errno == ETIMEDOUT) return false; - CHECK0(status); +bool Monitor::wait(const Time& absoluteTime){ + struct timespec ts; + toTimespec(ts, absoluteTime); + int status = pthread_cond_timedwait(&condition, &mutex, &ts); + if (status != 0) { + if (status == ETIMEDOUT) return false; + throw QPID_POSIX_ERROR(status); } return true; } void Monitor::notify(){ - CHECK0(pthread_cond_signal(&condition)); + QPID_POSIX_THROW_IF(pthread_cond_signal(&condition)); } void Monitor::notifyAll(){ - CHECK0(pthread_cond_broadcast(&condition)); + QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition)); } +#endif /*USE_APR*/ }} -#endif /*USE_APR*/ #endif /*!_sys_Monitor_h*/ diff --git a/cpp/src/qpid/sys/Mutex.h b/cpp/src/qpid/sys/Mutex.h new file mode 100644 index 0000000000..3ada2e98b7 --- /dev/null +++ b/cpp/src/qpid/sys/Mutex.h @@ -0,0 +1,151 @@ +#ifndef _sys_Mutex_h +#define _sys_Mutex_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifdef USE_APR +# include <apr-1/apr_thread_mutex.h> +# include <qpid/apr/APRBase.h> +# include <qpid/apr/APRPool.h> +#else +# include <pthread.h> +# include <qpid/posix/check.h> +#endif +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +/** + * Scoped lock template: calls lock() in ctor, unlock() in dtor. + * L can be any class with lock() and unlock() functions. + */ +template <class L> +class ScopedLock +{ + public: + ScopedLock(L& l) : mutex(l) { l.lock(); } + ~ScopedLock() { mutex.unlock(); } + private: + L& mutex; +}; + +/** + * Mutex lock. + */ +class Mutex : private boost::noncopyable { + public: + typedef ScopedLock<Mutex> ScopedLock; + + inline Mutex(); + inline ~Mutex(); + inline void lock(); + inline void unlock(); + inline void trylock(); + + protected: +#ifdef USE_APR + apr_thread_mutex_t* mutex; +#else + pthread_mutex_t mutex; +#endif +}; + +#ifdef USE_APR +// APR ================================================================ + +Mutex::Mutex() { + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); +} + +Mutex::~Mutex(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); +} + +void Mutex::lock() { + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} +void Mutex::unlock() { + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} + +void Mutex::trylock() { + CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); +} + +#else +// POSIX ================================================================ + +/** + * PODMutex is a POD, can be static-initialized with + * PODMutex m = QPID_PODMUTEX_INITIALIZER + */ +struct PODMutex +{ + typedef ScopedLock<PODMutex> ScopedLock; + + inline void lock(); + inline void unlock(); + inline void trylock(); + + // Must be public to be a POD: + pthread_mutex_t mutex; +}; + +#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } + + +void PODMutex::lock() { + QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); +} +void PODMutex::unlock() { + QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +void PODMutex::trylock() { + QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); +} + + +Mutex::Mutex() { + QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, 0)); +} + +Mutex::~Mutex(){ + QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex)); +} + +void Mutex::lock() { + QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex)); +} +void Mutex::unlock() { + QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +void Mutex::trylock() { + QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex)); +} + +#endif // USE_APR + +}} + + + +#endif /*!_sys_Mutex_h*/ diff --git a/cpp/src/qpid/posix/EpollEventChannel.h b/cpp/src/qpid/sys/Runnable.cpp index 8128d5276f..30122c682f 100644 --- a/cpp/src/qpid/posix/EpollEventChannel.h +++ b/cpp/src/qpid/sys/Runnable.cpp @@ -16,33 +16,17 @@ * */ -#include <qpid/sys/EventChannel.h> +#include "Runnable.h" +#include <boost/bind.hpp> namespace qpid { namespace sys { -/** Epoll-based implementation of the event channel */ -class EpollEventChannel : public EventChannel -{ - public: - - EpollEventChannel(); - ~EpollEventChannel(); - - virtual void post(ReadEvent& event); - virtual void post(WriteEvent& event); - virtual void post(AcceptEvent& event); - virtual void post(NotifyEvent& event); - - inline void post(Event& event); +Runnable::~Runnable() {} - virtual Event* getEvent(); - - virtual void dispose(void* buffer, size_t size); - - private: - int epollFd; - -}; +Runnable::Functor Runnable::functor() +{ + return boost::bind(&Runnable::run, this); +} }} diff --git a/cpp/src/qpid/sys/Runnable.h b/cpp/src/qpid/sys/Runnable.h index 8379afb2f9..fb3927c612 100644 --- a/cpp/src/qpid/sys/Runnable.h +++ b/cpp/src/qpid/sys/Runnable.h @@ -1,3 +1,5 @@ +#ifndef _Runnable_ +#define _Runnable_ /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,18 +20,28 @@ * under the License. * */ -#ifndef _Runnable_ -#define _Runnable_ + +#include <boost/function.hpp> namespace qpid { namespace sys { -/** Base class for classes that run in a thread. */ +/** + * Interface for objects that can be run, e.g. in a thread. + */ class Runnable { public: - virtual ~Runnable() {} + /** Type to represent a runnable as a Functor */ + typedef boost::function0<void> Functor; + + virtual ~Runnable(); + + /** Derived classes override run(). */ virtual void run() = 0; + + /** Create a functor object that will call this->run(). */ + Functor functor(); }; }} diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h index cf757e7a27..d3e8c1af48 100644 --- a/cpp/src/qpid/sys/Socket.h +++ b/cpp/src/qpid/sys/Socket.h @@ -23,6 +23,7 @@ */ #include <string> +#include <qpid/sys/Time.h> #ifdef USE_APR # include <apr_network_io.h> @@ -34,10 +35,18 @@ namespace sys { class Socket { public: - Socket(); + /** Create an initialized TCP socket */ + static Socket createTcp(); + /** Create a socket wrapper for descriptor. */ +#ifdef USE_APR + Socket(apr_socket_t* descriptor = 0); +#else + Socket(int descriptor = 0); +#endif + /** Set timeout for read and write */ - void setTimeout(long msecs); + void setTimeout(Time interval); void connect(const std::string& host, int port); @@ -46,19 +55,30 @@ class Socket enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode; /** Returns bytes sent or an ErrorCode value < 0. */ - ssize_t send(const char* data, size_t size); + ssize_t send(const void* data, size_t size); /** * Returns bytes received, an ErrorCode value < 0 or 0 * if the connection closed in an orderly manner. */ - ssize_t recv(char* data, size_t size); + ssize_t recv(void* data, size_t size); + + /** Bind to a port and start listening. + *@param port 0 means choose an available port. + *@param backlog maximum number of pending connections. + *@return The bound port. + */ + int listen(int port = 0, int backlog = 10); + /** Get file descriptor */ + int fd(); + private: #ifdef USE_APR apr_socket_t* socket; #else - int socket; + void init() const; + mutable int socket; // Initialized on demand. #endif }; diff --git a/cpp/src/qpid/sys/Thread.h b/cpp/src/qpid/sys/Thread.h index 2aad7c24d7..37f714dd6c 100644 --- a/cpp/src/qpid/sys/Thread.h +++ b/cpp/src/qpid/sys/Thread.h @@ -40,11 +40,17 @@ namespace sys { class Thread { public: + inline static Thread current(); + inline static void yield(); + inline Thread(); inline explicit Thread(qpid::sys::Runnable*); + inline explicit Thread(qpid::sys::Runnable&); + inline void join(); - inline static Thread current(); + inline long id(); + private: #ifdef USE_APR static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data); @@ -68,12 +74,21 @@ Thread::Thread(Runnable* runnable) { apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get())); } +Thread::Thread(Runnable& runnable) { + CHECK_APR_SUCCESS( + apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get())); +} + void Thread::join(){ apr_status_t status; if (thread != 0) CHECK_APR_SUCCESS(apr_thread_join(&status, thread)); } +long Thread::id() { + return long(thread); +} + Thread::Thread(apr_thread_t* t) : thread(t) {} Thread Thread::current(){ @@ -83,15 +98,29 @@ Thread Thread::current(){ return Thread(thr); } +void Thread::yield() +{ + apr_thread_yield(); +} + + // POSIX ================================================================ #else Thread::Thread(Runnable* runnable) { - CHECK0(pthread_create(&thread, NULL, runRunnable, runnable)); + QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable)); +} + +Thread::Thread(Runnable& runnable) { + QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable)); } void Thread::join(){ - if (thread != 0) CHECK0(pthread_join(thread, 0)); + QPID_POSIX_THROW_IF(pthread_join(thread, 0)); +} + +long Thread::id() { + return long(thread); } Thread::Thread(pthread_t thr) : thread(thr) {} @@ -99,6 +128,13 @@ Thread::Thread(pthread_t thr) : thread(thr) {} Thread Thread::current() { return Thread(pthread_self()); } + +void Thread::yield() +{ + QPID_POSIX_THROW_IF(pthread_yield()); +} + + #endif }} diff --git a/cpp/src/qpid/sys/Time.cpp b/cpp/src/qpid/sys/Time.cpp index 3971297ec2..ad6185b966 100644 --- a/cpp/src/qpid/sys/Time.cpp +++ b/cpp/src/qpid/sys/Time.cpp @@ -27,37 +27,34 @@ namespace sys { // APR ================================================================ #if USE_APR -Time Time::now() { - return Time(apr_time_now(), NSEC_PER_USEC); -} - -void Time::set(int64_t ticks, long nsec_per_tick) { - time = (ticks * nsec_per_tick) / NSEC_PER_USEC; -} - -int64_t Time::nsecs() const { - return time * NSEC_PER_USEC; -} +Time now() { return apr_time_now() * TIME_USEC; } // POSIX================================================================ #else -Time Time::now() { - Time t; - clock_gettime(CLOCK_REALTIME, &t.time); - return t; +Time now() { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return toTime(ts); +} + +struct timespec toTimespec(const Time& t) { + struct timespec ts; + toTimespec(ts, t); + return ts; } -void Time::set(int64_t ticks, long nsec_per_tick) { - int64_t ns = ticks * nsec_per_tick; - time.tv_sec = ns / NSEC_PER_SEC; - time.tv_nsec = ns % NSEC_PER_SEC; +struct timespec& toTimespec(struct timespec& ts, const Time& t) { + ts.tv_sec = t / TIME_SEC; + ts.tv_nsec = t % TIME_SEC; + return ts; } -int64_t Time::nsecs() const { - return time.tv_sec * NSEC_PER_SEC + time.tv_nsec; +Time toTime(const struct timespec& ts) { + return ts.tv_sec*TIME_SEC + ts.tv_nsec; } + #endif }} diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h index a569c90780..3dd46741d8 100644 --- a/cpp/src/qpid/sys/Time.h +++ b/cpp/src/qpid/sys/Time.h @@ -33,43 +33,25 @@ namespace qpid { namespace sys { -class Time -{ - public: - static Time now(); - - enum { - NSEC_PER_SEC=1000*1000*1000, - NSEC_PER_MSEC=1000*1000, - NSEC_PER_USEC=1000 - }; +/** Time in nanoseconds */ +typedef int64_t Time; - inline Time(int64_t ticks=0, long nsec_per_tick=1); +Time now(); - void set(int64_t ticks, long nsec_per_tick=1); - - inline int64_t msecs() const; - inline int64_t usecs() const; - int64_t nsecs() const; +/** Nanoseconds per second. */ +const Time TIME_SEC = 1000*1000*1000; +/** Nanoseconds per millisecond */ +const Time TIME_MSEC = 1000*1000; +/** Nanoseconds per microseconds. */ +const Time TIME_USEC = 1000; +/** Nanoseconds per nanosecond. */ +const Time TIME_NSEC = 1; #ifndef USE_APR - const struct timespec& getTimespec() const { return time; } - struct timespec& getTimespec() { return time; } +struct timespec toTimespec(const Time& t); +struct timespec& toTimespec(struct timespec& ts, const Time& t); +Time toTime(const struct timespec& ts); #endif - - private: -#ifdef USE_APR - apr_time_t time; -#else - struct timespec time; -#endif -}; - -Time::Time(int64_t ticks, long nsec_per_tick) { set(ticks, nsec_per_tick); } - -int64_t Time::msecs() const { return nsecs() / NSEC_PER_MSEC; } - -int64_t Time::usecs() const { return nsecs() / NSEC_PER_USEC; } }} diff --git a/cpp/test/client/client_test.cpp b/cpp/test/client/client_test.cpp index 18b162ec8a..0a3c300f4a 100644 --- a/cpp/test/client/client_test.cpp +++ b/cpp/test/client/client_test.cpp @@ -90,7 +90,9 @@ int main(int argc, char**) con.close(); std::cout << "Closed connection." << std::endl; }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + std::cout << "Error [" << error.code << "] " << error.msg << " (" + << error.location.file << ":" << error.location.line + << ")" << std::endl; return 1; } return 0; diff --git a/cpp/test/client/echo_service.cpp b/cpp/test/client/echo_service.cpp index f0aa49fd4b..3df3da0b86 100644 --- a/cpp/test/client/echo_service.cpp +++ b/cpp/test/client/echo_service.cpp @@ -107,7 +107,7 @@ int main(int argc, char** argv){ connection.close(); } catch(qpid::QpidError error) { - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + std::cout << error.what() << std::endl; } } else { try { @@ -133,7 +133,7 @@ int main(int argc, char** argv){ connection.close(); } catch(qpid::QpidError error) { - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + std::cout << error.what() << std::endl; } } } diff --git a/cpp/test/client/topic_listener.cpp b/cpp/test/client/topic_listener.cpp index 413d482361..bd7cfdc62c 100644 --- a/cpp/test/client/topic_listener.cpp +++ b/cpp/test/client/topic_listener.cpp @@ -38,7 +38,7 @@ class Listener : public MessageListener{ const bool transactional; bool init; int count; - int64_t start; + Time start; void shutdown(); void report(); @@ -96,7 +96,7 @@ int main(int argc, char** argv){ channel.run(); connection.close(); }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + std::cout << error.what() << std::endl; } } } @@ -106,7 +106,7 @@ Listener::Listener(Channel* _channel, const std::string& _responseq, bool tx) : void Listener::received(Message& message){ if(!init){ - start = Time::now().msecs(); + start = now(); count = 0; init = true; } @@ -128,10 +128,11 @@ void Listener::shutdown(){ } void Listener::report(){ - int64_t finish = Time::now().msecs(); - int64_t time = finish - start; + Time finish = now(); + Time time = finish - start; std::stringstream reportstr; - reportstr << "Received " << count << " messages in " << time << " ms."; + reportstr << "Received " << count << " messages in " + << time/TIME_MSEC << " ms."; Message msg; msg.setData(reportstr.str()); channel->publish(msg, string(), responseQueue); diff --git a/cpp/test/client/topic_publisher.cpp b/cpp/test/client/topic_publisher.cpp index d9f271e2f0..97d589c1d1 100644 --- a/cpp/test/client/topic_publisher.cpp +++ b/cpp/test/client/topic_publisher.cpp @@ -114,11 +114,13 @@ int main(int argc, char** argv){ int64_t sum(0); for(int i = 0; i < batchSize; i++){ if(i > 0 && args.getDelay()) sleep(args.getDelay()); - int64_t time = publisher.publish(args.getMessages(), args.getSubscribers(), args.getSize()); + Time time = publisher.publish( + args.getMessages(), args.getSubscribers(), args.getSize()); if(!max || time > max) max = time; if(!min || time < min) min = time; sum += time; - std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << time << "ms" << std::endl; + std::cout << "Completed " << (i+1) << " of " << batchSize + << " in " << time/TIME_MSEC << "ms" << std::endl; } publisher.terminate(); int64_t avg = sum / batchSize; @@ -129,7 +131,7 @@ int main(int argc, char** argv){ channel.close(); connection.close(); }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + std::cout << error.what() << std::endl; } } } @@ -153,7 +155,7 @@ void Publisher::waitForCompletion(int msgs){ int64_t Publisher::publish(int msgs, int listeners, int size){ Message msg; msg.setData(generateData(size)); - int64_t start = Time::now().msecs(); + Time start = now(); { Monitor::ScopedLock l(monitor); for(int i = 0; i < msgs; i++){ @@ -170,7 +172,7 @@ int64_t Publisher::publish(int msgs, int listeners, int size){ waitForCompletion(listeners); } - int64_t finish(Time::now().msecs()); + Time finish = now(); return finish - start; } diff --git a/cpp/test/unit/qpid/ExceptionTest.cpp b/cpp/test/unit/qpid/ExceptionTest.cpp new file mode 100644 index 0000000000..7c3261dc29 --- /dev/null +++ b/cpp/test/unit/qpid/ExceptionTest.cpp @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include <qpid/Exception.h> +#include <qpid_test_plugin.h> + +using namespace qpid; + +struct CountDestroyedException : public Exception { + int& count; + static int staticCount; + CountDestroyedException() : count(staticCount) { } + CountDestroyedException(int& n) : count(n) {} + ~CountDestroyedException() throw() { count++; } + void throwSelf() const { throw *this; } +}; + +int CountDestroyedException::staticCount = 0; + + +class ExceptionTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ExceptionTest); + CPPUNIT_TEST(testHeapException); + CPPUNIT_TEST_SUITE_END(); + public: + // Verify proper memory management for heap-allocated exceptions. + void testHeapException() { + int count = 0; + try { + std::auto_ptr<Exception> p( + new CountDestroyedException(count)); + p.release()->throwSelf(); + CPPUNIT_FAIL("Expected CountDestroyedException."); + } catch (const CountDestroyedException& e) { + CPPUNIT_ASSERT(&e.count == &count); + } + CPPUNIT_ASSERT_EQUAL(1, count); + } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ExceptionTest); + diff --git a/cpp/test/unit/qpid/posix/EventChannelTest.cpp b/cpp/test/unit/qpid/posix/EventChannelTest.cpp new file mode 100644 index 0000000000..8846a0e340 --- /dev/null +++ b/cpp/test/unit/qpid/posix/EventChannelTest.cpp @@ -0,0 +1,187 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <qpid/posix/EventChannel.h> +#include <qpid/posix/check.h> +#include <qpid/sys/Runnable.h> +#include <qpid/sys/Socket.h> +#include <qpid/sys/Thread.h> +#include <qpid_test_plugin.h> + +#include <sys/socket.h> +#include <signal.h> +#include <netinet/in.h> +#include <netdb.h> +#include <iostream> + +using namespace qpid::sys; + + +const char hello[] = "hello"; +const size_t size = sizeof(hello); + +struct RunMe : public Runnable +{ + bool ran; + RunMe() : ran(false) {} + void run() { ran = true; } +}; + +class EventChannelTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(EventChannelTest); + CPPUNIT_TEST(testEvent); + CPPUNIT_TEST(testRead); + CPPUNIT_TEST(testFailedRead); + CPPUNIT_TEST(testWrite); + CPPUNIT_TEST(testFailedWrite); + CPPUNIT_TEST(testReadWrite); + CPPUNIT_TEST(testAccept); + CPPUNIT_TEST_SUITE_END(); + + private: + EventChannel::shared_ptr ec; + int pipe[2]; + char readBuf[size]; + + public: + + void setUp() + { + memset(readBuf, size, 0); + ec = EventChannel::create(); + if (::pipe(pipe) != 0) throw QPID_POSIX_ERROR(errno); + // Ignore SIGPIPE, otherwise we will crash writing to broken pipe. + signal(SIGPIPE, SIG_IGN); + } + + // Verify that calling getEvent returns event. + template <class T> bool isNextEvent(T& event) + { + return &event == dynamic_cast<T*>(ec->getEvent()); + } + + template <class T> bool isNextEventOk(T& event) + { + Event* next = ec->getEvent(); + if (next) next->throwIfError(); + return &event == next; + } + + void testEvent() + { + RunMe runMe; + CPPUNIT_ASSERT(!runMe.ran); + // Instances of Event just pass thru the channel immediately. + Event e(runMe.functor()); + ec->postEvent(e); + CPPUNIT_ASSERT(isNextEventOk(e)); + e.dispatch(); + CPPUNIT_ASSERT(runMe.ran); + } + + void testRead() { + ReadEvent re(pipe[0], readBuf, size); + ec->postEvent(re); + CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::write(pipe[1], hello, size)); + CPPUNIT_ASSERT(isNextEventOk(re)); + CPPUNIT_ASSERT_EQUAL(size, re.getSize()); + CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); + } + + void testFailedRead() + { + ReadEvent re(pipe[0], readBuf, size); + ec->postEvent(re); + + // EOF before all data read. + ::close(pipe[1]); + CPPUNIT_ASSERT(isNextEvent(re)); + CPPUNIT_ASSERT(re.hasError()); + try { + re.throwIfError(); + CPPUNIT_FAIL("Expected QpidError."); + } + catch (const qpid::QpidError&) { } + + // Bad file descriptor. Note in this case we fail + // in postEvent and throw immediately. + try { + ReadEvent bad; + ec->postEvent(bad); + CPPUNIT_FAIL("Expected QpidError."); + } + catch (const qpid::QpidError&) { } + } + + void testWrite() { + WriteEvent wr(pipe[1], hello, size); + ec->postEvent(wr); + CPPUNIT_ASSERT(isNextEventOk(wr)); + CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::read(pipe[0], readBuf, size));; + CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); + } + + void testFailedWrite() { + WriteEvent wr(pipe[1], hello, size); + ::close(pipe[0]); + ec->postEvent(wr); + CPPUNIT_ASSERT(isNextEvent(wr)); + CPPUNIT_ASSERT(wr.hasError()); + } + + void testReadWrite() + { + ReadEvent re(pipe[0], readBuf, size); + WriteEvent wr(pipe[1], hello, size); + ec->postEvent(re); + ec->postEvent(wr); + ec->getEvent(); + ec->getEvent(); + CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); + } + + void testAccept() { + Socket s = Socket::createTcp(); + int port = s.listen(0, 10); + CPPUNIT_ASSERT(port != 0); + + AcceptEvent ae(s.fd()); + ec->postEvent(ae); + Socket client = Socket::createTcp(); + client.connect("localhost", port); + CPPUNIT_ASSERT(isNextEvent(ae)); + ae.dispatch(); + + // Verify client writes are read by the accepted descriptor. + char readBuf[size]; + ReadEvent re(ae.getAcceptedDesscriptor(), readBuf, size); + ec->postEvent(re); + CPPUNIT_ASSERT_EQUAL(ssize_t(size), client.send(hello, sizeof(hello))); + CPPUNIT_ASSERT(isNextEvent(re)); + re.dispatch(); + CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf)); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelTest); + diff --git a/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp b/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp new file mode 100644 index 0000000000..5c467880be --- /dev/null +++ b/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp @@ -0,0 +1,247 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <boost/bind.hpp> + +#include <qpid/sys/Socket.h> +#include <qpid/posix/EventChannelThreads.h> +#include <qpid_test_plugin.h> + + +using namespace std; + +using namespace qpid::sys; + +const int nConnections = 5; +const int nMessages = 10; // Messages read/written per connection. + + +// Accepts + reads + writes. +const int totalEvents = nConnections+2*nConnections*nMessages; + +/** + * Messages are numbered 0..nMessages. + * We count the total number of events, and the + * number of reads and writes for each message number. + */ +class TestResults : public Monitor { + public: + TestResults() : isShutdown(false), nEventsRemaining(totalEvents) {} + + void countEvent() { + if (--nEventsRemaining == 0) + shutdown(); + } + + void countRead(int messageNo) { + ++reads[messageNo]; + countEvent(); + } + + void countWrite(int messageNo) { + ++writes[messageNo]; + countEvent(); + } + + void shutdown(const std::string& exceptionMsg = std::string()) { + ScopedLock lock(*this); + exception = exceptionMsg; + isShutdown = true; + notifyAll(); + } + + void wait() { + ScopedLock lock(*this); + Time deadline = now() + 10*TIME_SEC; + while (!isShutdown) { + CPPUNIT_ASSERT(Monitor::wait(deadline)); + } + } + + bool isShutdown; + std::string exception; + AtomicCount reads[nMessages]; + AtomicCount writes[nMessages]; + AtomicCount nEventsRemaining; +}; + +TestResults results; + +EventChannelThreads::shared_ptr threads; + +// Functor to wrap callbacks in try/catch. +class SafeCallback { + public: + SafeCallback(Runnable& r) : callback(r.functor()) {} + SafeCallback(Event::Callback cb) : callback(cb) {} + + void operator()() { + std::string exception; + try { + callback(); + return; + } + catch (const std::exception& e) { + exception = e.what(); + } + catch (...) { + exception = "Unknown exception."; + } + results.shutdown(exception); + } + + private: + Event::Callback callback; +}; + +/** Repost an event N times. */ +class Repost { + public: + Repost(int n) : count (n) {} + virtual ~Repost() {} + + void repost(Event* event) { + if (--count==0) { + delete event; + } else { + threads->postEvent(event); + } + } + private: + int count; +}; + + + +/** Repeating read event. */ +class TestReadEvent : public ReadEvent, public Runnable, private Repost { + public: + explicit TestReadEvent(int fd=-1) : + ReadEvent(fd, &value, sizeof(value), SafeCallback(*this)), + Repost(nMessages) + {} + + void run() { + CPPUNIT_ASSERT_EQUAL(sizeof(value), getSize()); + CPPUNIT_ASSERT(0 <= value); + CPPUNIT_ASSERT(value < nMessages); + results.countRead(value); + repost(this); + } + + private: + int value; + ReadEvent original; +}; + + +/** Fire and forget write event */ +class TestWriteEvent : public WriteEvent, public Runnable, private Repost { + public: + TestWriteEvent(int fd=-1) : + WriteEvent(fd, &value, sizeof(value), SafeCallback(*this)), + Repost(nMessages), + value(0) + {} + + void run() { + CPPUNIT_ASSERT_EQUAL(sizeof(int), getSize()); + results.countWrite(value++); + repost(this); + } + + private: + int value; +}; + +/** Fire-and-forget Accept event, posts reads on the accepted connection. */ +class TestAcceptEvent : public AcceptEvent, public Runnable, private Repost { + public: + TestAcceptEvent(int fd=-1) : + AcceptEvent(fd, SafeCallback(*this)), + Repost(nConnections) + {} + + void run() { + threads->postEvent(new TestReadEvent(getAcceptedDesscriptor())); + results.countEvent(); + repost(this); + } +}; + +class EventChannelThreadsTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(EventChannelThreadsTest); + CPPUNIT_TEST(testThreads); + CPPUNIT_TEST_SUITE_END(); + + public: + + void setUp() { + threads = EventChannelThreads::create(EventChannel::create()); + } + + void tearDown() { + threads.reset(); + } + + void testThreads() + { + Socket listener = Socket::createTcp(); + int port = listener.listen(); + + // Post looping accept events, will repost nConnections times. + // The accept event will automatically post read events. + threads->postEvent(new TestAcceptEvent(listener.fd())); + + // Make connections. + Socket connections[nConnections]; + for (int i = 0; i < nConnections; ++i) { + connections[i] = Socket::createTcp(); + connections[i].connect("localhost", port); + } + + // Post looping write events. + for (int i = 0; i < nConnections; ++i) { + threads->postEvent(new TestWriteEvent(connections[i].fd())); + } + + // Wait for all events to be dispatched. + results.wait(); + + if (!results.exception.empty()) CPPUNIT_FAIL(results.exception); + CPPUNIT_ASSERT_EQUAL(0, int(results.nEventsRemaining)); + + // Expect a read and write for each messageNo from each connection. + for (int messageNo = 0; messageNo < nMessages; ++messageNo) { + CPPUNIT_ASSERT_EQUAL(nConnections, int(results.reads[messageNo])); + CPPUNIT_ASSERT_EQUAL(nConnections, int(results.writes[messageNo])); + } + + threads->shutdown(); + threads->join(); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelThreadsTest); + |