summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-12-20 17:54:05 +0000
committerAlan Conway <aconway@apache.org>2006-12-20 17:54:05 +0000
commit013f077cade5451798b76c2912b12ec873b6177e (patch)
tree1bcf4f12bde03057695943d2094f9d4312094435
parent7c208cd735560336b3e87d24c7ba966288256067 (diff)
downloadqpid-python-013f077cade5451798b76c2912b12ec873b6177e.tar.gz
Current state of event channel code.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/event-queue-2006-12-20@489159 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/configure.ac17
-rw-r--r--cpp/lib/broker/AutoDelete.cpp2
-rw-r--r--cpp/lib/broker/Broker.cpp2
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp161
-rw-r--r--cpp/lib/broker/BrokerChannel.h4
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp3
-rw-r--r--cpp/lib/broker/BrokerQueue.cpp3
-rw-r--r--cpp/lib/broker/TopicExchange.cpp5
-rw-r--r--cpp/lib/client/ResponseHandler.cpp19
-rw-r--r--cpp/lib/common/Exception.cpp21
-rw-r--r--cpp/lib/common/Exception.h64
-rw-r--r--cpp/lib/common/ExceptionHolder.cpp32
-rw-r--r--cpp/lib/common/ExceptionHolder.h62
-rw-r--r--cpp/lib/common/Makefile.am32
-rw-r--r--cpp/lib/common/QpidError.cpp34
-rw-r--r--cpp/lib/common/QpidError.h36
-rw-r--r--cpp/lib/common/framing/AMQFrame.h4
-rw-r--r--cpp/lib/common/sys/Acceptor.h9
-rw-r--r--cpp/lib/common/sys/AtomicCount.h58
-rw-r--r--cpp/lib/common/sys/Runnable.cpp4
-rw-r--r--cpp/lib/common/sys/Runnable.h11
-rw-r--r--cpp/lib/common/sys/Socket.h5
-rw-r--r--cpp/lib/common/sys/Thread.h3
-rw-r--r--cpp/lib/common/sys/Time.h6
-rw-r--r--cpp/lib/common/sys/apr/APRAcceptor.cpp36
-rw-r--r--cpp/lib/common/sys/apr/LFProcessor.cpp2
-rw-r--r--cpp/lib/common/sys/posix/EventChannel.cpp572
-rw-r--r--cpp/lib/common/sys/posix/EventChannel.h201
-rw-r--r--cpp/lib/common/sys/posix/EventChannelAcceptor.cpp4
-rw-r--r--cpp/lib/common/sys/posix/EventChannelThreads.cpp74
-rw-r--r--cpp/lib/common/sys/posix/EventChannelThreads.h43
-rw-r--r--cpp/lib/common/sys/posix/PosixAcceptor.cpp48
-rw-r--r--cpp/lib/common/sys/posix/Socket.cpp11
-rw-r--r--cpp/lib/common/sys/posix/check.cpp4
-rw-r--r--cpp/lib/common/sys/posix/check.h22
-rw-r--r--cpp/tests/EventChannelTest.cpp109
-rw-r--r--cpp/tests/EventChannelThreadsTest.cpp56
-rw-r--r--cpp/tests/Makefile.am22
-rwxr-xr-xcpp/tests/run-python-tests15
-rw-r--r--cpp/tests/topic_publisher.cpp14
-rwxr-xr-xcpp/tests/topictest57
41 files changed, 1108 insertions, 779 deletions
diff --git a/cpp/configure.ac b/cpp/configure.ac
index 0334b00fe9..b0947bd3b9 100644
--- a/cpp/configure.ac
+++ b/cpp/configure.ac
@@ -25,14 +25,6 @@ AC_USE_SYSTEM_EXTENSIONS
AM_MISSING_PROG([HELP2MAN], [help2man])
-AC_ARG_ENABLE(warnings,
-[ --enable-warnings turn on lots of compiler warnings (recommended)],
-[case "${enableval}" in
- yes|no) ;;
- *) AC_MSG_ERROR([bad value ${enableval} for warnings option]) ;;
- esac],
- [enableval=yes])
-
# Turn on this automake conditional if we are in a qpid
# hierarchy (i.e. with gentools/ and specs/ sibling directories),
# and if we have working java + javac.
@@ -53,7 +45,12 @@ AM_CONDITIONAL([CAN_GENERATE_CODE], [test x$build = xyes])
# -Wunreachable-code -Wpadded -Winline
# -Wshadow - warns about boost headers.
-if test "${enableval}" = yes; then
+AC_ARG_ENABLE(warnings,
+ [AS_HELP_STRING([--disable-warnings],
+ [Disable compiler warnings (default enabled)])],
+ [], [enable_warnings=yes])
+
+if test "${enable_warnings}" = yes; then
gl_COMPILER_FLAGS(-Werror)
gl_COMPILER_FLAGS(-pedantic)
gl_COMPILER_FLAGS(-Wall)
@@ -106,7 +103,7 @@ AC_ARG_ENABLE(apr,
yes|no) ;;
*) AC_MSG_ERROR([invalid APR enable/disable value: $enable_APR]) ;;
esac],
-[enable_APR=yes])
+[enable_APR=no])
APR_MINIMUM_VERSION=1.2.2
AC_SUBST(APR_MINIMUM_VERSION)
diff --git a/cpp/lib/broker/AutoDelete.cpp b/cpp/lib/broker/AutoDelete.cpp
index ae48d10505..6d87c982b4 100644
--- a/cpp/lib/broker/AutoDelete.cpp
+++ b/cpp/lib/broker/AutoDelete.cpp
@@ -63,7 +63,7 @@ void AutoDelete::run(){
Monitor::ScopedLock l(monitor);
while(!stopped){
process();
- monitor.wait(period*TIME_MSEC);
+ monitor.wait(now() + period*TIME_MSEC);
}
}
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp
index 6c0d7a3f3f..e2265a292f 100644
--- a/cpp/lib/broker/Broker.cpp
+++ b/cpp/lib/broker/Broker.cpp
@@ -47,7 +47,7 @@ Broker::shared_ptr Broker::create(const Configuration& config) {
}
void Broker::run() {
- acceptor->run(&factory);
+ acceptor->run(factory);
}
void Broker::shutdown() {
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 5d4f68a8af..979617b594 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -18,11 +18,15 @@
* under the License.
*
*/
-#include <BrokerChannel.h>
-#include <QpidError.h>
+#include <assert.h>
+
#include <iostream>
#include <sstream>
-#include <assert.h>
+
+#include <boost/bind.hpp>
+
+#include "BrokerChannel.h"
+#include "QpidError.h"
using std::mem_fun_ref;
using std::bind2nd;
@@ -50,11 +54,18 @@ Channel::~Channel(){
}
bool Channel::exists(const string& consumerTag){
+ Mutex::ScopedLock l(lock);
return consumers.find(consumerTag) != consumers.end();
}
-void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*){
- if(tag.empty()) tag = tagGenerator.generate();
+void Channel::consume(
+ string& tag, Queue::shared_ptr queue, bool acks,
+ bool exclusive, ConnectionToken* const connection, const FieldTable*)
+{
+ Mutex::ScopedLock l(lock);
+ if(tag.empty()) tag = tagGenerator.generate();
+ // TODO aconway 2006-12-13: enforce ownership of consumer
+ // with auto_ptr.
ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
try{
queue->consume(c, exclusive);//may throw exception
@@ -65,7 +76,8 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool excl
}
}
-void Channel::cancel(consumer_iterator i){
+void Channel::cancel(consumer_iterator i) {
+ // Private, must be called with lock held.
ConsumerImpl* c = i->second;
consumers.erase(i);
if(c){
@@ -75,6 +87,7 @@ void Channel::cancel(consumer_iterator i){
}
void Channel::cancel(const string& tag){
+ Mutex::ScopedLock l(lock);
consumer_iterator i = consumers.find(tag);
if(i != consumers.end()){
cancel(i);
@@ -82,11 +95,14 @@ void Channel::cancel(const string& tag){
}
void Channel::close(){
- //cancel all consumers
- for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
- cancel(i);
+ {
+ Mutex::ScopedLock l(lock);
+ while(!consumers.empty()) {
+ cancel(consumers.begin());
+ }
}
- //requeue:
+ // TODO aconway 2006-12-13: does recovery need to be atomic with
+ // cancelling all consumers?
recover(true);
}
@@ -109,20 +125,21 @@ void Channel::rollback(){
}
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
- Mutex::ScopedLock locker(deliveryLock);
-
- u_int64_t deliveryTag = currentDeliveryTag++;
- if(ackExpected){
- unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
- outstanding.size += msg->contentSize();
- outstanding.count++;
+ u_int64_t deliveryTag;
+ {
+ Mutex::ScopedLock l(lock);
+ deliveryTag = currentDeliveryTag++;
+ if(ackExpected){
+ unacked.push_back(
+ DeliveryRecord(msg, queue, consumerTag, deliveryTag));
+ outstanding.size += msg->contentSize();
+ outstanding.count++;
+ }
}
- //send deliver method, header and content(s)
msg->deliver(out, id, consumerTag, deliveryTag, framesize);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
- Mutex::ScopedLock locker(deliveryLock);
bool countOk = !prefetchCount || prefetchCount > unacked.size();
bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
return countOk && sizeOk;
@@ -187,71 +204,99 @@ void Channel::complete(Message::shared_ptr& msg){
}
exchange.reset();
}else{
- std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl;
+ std::cout << "Exchange not known in" << BOOST_CURRENT_FUNCTION
+ << std::endl;
}
}
-void Channel::ack(u_int64_t deliveryTag, bool multiple){
+void Channel::ack(u_int64_t deliveryTag, bool multiple) {
if(transactional){
+ Mutex::ScopedLock locker(lock);
accumulatedAck.update(deliveryTag, multiple);
- //TODO: I think the outstanding prefetch size & count should be updated at this point...
+ //TODO: I think the outstanding prefetch size & count should
+ //be updated at this point...
//TODO: ...this may then necessitate dispatching to consumers
- }else{
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
- if(i == unacked.end()){
- throw InvalidAckException();
- }else if(multiple){
- ack_iterator end = ++i;
- for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard));
- unacked.erase(unacked.begin(), end);
-
- //recalculate the prefetch:
- outstanding.reset();
- for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
- }else{
- i->discard();
- i->subtractFrom(&outstanding);
- unacked.erase(i);
- }
+ }
+ else {
+ {
+ Mutex::ScopedLock locker(lock);
+ ack_iterator i = find_if(
+ unacked.begin(), unacked.end(),
+ boost::bind(&DeliveryRecord::matches, _1, deliveryTag));
+ if(i == unacked.end()) {
+ throw InvalidAckException();
+ }
+ else if(multiple) {
+ ack_iterator end = ++i;
+ for_each(unacked.begin(), end,
+ mem_fun_ref(&DeliveryRecord::discard));
+ unacked.erase(unacked.begin(), end);
+ //recalculate the prefetch:
+ outstanding.reset();
+ for_each(
+ unacked.begin(), unacked.end(),
+ boost::bind(&DeliveryRecord::addTo, _1, &outstanding));
+ }
+ else {
+ i->discard();
+ i->subtractFrom(&outstanding);
+ unacked.erase(i);
+ }
+ }
//if the prefetch limit had previously been reached, there may
//be messages that can be now be delivered
- for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
+
+ // TODO aconway 2006-12-13: Does this need to be atomic?
+ // If so we need a redesign, requestDispatch re-enters
+ // Channel::dispatch.
+ //
+ for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
j->second->requestDispatch();
}
}
}
-void Channel::recover(bool requeue){
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- if(requeue){
- outstanding.reset();
- std::list<DeliveryRecord> copy = unacked;
- unacked.clear();
- for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue));
- }else{
- for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
+void Channel::recover(bool requeue) {
+ std::list<DeliveryRecord> copyUnacked;
+ boost::function1<void, DeliveryRecord&> recoverFn;
+ {
+ Mutex::ScopedLock l(lock);
+ if(requeue) {
+ outstanding.reset();
+ copyUnacked.swap(unacked);
+ recoverFn = boost::bind(&DeliveryRecord::requeue, _1);
+ }
+ else {
+ copyUnacked = unacked;
+ recoverFn = boost::bind(&DeliveryRecord::redeliver, _1, this);
+ }
}
+ // TODO aconway 2006-12-13: Does recovery of copyUnacked have to
+ // be atomic with extracting the list?
+ for_each(copyUnacked.begin(), copyUnacked.end(), recoverFn);
}
bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
+ Mutex::ScopedLock l(lock);
+ // TODO aconway 2006-12-13: Nasty to have all these external calls
+ // inside a critical.section but none appear to have blocking potential.
+ // sendGetOk does non-blocking IO
+ //
Message::shared_ptr msg = queue->dequeue();
- if(msg){
- Mutex::ScopedLock locker(deliveryLock);
+ if(msg) {
u_int64_t myDeliveryTag = currentDeliveryTag++;
- msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
+ u_int32_t count = queue->getMessageCount();
+ msg->sendGetOk(out, id, count + 1, myDeliveryTag, framesize);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
return true;
- }else{
- return false;
}
+ return false;
}
-void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
+ u_int64_t deliveryTag){
msg->deliver(out, id, consumerTag, deliveryTag, framesize);
}
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index fa3912c78e..776862dd34 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -79,10 +79,10 @@ namespace qpid {
u_int32_t prefetchSize;
u_int16_t prefetchCount;
Prefetch outstanding;
- u_int32_t framesize;
+ const u_int32_t framesize;
NameGenerator tagGenerator;
std::list<DeliveryRecord> unacked;
- qpid::sys::Mutex deliveryLock;
+ qpid::sys::Mutex lock;
TxBuffer txBuffer;
AccumulatedAck accumulatedAck;
MessageStore* const store;
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index 598de2d590..148dbefa78 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -143,7 +143,8 @@ void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize)
{
u_int64_t expected = expectedContentSize();
if (expected != buffer.available()) {
- std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl;
+ std::cout << "WARN: Expected " << expectedContentSize()
+ << " bytes, got " << buffer.available() << std::endl;
throw Exception("Cannot decode content, buffer not large enough.");
}
diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp
index 0e48d3b13d..deb7c38824 100644
--- a/cpp/lib/broker/BrokerQueue.cpp
+++ b/cpp/lib/broker/BrokerQueue.cpp
@@ -83,7 +83,8 @@ bool Queue::dispatch(Message::shared_ptr& msg){
return false;
}else if(exclusive){
if(!exclusive->deliver(msg)){
- std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl;
+ std::cout << "WARNING: Dropping undeliverable message "
+ << "from queue with exclusive consumer." << std::endl;
}
return true;
}else{
diff --git a/cpp/lib/broker/TopicExchange.cpp b/cpp/lib/broker/TopicExchange.cpp
index 3ebb3c8c56..73a633859b 100644
--- a/cpp/lib/broker/TopicExchange.cpp
+++ b/cpp/lib/broker/TopicExchange.cpp
@@ -81,10 +81,7 @@ void TopicPattern::normalize() {
namespace {
-// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
-// Need StringRef class that operates on a string in place witout copy.
-// Should be applied everywhere strings are extracted from frames.
-//
+
bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end)
{
// Invariant: [pattern_begin..p) matches [target_begin..t)
diff --git a/cpp/lib/client/ResponseHandler.cpp b/cpp/lib/client/ResponseHandler.cpp
index ac8b4a9ced..244831effe 100644
--- a/cpp/lib/client/ResponseHandler.cpp
+++ b/cpp/lib/client/ResponseHandler.cpp
@@ -28,32 +28,33 @@ qpid::client::ResponseHandler::ResponseHandler() : waiting(false){}
qpid::client::ResponseHandler::~ResponseHandler(){}
-bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){
+bool qpid::client::ResponseHandler::validate(
+ const qpid::framing::AMQMethodBody& expected)
+{
return expected.match(response.get());
}
void qpid::client::ResponseHandler::waitForResponse(){
Monitor::ScopedLock l(monitor);
- if(waiting){
+ while(waiting)
monitor.wait();
- }
}
-void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){
- response = _response;
+void qpid::client::ResponseHandler::signalResponse(
+ qpid::framing::AMQMethodBody::shared_ptr _response)
+{
Monitor::ScopedLock l(monitor);
+ response = _response;
waiting = false;
monitor.notify();
}
void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){
Monitor::ScopedLock l(monitor);
- if(waiting){
+ while(waiting)
monitor.wait();
- }
- if(!validate(expected)){
+ if(!validate(expected))
THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error");
- }
}
void qpid::client::ResponseHandler::expect(){
diff --git a/cpp/lib/common/Exception.cpp b/cpp/lib/common/Exception.cpp
index 0161518011..ef88c5cb74 100644
--- a/cpp/lib/common/Exception.cpp
+++ b/cpp/lib/common/Exception.cpp
@@ -20,6 +20,7 @@
*/
#include <Exception.h>
+#include <iostream>
namespace qpid {
@@ -29,14 +30,32 @@ Exception::Exception(const std::string& str) throw() : whatStr(str) {}
Exception::Exception(const char* str) throw() : whatStr(str) {}
+Exception::Exception(const std::exception& e) throw() : whatStr(e.what()) {}
+
Exception::~Exception() throw() {}
const char* Exception::what() const throw() { return whatStr.c_str(); }
std::string Exception::toString() const throw() { return whatStr; }
-Exception* Exception::clone() const throw() { return new Exception(*this); }
+Exception::auto_ptr Exception::clone() const throw() {
+ return Exception::auto_ptr(new Exception(*this));
+}
void Exception::throwSelf() const { throw *this; }
+const char* Exception::defaultMessage = "Unexpected exception";
+
+void Exception::log(const char* what, const char* message) {
+ std::cout << message << ": " << what << std::endl;
+}
+
+void Exception::log(const std::exception& e, const char* message) {
+ log(e.what(), message);
+}
+
+void Exception::logUnknown(const char* message) {
+ log("unknown exception.", message);
+}
+
} // namespace qpid
diff --git a/cpp/lib/common/Exception.h b/cpp/lib/common/Exception.h
index f35d427bb0..185c395283 100644
--- a/cpp/lib/common/Exception.h
+++ b/cpp/lib/common/Exception.h
@@ -26,6 +26,7 @@
#include <string>
#include <memory>
#include <boost/shared_ptr.hpp>
+#include <boost/function.hpp>
namespace qpid
{
@@ -38,6 +39,10 @@ class Exception : public std::exception
std::string whatStr;
public:
+ typedef boost::shared_ptr<Exception> shared_ptr;
+ typedef boost::shared_ptr<const Exception> shared_ptr_const;
+ typedef std::auto_ptr<Exception> auto_ptr;
+
Exception() throw();
Exception(const std::string& str) throw();
Exception(const char* str) throw();
@@ -48,14 +53,65 @@ class Exception : public std::exception
virtual const char* what() const throw();
virtual std::string toString() const throw();
- virtual Exception* clone() const throw();
+ virtual std::auto_ptr<Exception> clone() const throw();
virtual void throwSelf() const;
- typedef boost::shared_ptr<Exception> shared_ptr;
-};
+ /** Default message: "Unknown exception" or something like it. */
+ static const char* defaultMessage;
+ /**
+ * Log a message of the form "message: what"
+ *@param what Exception's what() message.
+ *@param message Prefix message.
+ */
+ static void log(const char* what, const char* message = defaultMessage);
-}
+ /**
+ * Log an exception.
+ *@param e Exception to log.
+
+ */
+ static void log(
+ const std::exception& e, const char* message = defaultMessage);
+
+
+ /**
+ * Log an unknown exception - use in catch(...)
+ *@param message Prefix message.
+ */
+ static void logUnknown(const char* message = defaultMessage);
+
+ /**
+ * Wrapper template function to call another function inside
+ * try/catch and log any exception. Use boost::bind to wrap
+ * member function calls or functions with arguments.
+ *
+ *@param f Function to call in try block.
+ *@param retrhow If true the exception is rethrown.
+ *@param message Prefix message.
+ */
+ template <class T>
+ static T tryCatchLog(boost::function0<T> f, bool rethrow=true,
+ const char* message=defaultMessage)
+ {
+ try {
+ return f();
+ }
+ catch (const std::exception& e) {
+ log(e, message);
+ if (rethrow)
+ throw;
+ }
+ catch (...) {
+ logUnknown(message);
+ if (rethrow)
+ throw;
+ }
+ }
+
+};
+
+} // namespace qpid
#endif /*!_Exception_*/
diff --git a/cpp/lib/common/ExceptionHolder.cpp b/cpp/lib/common/ExceptionHolder.cpp
index de8d7b2487..e69de29bb2 100644
--- a/cpp/lib/common/ExceptionHolder.cpp
+++ b/cpp/lib/common/ExceptionHolder.cpp
@@ -1,32 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "ExceptionHolder.h"
-
-namespace qpid {
-
-ExceptionHolder::ExceptionHolder(const std::exception& e) {
- const Exception* ex = dynamic_cast<const Exception*>(&e);
- if (ex) {
- reset(ex->clone());
- } else {
- reset(new Exception(e.what()));
- }
-}
-
-}
diff --git a/cpp/lib/common/ExceptionHolder.h b/cpp/lib/common/ExceptionHolder.h
index 83d0884be9..e69de29bb2 100644
--- a/cpp/lib/common/ExceptionHolder.h
+++ b/cpp/lib/common/ExceptionHolder.h
@@ -1,62 +0,0 @@
-#ifndef _qpid_ExceptionHolder_h
-#define _qpid_ExceptionHolder_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <Exception.h>
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-
-/**
- * Holder for a heap-allocated exc eption that can be stack allocated
- * and thrown safely.
- *
- * Basically this is a shared_ptr with the Exception functions added
- * so the catcher need not be aware that it is a pointer rather than a
- * reference.
- *
- * shared_ptr is chosen over auto_ptr because it has normal
- * copy semantics.
- */
-class ExceptionHolder : public Exception, public boost::shared_ptr<Exception>
-{
- public:
- typedef boost::shared_ptr<Exception> shared_ptr;
-
- ExceptionHolder() throw() {}
- ExceptionHolder(Exception* p) throw() : shared_ptr(p) {}
- ExceptionHolder(shared_ptr p) throw() : shared_ptr(p) {}
-
- ExceptionHolder(const Exception& e) throw() : shared_ptr(e.clone()) {}
- ExceptionHolder(const std::exception& e);
-
- ~ExceptionHolder() throw() {}
-
- const char* what() const throw() { return (*this)->what(); }
- std::string toString() const throw() { return (*this)->toString(); }
- virtual Exception* clone() const throw() { return (*this)->clone(); }
- virtual void throwSelf() const { (*this)->throwSelf(); }
-};
-
-} // namespace qpid
-
-
-
-#endif /*!_qpid_ExceptionHolder_h*/
diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am
index e1f7503282..997558f3da 100644
--- a/cpp/lib/common/Makefile.am
+++ b/cpp/lib/common/Makefile.am
@@ -24,21 +24,23 @@ apr_hdr = \
$(apr)/LFSessionContext.h
posix = sys/posix
-posix_src = \
- $(posix)/PosixAcceptor.cpp \
- $(posix)/Socket.cpp \
- $(posix)/Thread.cpp \
- $(posix)/check.cpp \
- $(posix)/EventChannel.cpp \
- $(posix)/EventChannelThreads.cpp
-posix_hdr = \
- $(posix)/check.h \
- $(posix)/EventChannel.h \
- $(posix)/EventChannelThreads.h
+posix_src = \
+ $(posix)/EventChannelAcceptor.cpp \
+ $(posix)/Socket.cpp \
+ $(posix)/Thread.cpp \
+ $(posix)/check.cpp \
+ $(posix)/EventChannel.cpp \
+ $(posix)/EventChannelThreads.cpp \
+ $(posix)/EventChannelConnection.cpp
+posix_hdr = \
+ $(posix)/check.h \
+ $(posix)/EventChannel.h \
+ $(posix)/EventChannelThreads.h \
+ $(posix)/EventChannelConnection.h
-EXTRA_DIST=$(posix_src) $(posix_hdr)
-platform_src = $(apr_src)
-platform_hdr = $(apr_hdr)
+EXTRA_DIST=$(apr_src) $(apr_hdr)
+platform_src = $(posix_src)
+platform_hdr = $(posix_hdr)
framing = framing
gen = $(srcdir)/../../gen
@@ -76,7 +78,6 @@ libqpidcommon_la_SOURCES = \
$(gen)/AMQP_MethodVersionMap.cpp \
$(gen)/AMQP_ServerProxy.cpp \
Exception.cpp \
- ExceptionHolder.cpp \
QpidError.cpp \
sys/Runnable.cpp \
sys/Time.cpp
@@ -107,7 +108,6 @@ nobase_pkginclude_HEADERS = \
$(framing)/amqp_types.h \
$(framing)/AMQP_HighestVersion.h \
Exception.h \
- ExceptionHolder.h \
QpidError.h \
SharedObject.h \
sys/Acceptor.h \
diff --git a/cpp/lib/common/QpidError.cpp b/cpp/lib/common/QpidError.cpp
index 7f4f9e2f34..9cbd66c841 100644
--- a/cpp/lib/common/QpidError.cpp
+++ b/cpp/lib/common/QpidError.cpp
@@ -22,23 +22,41 @@
#include <QpidError.h>
#include <sstream>
-using namespace qpid;
+namespace qpid {
QpidError::QpidError() : code(0) {}
-QpidError::QpidError(int _code, const std::string& _msg,
- const SrcLine& _loc) throw()
+QpidError::QpidError(
+ int _code, const std::string& _msg, Location _loc) throw()
: code(_code), msg(_msg), location(_loc)
{
- std::ostringstream os;
- os << "Error [" << code << "] " << msg << " ("
- << location.file << ":" << location.line << ")";
- whatStr = os.str();
+ setWhat();
+}
+
+QpidError::QpidError(
+ int _code, const char* _msg, Location _loc) throw()
+ : code(_code), msg(_msg), location(_loc)
+{
+ setWhat();
}
QpidError::~QpidError() throw() {}
-Exception* QpidError::clone() const throw() { return new QpidError(*this); }
+Exception::auto_ptr QpidError::clone() const throw() {
+ return Exception::auto_ptr(new QpidError(*this));
+}
void QpidError::throwSelf() const { throw *this; }
+void QpidError::setWhat() {
+ std::ostringstream os;
+ os << "Error [" << code << "] " << msg;
+ if (location.file) {
+ os << " (" ;
+ os << location.file << ":" << location.line;
+ os << ")";
+ }
+ whatStr = os.str();
+}
+
+} // namespace qpid
diff --git a/cpp/lib/common/QpidError.h b/cpp/lib/common/QpidError.h
index 30d9d27076..9a47aa5e00 100644
--- a/cpp/lib/common/QpidError.h
+++ b/cpp/lib/common/QpidError.h
@@ -24,37 +24,45 @@
#include <memory>
#include <ostream>
#include <Exception.h>
+#include <boost/current_function.hpp>
namespace qpid {
-struct SrcLine {
- public:
- SrcLine(const std::string& file_="", int line_=0) :
- file(file_), line(line_) {}
-
- std::string file;
- int line;
-};
-
class QpidError : public Exception {
public:
+ // Use macro QPID_LOCATION to construct a location.
+ struct Location {
+ Location(const char* function_=0, const char* file_=0, int line_=0) :
+ function(function_), file(file_), line(line_) {}
+ const char* function;
+ const char* file;
+ int line;
+ };
+
const int code;
const std::string msg;
- const SrcLine location;
+ const Location location;
QpidError();
- QpidError(int _code, const std::string& _msg, const SrcLine& _loc) throw();
+ QpidError(int _code, const char* _msg, const Location _loc) throw();
+ QpidError(int _code, const std::string& _msg, const Location _loc) throw();
+
~QpidError() throw();
- Exception* clone() const throw();
+ Exception::auto_ptr clone() const throw();
void throwSelf() const;
+
+ private:
+ void setWhat();
};
} // namespace qpid
-#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__)
+#define QPID_ERROR_LOCATION \
+ ::qpid::QpidError::Location(BOOST_CURRENT_FUNCTION, __FILE__, __LINE__)
-#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE)
+#define QPID_ERROR(CODE, MESSAGE) \
+ ::qpid::QpidError((CODE), (MESSAGE), QPID_ERROR_LOCATION)
#define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE)
diff --git a/cpp/lib/common/framing/AMQFrame.h b/cpp/lib/common/framing/AMQFrame.h
index bec1946fb7..1ff3ff191f 100644
--- a/cpp/lib/common/framing/AMQFrame.h
+++ b/cpp/lib/common/framing/AMQFrame.h
@@ -40,9 +40,9 @@ namespace qpid {
static AMQP_MethodVersionMap versionMap;
u_int16_t channel;
- u_int8_t type;//used if the body is decoded separately from the 'head'
+ u_int8_t type;//used if body decoded separately from 'head'
AMQBody::shared_ptr body;
- AMQBody::shared_ptr createMethodBody(Buffer& buffer);
+ AMQBody::shared_ptr createMethodBody(Buffer& buffer);
public:
AMQFrame();
diff --git a/cpp/lib/common/sys/Acceptor.h b/cpp/lib/common/sys/Acceptor.h
index e6bc27a593..7aed068dd0 100644
--- a/cpp/lib/common/sys/Acceptor.h
+++ b/cpp/lib/common/sys/Acceptor.h
@@ -33,10 +33,11 @@ class SessionHandlerFactory;
class Acceptor : public qpid::SharedObject<Acceptor>
{
public:
- static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false);
- virtual ~Acceptor() = 0;
- virtual int16_t getPort() const = 0;
- virtual void run(qpid::sys::SessionHandlerFactory* factory) = 0;
+ static Acceptor::shared_ptr create(
+ int16_t port, int backlog, int threads, bool trace = false);
+ virtual ~Acceptor();
+ virtual int getPort() const = 0;
+ virtual void run(SessionHandlerFactory& factory) = 0;
virtual void shutdown() = 0;
};
diff --git a/cpp/lib/common/sys/AtomicCount.h b/cpp/lib/common/sys/AtomicCount.h
index b625b2c9b0..7a9555480f 100644
--- a/cpp/lib/common/sys/AtomicCount.h
+++ b/cpp/lib/common/sys/AtomicCount.h
@@ -21,36 +21,52 @@
#include <boost/detail/atomic_count.hpp>
#include <boost/noncopyable.hpp>
+#include <boost/function.hpp>
namespace qpid {
namespace sys {
/**
- * Atomic counter.
+ * Increment counter in constructor and decrement in destructor.
+ * Optionally call a function if the decremented counter value is 0.
+ * Note the function must not throw, it is called in the destructor.
*/
-class AtomicCount : boost::noncopyable {
+template <class Count>
+class ScopedIncrement : boost::noncopyable {
public:
- class ScopedDecrement : boost::noncopyable {
- public:
- /** Decrement counter in constructor and increment in destructor. */
- ScopedDecrement(AtomicCount& c) : count(c) { value = --count; }
- ~ScopedDecrement() { ++count; }
- /** Return the value returned by the decrement. */
- operator long() { return value; }
- private:
- AtomicCount& count;
- long value;
- };
+ ScopedIncrement(Count& c, boost::function0<void> f=0)
+ : count(c), callback(f) { ++count; }
+ ~ScopedIncrement() { if (--count == 0 && callback) callback(); }
- class ScopedIncrement : boost::noncopyable {
- public:
- /** Increment counter in constructor and increment in destructor. */
- ScopedIncrement(AtomicCount& c) : count(c) { ++count; }
- ~ScopedIncrement() { --count; }
- private:
- AtomicCount& count;
- };
+ private:
+ Count& count;
+ boost::function0<void> callback;
+};
+/** Decrement counter in constructor and increment in destructor. */
+template <class Count>
+class ScopedDecrement : boost::noncopyable {
+ public:
+ ScopedDecrement(Count& c) : count(c) { value = --count; }
+ ~ScopedDecrement() { ++count; }
+
+ /** Return the value after the decrement. */
+ operator long() { return value; }
+
+ private:
+ Count& count;
+ long value;
+};
+
+
+/**
+ * Atomic counter.
+ */
+class AtomicCount : boost::noncopyable {
+ public:
+ typedef ScopedIncrement<AtomicCount> ScopedIncrement;
+ typedef ScopedDecrement<AtomicCount> ScopedDecrement;
+
AtomicCount(long value = 0) : count(value) {}
void operator++() { ++count ; }
diff --git a/cpp/lib/common/sys/Runnable.cpp b/cpp/lib/common/sys/Runnable.cpp
index 30122c682f..5d4f48a373 100644
--- a/cpp/lib/common/sys/Runnable.cpp
+++ b/cpp/lib/common/sys/Runnable.cpp
@@ -29,4 +29,8 @@ Runnable::Functor Runnable::functor()
return boost::bind(&Runnable::run, this);
}
+void FunctorRunnable::run() {
+ f();
+}
+
}}
diff --git a/cpp/lib/common/sys/Runnable.h b/cpp/lib/common/sys/Runnable.h
index fb3927c612..ef18897b09 100644
--- a/cpp/lib/common/sys/Runnable.h
+++ b/cpp/lib/common/sys/Runnable.h
@@ -44,7 +44,16 @@ class Runnable
Functor functor();
};
-}}
+/** Runnable wrapper for a functor */
+class FunctorRunnable : public Runnable {
+ public:
+ explicit FunctorRunnable(const Runnable::Functor& runMe) : f(runMe) {}
+ void run();
+ private:
+ Runnable::Functor f;
+};
+
+}} // namespace qpid::sys
#endif
diff --git a/cpp/lib/common/sys/Socket.h b/cpp/lib/common/sys/Socket.h
index d793a240c6..e35ed5b07c 100644
--- a/cpp/lib/common/sys/Socket.h
+++ b/cpp/lib/common/sys/Socket.h
@@ -70,8 +70,11 @@ class Socket
*/
int listen(int port = 0, int backlog = 10);
+ /** Accept a connection. This socket must be listening */
+ Socket accept();
+
/** Get file descriptor */
- int fd();
+ int fd() const;
private:
#ifdef USE_APR
diff --git a/cpp/lib/common/sys/Thread.h b/cpp/lib/common/sys/Thread.h
index 47b95b6234..9647dc2414 100644
--- a/cpp/lib/common/sys/Thread.h
+++ b/cpp/lib/common/sys/Thread.h
@@ -116,7 +116,8 @@ Thread::Thread(Runnable& runnable) {
}
void Thread::join(){
- QPID_POSIX_THROW_IF(pthread_join(thread, 0));
+ if (thread != 0)
+ QPID_POSIX_THROW_IF(pthread_join(thread, 0));
}
long Thread::id() {
diff --git a/cpp/lib/common/sys/Time.h b/cpp/lib/common/sys/Time.h
index 3dd46741d8..4c6951b429 100644
--- a/cpp/lib/common/sys/Time.h
+++ b/cpp/lib/common/sys/Time.h
@@ -22,6 +22,7 @@
*
*/
+#include <limits>
#include <stdint.h>
#ifdef USE_APR
@@ -33,7 +34,7 @@
namespace qpid {
namespace sys {
-/** Time in nanoseconds */
+/** Time in nanoseconds. */
typedef int64_t Time;
Time now();
@@ -47,6 +48,9 @@ const Time TIME_USEC = 1000;
/** Nanoseconds per nanosecond. */
const Time TIME_NSEC = 1;
+/** Value to represent an infinite timeout */
+const Time TIME_INFINITE = std::numeric_limits<Time>::max();
+
#ifndef USE_APR
struct timespec toTimespec(const Time& t);
struct timespec& toTimespec(struct timespec& ts, const Time& t);
diff --git a/cpp/lib/common/sys/apr/APRAcceptor.cpp b/cpp/lib/common/sys/apr/APRAcceptor.cpp
index 6853833797..1bd23819f4 100644
--- a/cpp/lib/common/sys/apr/APRAcceptor.cpp
+++ b/cpp/lib/common/sys/apr/APRAcceptor.cpp
@@ -32,20 +32,16 @@ class APRAcceptor : public Acceptor
{
public:
APRAcceptor(int16_t port, int backlog, int threads, bool trace);
- virtual int16_t getPort() const;
- virtual void run(qpid::sys::SessionHandlerFactory* factory);
+ virtual int getPort() const;
+ virtual void run(qpid::sys::SessionHandlerFactory& factory);
virtual void shutdown();
private:
- void shutdownImpl();
-
- private:
int16_t port;
bool trace;
LFProcessor processor;
apr_socket_t* socket;
volatile bool running;
- Mutex shutdownLock;
};
// Define generic Acceptor::create() to return APRAcceptor.
@@ -69,13 +65,13 @@ Acceptor::~Acceptor() {}
CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
}
-int16_t APRAcceptor::getPort() const {
+int APRAcceptor::getPort() const {
apr_sockaddr_t* address;
CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
return address->port;
}
-void APRAcceptor::run(SessionHandlerFactory* factory) {
+void APRAcceptor::run(SessionHandlerFactory& factory) {
running = true;
processor.start();
std::cout << "Listening on port " << getPort() << "..." << std::endl;
@@ -90,32 +86,24 @@ void APRAcceptor::run(SessionHandlerFactory* factory) {
CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace);
- session->init(factory->create(session));
+ session->init(factory.create(session));
}else{
- Mutex::ScopedLock locker(shutdownLock);
- if(running) {
- if(status != APR_EINTR){
- std::cout << "ERROR: " << get_desc(status) << std::endl;
- }
- shutdownImpl();
+ running = false;
+ if(status != APR_EINTR){
+ std::cout << "ERROR: " << get_desc(status) << std::endl;
}
}
}
+ shutdown();
}
void APRAcceptor::shutdown() {
- Mutex::ScopedLock locker(shutdownLock);
if (running) {
- shutdownImpl();
+ running = false;
+ processor.stop();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
}
}
-void APRAcceptor::shutdownImpl() {
- Mutex::ScopedLock locker(shutdownLock);
- running = false;
- processor.stop();
- CHECK_APR_SUCCESS(apr_socket_close(socket));
-}
-
}}
diff --git a/cpp/lib/common/sys/apr/LFProcessor.cpp b/cpp/lib/common/sys/apr/LFProcessor.cpp
index 2b6fc92623..f5d59e31d7 100644
--- a/cpp/lib/common/sys/apr/LFProcessor.cpp
+++ b/cpp/lib/common/sys/apr/LFProcessor.cpp
@@ -27,8 +27,6 @@
using namespace qpid::sys;
using qpid::QpidError;
-// TODO aconway 2006-10-12: stopped is read outside locks.
-//
LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
size(_size),
diff --git a/cpp/lib/common/sys/posix/EventChannel.cpp b/cpp/lib/common/sys/posix/EventChannel.cpp
index 16c7ec9c3f..860ecd6b07 100644
--- a/cpp/lib/common/sys/posix/EventChannel.cpp
+++ b/cpp/lib/common/sys/posix/EventChannel.cpp
@@ -1,4 +1,4 @@
-/*
+/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
@@ -16,6 +16,13 @@
*
*/
+// TODO aconway 2006-12-15: Locking review.
+
+// TODO aconway 2006-12-15: use Descriptor pointers everywhere,
+// get them from channel, pass them to Event constructors.
+// Eliminate lookup.
+
+
#include <mqueue.h>
#include <string.h>
#include <iostream>
@@ -29,10 +36,10 @@
#include <queue>
#include <boost/ptr_container/ptr_map.hpp>
-#include <boost/current_function.hpp>
+#include <boost/noncopyable.hpp>
+#include <boost/bind.hpp>
#include <QpidError.h>
-#include <sys/Monitor.h>
#include "check.h"
#include "EventChannel.h"
@@ -40,127 +47,319 @@
using namespace std;
-// Convenience template to zero out a struct.
-template <class S> struct ZeroStruct : public S {
- ZeroStruct() { memset(this, 0, sizeof(*this)); }
-};
-
namespace qpid {
namespace sys {
+// ================================================================
+// Private class declarations
+
+namespace {
+
+typedef enum { IN, OUT } Direction;
+typedef std::pair<Event*, Event*> EventPair;
+} // namespace
+
/**
- * EventHandler wraps an epoll file descriptor. Acts as private
- * interface between EventChannel and subclasses.
- *
- * Also implements Event interface for events that are not associated
- * with a file descriptor and are passed via the message queue.
- */
-class EventHandler : public Event, private Monitor
+ * Queue of events corresponding to one IO direction (IN or OUT).
+ * Each Descriptor contains two Queues.
+ */
+class EventChannel::Queue : private boost::noncopyable
{
public:
- EventHandler(int epollSize = 256);
- ~EventHandler();
+ Queue(Descriptor& container, Direction dir);
- int getEpollFd() { return epollFd; }
- void epollAdd(int fd, uint32_t epollEvents, Event* event);
- void epollMod(int fd, uint32_t epollEvents, Event* event);
- void epollDel(int fd);
+ /** Called by Event classes in prepare() */
+ void push(Event* e);
- void mqPut(Event* event);
- Event* mqGet();
-
- protected:
- // Should never be called, only complete.
- void prepare(EventHandler&) { assert(0); }
- Event* complete(EventHandler& eh);
+ /** Called when epoll wakes.
+ *@return The next completed event or 0.
+ */
+ Event* wake(uint32_t epollFlags);
+
+ void setBit(uint32_t &epollFlags);
+
+ void shutdown();
private:
+ typedef std::deque<Event*> EventQ;
+
+ inline bool isMyEvent(uint32_t flags) { return flags | myEvent; }
+
+ Mutex& lock; // Shared with Descriptor.
+ Descriptor& descriptor;
+ uint32_t myEvent; // Epoll event flag.
+ EventQ queue;
+};
+
+
+/**
+ * Manages a file descriptor in an epoll set.
+ *
+ * Can be shutdown and re-activated for the same file descriptor.
+ */
+class EventChannel::Descriptor : private boost::noncopyable {
+ public:
+ Descriptor() : epollFd(-1), myFd(-1),
+ inQueue(*this, IN), outQueue(*this, OUT) {}
+
+ void activate(int epollFd_, int myFd_);
+
+ /** Epoll woke up for this descriptor. */
+ EventPair wake(uint32_t epollEvents);
+
+ /** Shut down: close and remove file descriptor.
+ * May be re-activated if fd is reused.
+ */
+ void shutdown();
+
+ // TODO aconway 2006-12-18: Nasty. Need to clean up interaction.
+ void shutdownUnsafe();
+
+ bool isShutdown() { return epollFd == -1; }
+
+ Queue& getQueue(Direction d) { return d==IN ? inQueue : outQueue; }
+
+ private:
+ void update();
+ void epollCtl(int op, uint32_t events);
+
+ Mutex lock;
int epollFd;
- std::string mqName;
- int mqFd;
- std::queue<Event*> mqEvents;
+ int myFd;
+ Queue inQueue, outQueue;
+
+ friend class Queue;
};
-EventHandler::EventHandler(int epollSize)
-{
- epollFd = epoll_create(epollSize);
- if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
+
+/**
+ * Holds the epoll fd, Descriptor map and dispatch queue.
+ * Most of the epoll work is done by the Descriptors.
+ */
+class EventChannel::Impl {
+ public:
+ Impl(int size = 256);
- // Create a POSIX message queue for non-fd events.
- // We write one byte and never read it is always ready for read
- // when we add it to epoll.
- //
- ZeroStruct<struct mq_attr> attr;
- attr.mq_maxmsg = 1;
- attr.mq_msgsize = 1;
- do {
- char tmpnam[L_tmpnam];
- tmpnam_r(tmpnam);
- mqName = tmpnam + 4; // Skip "tmp/"
- mqFd = mq_open(
- mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
- if (mqFd < 0) throw QPID_POSIX_ERROR(errno);
- } while (mqFd == EEXIST); // Name already taken, try again.
+ ~Impl();
- static char zero = '\0';
- mq_send(mqFd, &zero, 1, 0);
- epollAdd(mqFd, 0, this);
+ /**
+ * Registers fd if not already registered.
+ */
+ Descriptor& getDescriptor(int fd);
+
+ /** Wait for an event, return 0 on timeout */
+ Event* wait(Time timeout);
+
+ Queue& getDispatchQueue() { return *dispatchQueue; }
+
+ private:
+
+ typedef boost::ptr_map<int, Descriptor> DescriptorMap;
+
+ Mutex lock;
+ int epollFd;
+ DescriptorMap descriptors;
+ int pipe[2];
+ Queue* dispatchQueue;
+};
+
+
+
+// ================================================================
+// EventChannel::Queue::implementation.
+
+static const char* shutdownMsg = "Event queue shut down.";
+
+EventChannel::Queue::Queue(Descriptor& d, Direction dir) : lock(d.lock), descriptor(d),
+ myEvent(dir==IN ? EPOLLIN : EPOLLOUT)
+{}
+
+void EventChannel::Queue::push(Event* e) {
+ Mutex::ScopedLock l(lock);
+ if (descriptor.isShutdown())
+ THROW_QPID_ERROR(INTERNAL_ERROR, shutdownMsg);
+ queue.push_back(e);
+ descriptor.update();
}
-EventHandler::~EventHandler() {
- mq_close(mqFd);
- mq_unlink(mqName.c_str());
+void EventChannel::Queue::setBit(uint32_t &epollFlags) {
+ if (queue.empty())
+ epollFlags &= ~myEvent;
+ else
+ epollFlags |= myEvent;
}
-void EventHandler::mqPut(Event* event) {
- ScopedLock l(*this);
- assert(event != 0);
- mqEvents.push(event);
- epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+Event* EventChannel::Queue::wake(uint32_t epollFlags) {
+ // Called with lock held.
+ if (!queue.empty() && (isMyEvent(epollFlags))) {
+ Event* e = queue.front()->complete(descriptor);
+ if (e) {
+ queue.pop_front();
+ return e;
+ }
+ }
+ return 0;
+}
+
+void EventChannel::Queue::shutdown() {
+ // Mark all pending events with a shutdown exception.
+ // The server threads will remove and dispatch the events.
+ //
+ qpid::QpidError ex(INTERNAL_ERROR, shutdownMsg, QPID_ERROR_LOCATION);
+ for_each(queue.begin(), queue.end(),
+ boost::bind(&Event::setException, _1, ex));
}
-Event* EventHandler::mqGet() {
- ScopedLock l(*this);
- if (mqEvents.empty())
- return 0;
- Event* event = mqEvents.front();
- mqEvents.pop();
- if(!mqEvents.empty())
- epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
- return event;
+
+// ================================================================
+// Descriptor
+
+
+void EventChannel::Descriptor::activate(int epollFd_, int myFd_) {
+ Mutex::ScopedLock l(lock);
+ assert(myFd < 0 || (myFd == myFd_)); // Can't change fd.
+ if (epollFd < 0) { // Means we're not polling.
+ epollFd = epollFd_;
+ myFd = myFd_;
+ epollCtl(EPOLL_CTL_ADD, 0);
+ }
}
-void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
-{
- ZeroStruct<struct epoll_event> ee;
- ee.data.ptr = event;
- ee.events = epollEvents;
- if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0)
+void EventChannel::Descriptor::shutdown() {
+ Mutex::ScopedLock l(lock);
+ shutdownUnsafe();
+}
+
+void EventChannel::Descriptor::shutdownUnsafe() {
+ // Caller holds lock.
+ ::close(myFd);
+ epollFd = -1; // Indicate we are not polling.
+ inQueue.shutdown();
+ outQueue.shutdown();
+ epollCtl(EPOLL_CTL_DEL, 0);
+}
+
+void EventChannel::Descriptor::update() {
+ // Caller holds lock.
+ uint32_t events = EPOLLONESHOT | EPOLLERR | EPOLLHUP;
+ inQueue.setBit(events);
+ outQueue.setBit(events);
+ epollCtl(EPOLL_CTL_MOD, events);
+}
+
+void EventChannel::Descriptor::epollCtl(int op, uint32_t events) {
+ // Caller holds lock
+ assert(!isShutdown());
+ struct epoll_event ee;
+ memset(&ee, 0, sizeof(ee));
+ ee.data.ptr = this;
+ ee.events = events;
+ int status = ::epoll_ctl(epollFd, op, myFd, &ee);
+ if (status < 0)
throw QPID_POSIX_ERROR(errno);
+ }
+}
+
+
+EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) {
+ Mutex::ScopedLock l(lock);
+ cout << "DEBUG: " << std::hex << epollEvents << std::dec << endl;
+ // If we have an error:
+ if (epollEvents & (EPOLLERR | EPOLLHUP)) {
+ shutdownUnsafe();
+ // Complete both sides on error so the event can fail and
+ // mark itself with an exception.
+ epollEvents |= EPOLLIN | EPOLLOUT;
+ }
+ EventPair ready(inQueue.wake(epollEvents), outQueue.wake(epollEvents));
+ update();
+ return ready;
}
-void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
+
+// ================================================================
+// EventChannel::Impl
+
+
+EventChannel::Impl::Impl(int epollSize):
+ epollFd(-1), dispatchQueue(0)
{
- ZeroStruct<struct epoll_event> ee;
- ee.data.ptr = event;
- ee.events = epollEvents;
- if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0)
- throw QPID_POSIX_ERROR(errno);
+ // Create the epoll file descriptor.
+ epollFd = epoll_create(epollSize);
+ QPID_POSIX_CHECK(epollFd);
+
+ // Create a pipe and write a single byte. The byte is never
+ // read so the pipes read fd is always ready for read.
+ // We activate the FD when there are messages in the queue.
+ QPID_POSIX_CHECK(::pipe(pipe));
+ static char zero = '\0';
+ QPID_POSIX_CHECK(::write(pipe[1], &zero, 1));
+ dispatchQueue = &getDescriptor(pipe[0]).getQueue(IN);
}
-void EventHandler::epollDel(int fd) {
- if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
- throw QPID_POSIX_ERROR(errno);
+EventChannel::Impl::~Impl() {
+ close(epollFd);
+ close(pipe[0]);
+ close(pipe[1]);
}
-Event* EventHandler::complete(EventHandler& eh)
+
+/**
+ * Wait for epoll to wake up, return the descriptor or 0 on timeout.
+ */
+Event* EventChannel::Impl::wait(Time timeoutNs)
{
- assert(&eh == this);
- Event* event = mqGet();
- return event==0 ? 0 : event->complete(eh);
+ // No lock, all thread safe calls or local variables:
+ //
+ const long timeoutMs =
+ (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC;
+ struct epoll_event ee;
+ Event* event = 0;
+ bool doSwap = true;
+
+ // Loop till we get a completed event. Some events may repost
+ // themselves and return 0, e.g. incomplete read or write events.
+ //
+ while (!event) {
+ int n = epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe.
+ if (n == 0) // Timeout
+ return 0;
+ if (n < 0 && errno != EINTR) // Interrupt, ignore it.
+ continue;
+ if (n < 0)
+ throw QPID_POSIX_ERROR(errno);
+ assert(n == 1);
+ Descriptor* ed =
+ reinterpret_cast<Descriptor*>(ee.data.ptr);
+ assert(ed);
+ EventPair ready = ed->wake(ee.events);
+
+ // We can only return one event so if both completed push one
+ // onto the dispatch queue to be dispatched in another thread.
+ if (ready.first && ready.second) {
+ // Keep it fair: in & out take turns to be returned first.
+ if (doSwap)
+ swap(ready.first, ready.second);
+ doSwap = !doSwap;
+ event = ready.first;
+ dispatchQueue->push(ready.second);
+ }
+ else {
+ event = ready.first ? ready.first : ready.second;
+ }
+ }
+ return event;
}
-
+
+EventChannel::Descriptor& EventChannel::Impl::getDescriptor(int fd) {
+ Mutex::ScopedLock l(lock);
+ Descriptor& ed = descriptors[fd];
+ ed.activate(epollFd, fd);
+ return ed;
+}
+
+
// ================================================================
// EventChannel
@@ -168,157 +367,138 @@ EventChannel::shared_ptr EventChannel::create() {
return shared_ptr(new EventChannel());
}
-EventChannel::EventChannel() : handler(new EventHandler()) {}
+EventChannel::EventChannel() : impl(new EventChannel::Impl()) {}
EventChannel::~EventChannel() {}
-void EventChannel::postEvent(Event& e)
+void EventChannel::post(Event& e)
{
- e.prepare(*handler);
+ e.prepare(*impl);
}
-Event* EventChannel::getEvent()
-{
- static const int infiniteTimeout = -1;
- ZeroStruct<struct epoll_event> epollEvent;
-
- // Loop until we can complete the event. Some events may re-post
- // themselves and return 0 from complete, e.g. partial reads. //
- Event* event = 0;
- while (event == 0) {
- int eventCount = epoll_wait(handler->getEpollFd(),
- &epollEvent, 1, infiniteTimeout);
- if (eventCount < 0) {
- if (errno != EINTR) {
- // TODO aconway 2006-11-28: Proper handling/logging of errors.
- cerr << BOOST_CURRENT_FUNCTION << " ignoring error "
- << PosixError::getMessage(errno) << endl;
- assert(0);
- }
- }
- else if (eventCount == 1) {
- event = reinterpret_cast<Event*>(epollEvent.data.ptr);
- assert(event != 0);
- try {
- event = event->complete(*handler);
- }
- catch (const Exception& e) {
- if (event)
- event->setError(e);
- }
- catch (const std::exception& e) {
- if (event)
- event->setError(e);
- }
- }
- }
- return event;
+void EventChannel::post(Event* e) {
+ assert(e);
+ post(*e);
}
-Event::~Event() {}
-
-void Event::prepare(EventHandler& handler)
+Event* EventChannel::wait(Time timeoutNs)
{
- handler.mqPut(this);
+ return impl->wait(timeoutNs);
}
-bool Event::hasError() const {
- return error;
-}
-void Event::throwIfError() throw (Exception) {
- if (hasError())
- error.throwSelf();
+// ================================================================
+// Event and subclasses.
+
+Event::~Event() {}
+
+Exception::shared_ptr_const Event::getException() const {
+ return exception;
}
-Event* Event::complete(EventHandler&)
-{
- return this;
+void Event::throwIfException() {
+ if (getException())
+ exception->throwSelf();
}
void Event::dispatch()
{
+ if (!callback.empty())
+ callback();
+}
+
+void Event::setException(const std::exception& e) {
+ const Exception* ex = dynamic_cast<const Exception*>(&e);
+ if (ex)
+ exception.reset(ex->clone().release());
+ else
+ exception.reset(new Exception(e));
+#ifndef NDEBUG
+ // Throw and re-catch the exception. Has no effect on the
+ // program but it triggers debuggers watching for throw. The
+ // context that sets the exception is more informative for
+ // debugging purposes than the one that ultimately throws it.
+ //
try {
- if (!callback.empty())
- callback();
- } catch (const std::exception&) {
- throw;
- } catch (...) {
- throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
+ throwIfException();
}
+ catch (...) { } // Ignored.
+#endif
}
-void Event::setError(const ExceptionHolder& e) {
- error = e;
+
+void ReadEvent::prepare(EventChannel::Impl& impl) {
+ impl.getDescriptor(descriptor).getQueue(IN).push(this);
}
-void ReadEvent::prepare(EventHandler& handler)
+Event* ReadEvent::complete(EventChannel::Descriptor& ed)
{
- handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+ ssize_t n = ::read(descriptor,
+ static_cast<char*>(buffer) + bytesRead,
+ size - bytesRead);
+
+ if (n < 0 && errno != EAGAIN) { // Error
+ setException(QPID_POSIX_ERROR(errno));
+ ed.shutdownUnsafe(); // Called with lock held.
+ }
+ else if (n == 0) { // End of file
+ // TODO aconway 2006-12-13: Don't treat EOF as exception
+ // unless we're partway thru a !noWait read.
+ setException(QPID_POSIX_ERROR(ENODATA));
+ ed.shutdownUnsafe(); // Called with lock held.
+ }
+ else {
+ if (n > 0) // possible that n < 0 && errno == EAGAIN
+ bytesRead += n;
+ if (bytesRead < size && !noWait) {
+ // Continue reading, not enough data.
+ return 0;
+ }
+ }
+ return this;
}
-ssize_t ReadEvent::doRead() {
- ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received,
- size - received);
- if (n > 0) received += n;
- return n;
+
+void WriteEvent::prepare(EventChannel::Impl& impl) {
+ impl.getDescriptor(descriptor).getQueue(OUT).push(this);
}
-Event* ReadEvent::complete(EventHandler& handler)
+
+Event* WriteEvent::complete(EventChannel::Descriptor& ed)
{
- // Read as much as possible without blocking.
- ssize_t n = doRead();
- while (n > 0 && received < size) doRead();
-
- if (received == size) {
- handler.epollDel(descriptor);
- received = 0; // Reset for re-use.
- return this;
- }
- else if (n <0 && (errno == EAGAIN)) {
- // Keep polling for more.
- handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
+ ssize_t n = ::write(descriptor,
+ static_cast<const char*>(buffer) + bytesWritten,
+ size - bytesWritten);
+ if(n < 0 && errno == EAGAIN && noWait) {
return 0;
}
- else {
- // Unexpected EOF or error. Throw ENODATA for EOF.
- handler.epollDel(descriptor);
- received = 0; // Reset for re-use.
- throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA);
+ if (n < 0 || (bytesWritten += n) < size) {
+ setException(QPID_POSIX_ERROR(errno));
+ ed.shutdownUnsafe(); // Called with lock held.
}
+ return this;
}
-void WriteEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+void AcceptEvent::prepare(EventChannel::Impl& impl) {
+ impl.getDescriptor(descriptor).getQueue(IN).push(this);
}
-Event* WriteEvent::complete(EventHandler& handler)
+Event* AcceptEvent::complete(EventChannel::Descriptor& ed)
{
- ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written,
- size - written);
- if (n < 0) throw QPID_POSIX_ERROR(errno);
- written += n;
- if(written < size) {
- // Keep polling.
- handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
- return 0;
+ accepted = ::accept(descriptor, 0, 0);
+ if (accepted < 0) {
+ setException(QPID_POSIX_ERROR(errno));
+ ed.shutdownUnsafe(); // Called with lock held.
}
- written = 0; // Reset for re-use.
- handler.epollDel(descriptor);
return this;
}
-void AcceptEvent::prepare(EventHandler& handler)
-{
- handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+void DispatchEvent::prepare(EventChannel::Impl& impl) {
+ impl.getDispatchQueue().push(this);
}
-Event* AcceptEvent::complete(EventHandler& handler)
+Event* DispatchEvent::complete(EventChannel::Descriptor&)
{
- handler.epollDel(descriptor);
- accepted = ::accept(descriptor, 0, 0);
- if (accepted < 0) throw QPID_POSIX_ERROR(errno);
return this;
}
diff --git a/cpp/lib/common/sys/posix/EventChannel.h b/cpp/lib/common/sys/posix/EventChannel.h
index 49c7fce740..60c4026fbc 100644
--- a/cpp/lib/common/sys/posix/EventChannel.h
+++ b/cpp/lib/common/sys/posix/EventChannel.h
@@ -19,8 +19,11 @@
*
*/
-#include <SharedObject.h>
-#include <ExceptionHolder.h>
+#include "SharedObject.h"
+#include "Exception.h"
+#include "sys/Monitor.h"
+#include "sys/Time.h"
+
#include <boost/function.hpp>
#include <memory>
@@ -28,11 +31,47 @@ namespace qpid {
namespace sys {
class Event;
-class EventHandler;
-class EventChannel;
+
+/**
+ * Channel to post and wait for events.
+ */
+class EventChannel : public qpid::SharedObject<EventChannel>
+{
+ public:
+ static shared_ptr create();
+
+ ~EventChannel();
+
+ /** Post an event to the channel. */
+ void post(Event& event);
+
+ /** Post an event to the channel. Must not be 0. */
+ void post(Event* event);
+
+ /**
+ * Wait for the next complete event, up to timeout.
+ *@return Pointer to event or 0 if timeout elapses.
+ */
+ Event* wait(Time timeout = TIME_INFINITE);
+
+ class Impl;
+ class Queue;
+ class Descriptor;
+
+ private:
+
+ EventChannel();
+
+ Mutex lock;
+ boost::shared_ptr<Impl> impl;
+};
/**
* Base class for all Events.
+ * Derived classes define events representing various async IO operations.
+ * When an event is complete, it is returned by the EventChannel to
+ * a thread calling wait. The thread will call Event::dispatch() to
+ * execute code associated with event completion.
*/
class Event
{
@@ -40,135 +79,137 @@ class Event
/** Type for callback when event is dispatched */
typedef boost::function0<void> Callback;
- /**
- * Create an event with optional callback.
- * Instances of Event are sent directly through the channel.
- * Derived classes define additional waiting behaviour.
- *@param cb A callback functor that is invoked when dispatch() is called.
- */
- Event(Callback cb = 0) : callback(cb) {}
-
virtual ~Event();
/** Call the callback provided to the constructor, if any. */
void dispatch();
- /** True if there was an error processing this event */
- bool hasError() const;
+ /**
+ *If there was an exception processing this Event, return it.
+ *@return 0 if there was no exception. Caller must not delete.
+ */
+ qpid::Exception::shared_ptr_const getException() const;
+
+ /** If getException() throw the corresponding exception. */
+ void throwIfException();
+
+ /** Set the dispatch callback. */
+ void setCallback(Callback cb) { callback = cb; }
- /** If hasError() throw the corresponding exception. */
- void throwIfError() throw(Exception);
+ /** Set the exception. */
+ void setException(const std::exception& e);
protected:
- virtual void prepare(EventHandler&);
- virtual Event* complete(EventHandler&);
- void setError(const ExceptionHolder& e);
+ Event(Callback cb=0) : callback(cb) {}
+
+ virtual void prepare(EventChannel::Impl&) = 0;
+ virtual Event* complete(EventChannel::Descriptor&) = 0;
Callback callback;
- ExceptionHolder error;
+ Exception::shared_ptr_const exception;
friend class EventChannel;
- friend class EventHandler;
+ friend class EventChannel::Queue;
};
-template <class BufT>
-class IOEvent : public Event {
+/**
+ * An event that does not wait for anything, it is processed
+ * immediately by one of the channel threads.
+ */
+class DispatchEvent : public Event {
public:
- void getDescriptor() const { return descriptor; }
- size_t getSize() const { return size; }
- BufT getBuffer() const { return buffer; }
-
+ DispatchEvent(Callback cb=0) : Event(cb) {}
+
protected:
- IOEvent(int fd, Callback cb, size_t sz, BufT buf) :
- Event(cb), descriptor(fd), buffer(buf), size(sz) {}
+ void prepare(EventChannel::Impl&);
+ Event* complete(EventChannel::Descriptor&);
+};
+
+// Utility base class.
+class FDEvent : public Event {
+ public:
+ int getDescriptor() const { return descriptor; }
+ protected:
+ FDEvent(Callback cb = 0, int fd = 0)
+ : Event(cb), descriptor(fd) {}
int descriptor;
- BufT buffer;
- size_t size;
};
+// Utility base class
+class IOEvent : public FDEvent {
+ public:
+ size_t getSize() const { return size; }
+
+ protected:
+ IOEvent(Callback cb, int fd, size_t sz, bool noWait_) :
+ FDEvent(cb, fd), size(sz), noWait(noWait_) {}
+
+ size_t size;
+ bool noWait;
+};
+
/** Asynchronous read event */
-class ReadEvent : public IOEvent<void*>
+class ReadEvent : public IOEvent
{
public:
- explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) :
- IOEvent<void*>(fd, cb, sz, buf), received(0) {}
+ explicit ReadEvent(
+ int fd=-1, void* buf=0, size_t sz=0,
+ Callback cb=0, bool noWait=false
+ ) : IOEvent(cb, fd, sz, noWait), buffer(buf), bytesRead(0) {}
+ void* getBuffer() const { return buffer; }
+ size_t getBytesRead() const { return bytesRead; }
+
private:
- void prepare(EventHandler&);
- Event* complete(EventHandler&);
+ void prepare(EventChannel::Impl&);
+ Event* complete(EventChannel::Descriptor&);
ssize_t doRead();
- size_t received;
+ void* buffer;
+ size_t bytesRead;
};
/** Asynchronous write event */
-class WriteEvent : public IOEvent<const void*>
+class WriteEvent : public IOEvent
{
public:
explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0,
Callback cb=0) :
- IOEvent<const void*>(fd, cb, sz, buf), written(0) {}
+ IOEvent(cb, fd, sz, noWait), buffer(buf), bytesWritten(0) {}
- protected:
- void prepare(EventHandler&);
- Event* complete(EventHandler&);
+ const void* getBuffer() const { return buffer; }
+ size_t getBytesWritten() const { return bytesWritten; }
private:
+ void prepare(EventChannel::Impl&);
+ Event* complete(EventChannel::Descriptor&);
ssize_t doWrite();
- size_t written;
+
+ const void* buffer;
+ size_t bytesWritten;
};
+
/** Asynchronous socket accept event */
-class AcceptEvent : public Event
+class AcceptEvent : public FDEvent
{
public:
/** Accept a connection on fd. */
explicit AcceptEvent(int fd=-1, Callback cb=0) :
- Event(cb), descriptor(fd), accepted(0) {}
-
- /** Get descriptor for server socket */
+ FDEvent(cb, fd), accepted(0) {}
+
+ /** Get descriptor for accepted server socket */
int getAcceptedDesscriptor() const { return accepted; }
private:
- void prepare(EventHandler&);
- Event* complete(EventHandler&);
+ void prepare(EventChannel::Impl&);
+ Event* complete(EventChannel::Descriptor&);
- int descriptor;
int accepted;
};
-class QueueSet;
-
-/**
- * Channel to post and wait for events.
- */
-class EventChannel : public qpid::SharedObject<EventChannel>
-{
- public:
- static shared_ptr create();
-
- ~EventChannel();
-
- /** Post an event to the channel. */
- void postEvent(Event& event);
-
- /** Post an event to the channel. Must not be 0. */
- void postEvent(Event* event) { postEvent(*event); }
-
- /**
- * Wait for the next complete event.
- *@return Pointer to event. Will never return 0.
- */
- Event* getEvent();
-
- private:
- EventChannel();
- boost::shared_ptr<EventHandler> handler;
-};
-
-
}}
diff --git a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
index 7cd6f60902..28f9beb44e 100644
--- a/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
+++ b/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
@@ -139,11 +139,11 @@ void EventChannelAcceptor::accept()
shutdown();
return;
}
- // TODO aconway 2006-11-29: Need to reap closed connections also.
int fd = acceptEvent.getAcceptedDesscriptor();
+ threads->post(acceptEvent); // Keep accepting.
+ // TODO aconway 2006-11-29: Need to reap closed connections also.
connections.push_back(
new EventChannelConnection(threads, *factory, fd, fd, isTrace));
- threads->post(acceptEvent); // Keep accepting.
}
}} // namespace qpid::sys
diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.cpp b/cpp/lib/common/sys/posix/EventChannelThreads.cpp
index 95e699e0b0..787da72ffa 100644
--- a/cpp/lib/common/sys/posix/EventChannelThreads.cpp
+++ b/cpp/lib/common/sys/posix/EventChannelThreads.cpp
@@ -16,26 +16,40 @@
*
*/
-#include "EventChannelThreads.h"
-#include <sys/Runnable.h>
#include <iostream>
-using namespace std;
+#include <limits>
+
#include <boost/bind.hpp>
+#include <sys/Runnable.h>
+
+#include "EventChannelThreads.h"
+
namespace qpid {
namespace sys {
+const size_t EventChannelThreads::unlimited =
+ std::numeric_limits<size_t>::max();
+
EventChannelThreads::shared_ptr EventChannelThreads::create(
- EventChannel::shared_ptr ec)
+ EventChannel::shared_ptr ec, size_t min, size_t max
+)
{
- return EventChannelThreads::shared_ptr(new EventChannelThreads(ec));
+ return EventChannelThreads::shared_ptr(
+ new EventChannelThreads(ec, min, max));
}
-EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) :
- channel(ec), nWaiting(0), state(RUNNING)
+EventChannelThreads::EventChannelThreads(
+ EventChannel::shared_ptr ec, size_t min, size_t max) :
+ minThreads(std::max(size_t(1), min)),
+ maxThreads(std::min(min, max)),
+ channel(ec),
+ nWaiting(0),
+ state(RUNNING)
{
- // TODO aconway 2006-11-15: Estimate initial threads based on CPUs.
- addThread();
+ Monitor::ScopedLock l(monitor);
+ while (workers.size() < minThreads)
+ workers.push_back(Thread(*this));
}
EventChannelThreads::~EventChannelThreads() {
@@ -43,34 +57,37 @@ EventChannelThreads::~EventChannelThreads() {
join();
}
+// Termination marker event.
+static DispatchEvent terminate;
+
void EventChannelThreads::shutdown()
{
- ScopedLock lock(*this);
+ Monitor::ScopedLock lock(monitor);
if (state != RUNNING) // Already shutting down.
return;
+ state = TERMINATING;
for (size_t i = 0; i < workers.size(); ++i) {
- channel->postEvent(terminate);
+ channel->post(terminate);
}
- state = TERMINATE_SENT;
- notify(); // Wake up one join() thread.
+ monitor.notify(); // Wake up one join() thread.
}
void EventChannelThreads::join()
{
{
- ScopedLock lock(*this);
+ Monitor::ScopedLock lock(monitor);
while (state == RUNNING) // Wait for shutdown to start.
- wait();
+ monitor.wait();
if (state == SHUTDOWN) // Shutdown is complete
return;
if (state == JOINING) {
// Someone else is doing the join.
while (state != SHUTDOWN)
- wait();
+ monitor.wait();
return;
}
// I'm the joining thread
- assert(state == TERMINATE_SENT);
+ assert(state == TERMINATING);
state = JOINING;
} // Drop the lock.
@@ -79,12 +96,13 @@ void EventChannelThreads::join()
workers[i].join();
}
state = SHUTDOWN;
- notifyAll(); // Notify other join() threaeds.
+ monitor.notifyAll(); // Notify any other join() threads.
}
void EventChannelThreads::addThread() {
- ScopedLock l(*this);
- workers.push_back(Thread(*this));
+ Monitor::ScopedLock l(monitor);
+ if (workers.size() < maxThreads)
+ workers.push_back(Thread(*this));
}
void EventChannelThreads::run()
@@ -93,26 +111,22 @@ void EventChannelThreads::run()
AtomicCount::ScopedIncrement inc(nWaiting);
try {
while (true) {
- Event* e = channel->getEvent();
+ Event* e = channel->wait();
assert(e != 0);
- if (e == &terminate) {
+ if (e == &terminate)
return;
- }
AtomicCount::ScopedDecrement dec(nWaiting);
- // I'm no longer waiting, make sure someone is.
- if (dec == 0)
+ // Make sure there's at least one waiting thread.
+ if (dec == 0 && state == RUNNING)
addThread();
e->dispatch();
}
}
catch (const std::exception& e) {
- // TODO aconway 2006-11-15: need better logging across the board.
- std::cerr << "EventChannelThreads::run() caught: " << e.what()
- << std::endl;
+ Exception::log(e, "Exception in EventChannelThreads::run()");
}
catch (...) {
- std::cerr << "EventChannelThreads::run() caught unknown exception."
- << std::endl;
+ Exception::logUnknown("Exception in EventChannelThreads::run()");
}
}
diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.h b/cpp/lib/common/sys/posix/EventChannelThreads.h
index 98403c0869..721a5e9d24 100644
--- a/cpp/lib/common/sys/posix/EventChannelThreads.h
+++ b/cpp/lib/common/sys/posix/EventChannelThreads.h
@@ -20,11 +20,12 @@
*/
#include <vector>
-#include <Exception.h>
-#include <sys/Time.h>
-#include <sys/Monitor.h>
-#include <sys/Thread.h>
-#include <sys/AtomicCount.h>
+#include "Exception.h"
+#include "sys/AtomicCount.h"
+#include "sys/Monitor.h"
+#include "sys/Thread.h"
+#include "sys/Time.h"
+
#include "EventChannel.h"
namespace qpid {
@@ -33,26 +34,36 @@ namespace sys {
/**
Dynamic thread pool serving an EventChannel.
- Threads run a loop { e = getEvent(); e->dispatch(); }
+ Threads run a loop { e = wait(); e->dispatch(); }
The size of the thread pool is automatically adjusted to optimal size.
*/
class EventChannelThreads :
public qpid::SharedObject<EventChannelThreads>,
- public sys::Monitor, private sys::Runnable
+ private sys::Runnable
{
public:
- /** Create the thread pool and start initial threads. */
+ /** Constant to represent an unlimited number of threads */
+ static const size_t unlimited;
+
+ /**
+ * Create the thread pool and start initial threads.
+ * @param minThreads Pool will initialy contain minThreads threads and
+ * will never shrink to less until shutdown.
+ * @param maxThreads Pool will never grow to more than maxThreads.
+ */
static EventChannelThreads::shared_ptr create(
- EventChannel::shared_ptr channel
+ EventChannel::shared_ptr channel = EventChannel::create(),
+ size_t minThreads = 1,
+ size_t maxThreads = unlimited
);
~EventChannelThreads();
/** Post event to the underlying channel */
- void postEvent(Event& event) { channel->postEvent(event); }
+ void post(Event& event) { channel->post(event); }
/** Post event to the underlying channel Must not be 0. */
- void postEvent(Event* event) { channel->postEvent(event); }
+ void post(Event* event) { channel->post(event); }
/**
* Terminate all threads.
@@ -68,21 +79,25 @@ class EventChannelThreads :
private:
typedef std::vector<sys::Thread> Threads;
typedef enum {
- RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN
+ RUNNING, TERMINATING, JOINING, SHUTDOWN
} State;
- EventChannelThreads(EventChannel::shared_ptr underlyingChannel);
+ EventChannelThreads(
+ EventChannel::shared_ptr channel, size_t min, size_t max);
+
void addThread();
void run();
bool keepRunning();
void adjustThreads();
+ Monitor monitor;
+ size_t minThreads;
+ size_t maxThreads;
EventChannel::shared_ptr channel;
Threads workers;
sys::AtomicCount nWaiting;
State state;
- Event terminate;
};
diff --git a/cpp/lib/common/sys/posix/PosixAcceptor.cpp b/cpp/lib/common/sys/posix/PosixAcceptor.cpp
index 842aa76f36..e69de29bb2 100644
--- a/cpp/lib/common/sys/posix/PosixAcceptor.cpp
+++ b/cpp/lib/common/sys/posix/PosixAcceptor.cpp
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <sys/Acceptor.h>
-#include <Exception.h>
-
-namespace qpid {
-namespace sys {
-
-namespace {
-void fail() { throw qpid::Exception("PosixAcceptor not implemented"); }
-}
-
-class PosixAcceptor : public Acceptor {
- public:
- virtual int16_t getPort() const { fail(); return 0; }
- virtual void run(qpid::sys::SessionHandlerFactory* ) { fail(); }
- virtual void shutdown() { fail(); }
-};
-
-// Define generic Acceptor::create() to return APRAcceptor.
- Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool)
-{
- return Acceptor::shared_ptr(new PosixAcceptor());
-}
-
-// Must define Acceptor virtual dtor.
-Acceptor::~Acceptor() {}
-
-}}
diff --git a/cpp/lib/common/sys/posix/Socket.cpp b/cpp/lib/common/sys/posix/Socket.cpp
index 5bd13742f6..fc82b4e7e5 100644
--- a/cpp/lib/common/sys/posix/Socket.cpp
+++ b/cpp/lib/common/sys/posix/Socket.cpp
@@ -96,6 +96,8 @@ Socket::recv(void* data, size_t size)
int Socket::listen(int port, int backlog)
{
struct sockaddr_in name;
+ static const int ON = 1;
+ setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &ON, sizeof(ON));
name.sin_family = AF_INET;
name.sin_port = htons(port);
name.sin_addr.s_addr = 0;
@@ -111,8 +113,15 @@ int Socket::listen(int port, int backlog)
return ntohs(name.sin_port);
}
+Socket Socket::accept() {
+ int accepted = ::accept(socket, 0, 0);
+ if (accepted < 0)
+ throw (QPID_POSIX_ERROR(errno));
+ return Socket(accepted);
+}
+
-int Socket::fd()
+int Socket::fd()const
{
return socket;
}
diff --git a/cpp/lib/common/sys/posix/check.cpp b/cpp/lib/common/sys/posix/check.cpp
index 408679caa8..4ddacb3fbd 100644
--- a/cpp/lib/common/sys/posix/check.cpp
+++ b/cpp/lib/common/sys/posix/check.cpp
@@ -32,8 +32,8 @@ PosixError::getMessage(int errNo)
return std::string(strerror_r(errNo, buf, sizeof(buf)));
}
-PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw()
- : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc)
+PosixError::PosixError(int errNo, const qpid::QpidError::Location& l) throw()
+ : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), l)
{ }
}}
diff --git a/cpp/lib/common/sys/posix/check.h b/cpp/lib/common/sys/posix/check.h
index 5afbe8f5a8..052fb08580 100644
--- a/cpp/lib/common/sys/posix/check.h
+++ b/cpp/lib/common/sys/posix/check.h
@@ -37,13 +37,15 @@ class PosixError : public qpid::QpidError
public:
static std::string getMessage(int errNo);
- PosixError(int errNo, const qpid::SrcLine& location) throw();
+ PosixError(int errNo, const qpid::QpidError::Location& location) throw();
~PosixError() throw() {}
int getErrNo() { return errNo; }
- Exception* clone() const throw() { return new PosixError(*this); }
+ Exception::auto_ptr clone() const throw() {
+ return Exception::auto_ptr(new PosixError(*this));
+ }
void throwSelf() { throw *this; }
@@ -54,9 +56,17 @@ class PosixError : public qpid::QpidError
}}
/** Create a PosixError for the current file/line and errno. */
-#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE)
+#define QPID_POSIX_ERROR(ERRNO) \
+ ::qpid::sys::PosixError((ERRNO), QPID_ERROR_LOCATION)
-/** Throw a posix error if errNo is non-zero */
-#define QPID_POSIX_THROW_IF(ERRNO) \
- if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO))
+/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */
+#define QPID_POSIX_CHECK(RESULT) \
+ if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno))
+
+/** Throw QPID_POSIX_ERROR(ERRNO) if ERRNO is non zero */
+#define QPID_POSIX_THROW_IF(ERRNO) \
+ do { int e = (ERRNO); if (e) throw QPID_POSIX_ERROR(e); } while(0)
+
+
+
#endif /*!_posix_check_h*/
diff --git a/cpp/tests/EventChannelTest.cpp b/cpp/tests/EventChannelTest.cpp
index 8e5c724a15..67b8b03ce2 100644
--- a/cpp/tests/EventChannelTest.cpp
+++ b/cpp/tests/EventChannelTest.cpp
@@ -47,8 +47,9 @@ struct RunMe : public Runnable
class EventChannelTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(EventChannelTest);
- CPPUNIT_TEST(testEvent);
+ CPPUNIT_TEST(testDispatch);
CPPUNIT_TEST(testRead);
+ CPPUNIT_TEST(testPartialRead);
CPPUNIT_TEST(testFailedRead);
CPPUNIT_TEST(testWrite);
CPPUNIT_TEST(testFailedWrite);
@@ -72,26 +73,26 @@ class EventChannelTest : public CppUnit::TestCase
signal(SIGPIPE, SIG_IGN);
}
- // Verify that calling getEvent returns event.
+ // Verify that calling wait returns event.
template <class T> bool isNextEvent(T& event)
{
- return &event == dynamic_cast<T*>(ec->getEvent());
+ return &event == dynamic_cast<T*>(ec->wait(5*TIME_SEC));
}
template <class T> bool isNextEventOk(T& event)
{
- Event* next = ec->getEvent();
- if (next) next->throwIfError();
+ Event* next = ec->wait(TIME_SEC);
+ if (next) next->throwIfException();
return &event == next;
}
- void testEvent()
+ void testDispatch()
{
RunMe runMe;
CPPUNIT_ASSERT(!runMe.ran);
// Instances of Event just pass thru the channel immediately.
- Event e(runMe.functor());
- ec->postEvent(e);
+ DispatchEvent e(runMe.functor());
+ ec->post(e);
CPPUNIT_ASSERT(isNextEventOk(e));
e.dispatch();
CPPUNIT_ASSERT(runMe.ran);
@@ -99,42 +100,64 @@ class EventChannelTest : public CppUnit::TestCase
void testRead() {
ReadEvent re(pipe[0], readBuf, size);
- ec->postEvent(re);
+ ec->post(re);
CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::write(pipe[1], hello, size));
CPPUNIT_ASSERT(isNextEventOk(re));
- CPPUNIT_ASSERT_EQUAL(size, re.getSize());
+ CPPUNIT_ASSERT_EQUAL(size, re.getBytesRead());
CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
}
+ void testPartialRead() {
+ ReadEvent re(pipe[0], readBuf, size, 0, true);
+ ec->post(re);
+ CPPUNIT_ASSERT_EQUAL(ssize_t(size/2), ::write(pipe[1], hello, size/2));
+ CPPUNIT_ASSERT(isNextEventOk(re));
+ CPPUNIT_ASSERT_EQUAL(size/2, re.getBytesRead());
+ CPPUNIT_ASSERT_EQUAL(std::string(hello, size/2),
+ std::string(readBuf, size/2));
+ }
+
+
void testFailedRead()
{
ReadEvent re(pipe[0], readBuf, size);
- ec->postEvent(re);
+ ec->post(re);
// EOF before all data read.
::close(pipe[1]);
CPPUNIT_ASSERT(isNextEvent(re));
- CPPUNIT_ASSERT(re.hasError());
+ CPPUNIT_ASSERT(re.getException());
try {
- re.throwIfError();
+ re.throwIfException();
CPPUNIT_FAIL("Expected QpidError.");
}
catch (const qpid::QpidError&) { }
+
+ // Try to read from closed file descriptor.
+ try {
+ ec->post(re);
+ CPPUNIT_ASSERT(isNextEvent(re));
+ re.throwIfException();
+ CPPUNIT_FAIL("Expected an exception.");
+ }
+ catch (const qpid::QpidError&) {}
+
// Bad file descriptor. Note in this case we fail
- // in postEvent and throw immediately.
+ // in post and throw immediately.
try {
- ReadEvent bad;
- ec->postEvent(bad);
+ ReadEvent bad(-1, readBuf, size);
+ ec->post(bad);
CPPUNIT_FAIL("Expected QpidError.");
}
- catch (const qpid::QpidError&) { }
+ catch (const qpid::QpidError&) {}
}
void testWrite() {
WriteEvent wr(pipe[1], hello, size);
- ec->postEvent(wr);
+ ec->post(wr);
CPPUNIT_ASSERT(isNextEventOk(wr));
+ CPPUNIT_ASSERT_EQUAL(size, wr.getBytesWritten());
CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::read(pipe[0], readBuf, size));;
CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
}
@@ -142,43 +165,55 @@ class EventChannelTest : public CppUnit::TestCase
void testFailedWrite() {
WriteEvent wr(pipe[1], hello, size);
::close(pipe[0]);
- ec->postEvent(wr);
+ ec->post(wr);
CPPUNIT_ASSERT(isNextEvent(wr));
- CPPUNIT_ASSERT(wr.hasError());
+ CPPUNIT_ASSERT(wr.getException());
}
void testReadWrite()
{
ReadEvent re(pipe[0], readBuf, size);
WriteEvent wr(pipe[1], hello, size);
- ec->postEvent(re);
- ec->postEvent(wr);
- ec->getEvent();
- ec->getEvent();
+ ec->post(re);
+ ec->post(wr);
+ ec->wait(TIME_SEC);
+ ec->wait(TIME_SEC);
CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
}
- void testAccept() {
- Socket s = Socket::createTcp();
- int port = s.listen(0, 10);
- CPPUNIT_ASSERT(port != 0);
-
- AcceptEvent ae(s.fd());
- ec->postEvent(ae);
- Socket client = Socket::createTcp();
+ void connectSendRead(AcceptEvent& ae, int port, Socket client)
+ {
+ ec->post(ae);
+ // Connect a client, send some data, read the data.
client.connect("localhost", port);
CPPUNIT_ASSERT(isNextEvent(ae));
- ae.dispatch();
+ ae.throwIfException();
- // Verify client writes are read by the accepted descriptor.
char readBuf[size];
ReadEvent re(ae.getAcceptedDesscriptor(), readBuf, size);
- ec->postEvent(re);
- CPPUNIT_ASSERT_EQUAL(ssize_t(size), client.send(hello, sizeof(hello)));
+ ec->post(re);
+ CPPUNIT_ASSERT_EQUAL(ssize_t(size),
+ client.send(hello, sizeof(hello)));
CPPUNIT_ASSERT(isNextEvent(re));
- re.dispatch();
+ re.throwIfException();
CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
}
+
+ void testAccept() {
+ Socket s = Socket::createTcp();
+ int port = s.listen(0, 10);
+ CPPUNIT_ASSERT(port != 0);
+
+ AcceptEvent ae(s.fd());
+ Socket client = Socket::createTcp();
+ connectSendRead(ae, port, client);
+ Socket client2 = Socket::createTcp();
+ connectSendRead(ae, port, client2);
+ client.close();
+ client2.close();
+ Socket client3 = Socket::createTcp();
+ connectSendRead(ae, port, client3);
+ }
};
// Make this test suite a plugin.
diff --git a/cpp/tests/EventChannelThreadsTest.cpp b/cpp/tests/EventChannelThreadsTest.cpp
index 285ed29518..f8b4ad6f4f 100644
--- a/cpp/tests/EventChannelThreadsTest.cpp
+++ b/cpp/tests/EventChannelThreadsTest.cpp
@@ -42,7 +42,7 @@ const int totalEvents = nConnections+2*nConnections*nMessages;
* We count the total number of events, and the
* number of reads and writes for each message number.
*/
-class TestResults : public Monitor {
+class TestResults {
public:
TestResults() : isShutdown(false), nEventsRemaining(totalEvents) {}
@@ -62,20 +62,21 @@ class TestResults : public Monitor {
}
void shutdown(const std::string& exceptionMsg = std::string()) {
- ScopedLock lock(*this);
+ Monitor::ScopedLock lock(monitor);
exception = exceptionMsg;
isShutdown = true;
- notifyAll();
+ monitor.notifyAll();
}
void wait() {
- ScopedLock lock(*this);
+ Monitor::ScopedLock lock(monitor);
Time deadline = now() + 10*TIME_SEC;
while (!isShutdown) {
- CPPUNIT_ASSERT(Monitor::wait(deadline));
+ CPPUNIT_ASSERT(monitor.wait(deadline));
}
}
+ Monitor monitor;
bool isShutdown;
std::string exception;
AtomicCount reads[nMessages];
@@ -113,30 +114,34 @@ class SafeCallback {
};
/** Repost an event N times. */
-class Repost {
+template <class T>
+class Reposter {
public:
- Repost(int n) : count (n) {}
- virtual ~Repost() {}
+ Reposter(T* event_, int n) : event(event_), original(*event_), count (n) {}
+ virtual ~Reposter() {}
- void repost(Event* event) {
+ void repost() {
if (--count==0) {
delete event;
} else {
- threads->postEvent(event);
+ *event = original;
+ threads->post(event);
}
}
private:
+ T* event;
+ T original;
int count;
};
/** Repeating read event. */
-class TestReadEvent : public ReadEvent, public Runnable, private Repost {
+class TestReadEvent : public ReadEvent, public Runnable {
public:
explicit TestReadEvent(int fd=-1) :
ReadEvent(fd, &value, sizeof(value), SafeCallback(*this)),
- Repost(nMessages)
+ reposter(this, nMessages)
{}
void run() {
@@ -144,47 +149,50 @@ class TestReadEvent : public ReadEvent, public Runnable, private Repost {
CPPUNIT_ASSERT(0 <= value);
CPPUNIT_ASSERT(value < nMessages);
results.countRead(value);
- repost(this);
+ reposter.repost();
}
private:
int value;
- ReadEvent original;
+ Reposter<ReadEvent> reposter;
};
/** Fire and forget write event */
-class TestWriteEvent : public WriteEvent, public Runnable, private Repost {
+class TestWriteEvent : public WriteEvent, public Runnable {
public:
TestWriteEvent(int fd=-1) :
WriteEvent(fd, &value, sizeof(value), SafeCallback(*this)),
- Repost(nMessages),
+ reposter(this, nMessages),
value(0)
{}
void run() {
CPPUNIT_ASSERT_EQUAL(sizeof(int), getSize());
results.countWrite(value++);
- repost(this);
+ reposter.repost();
}
private:
+ Reposter<WriteEvent> reposter;
int value;
};
/** Fire-and-forget Accept event, posts reads on the accepted connection. */
-class TestAcceptEvent : public AcceptEvent, public Runnable, private Repost {
+class TestAcceptEvent : public AcceptEvent, public Runnable {
public:
TestAcceptEvent(int fd=-1) :
AcceptEvent(fd, SafeCallback(*this)),
- Repost(nConnections)
+ reposter(this, nConnections)
{}
void run() {
- threads->postEvent(new TestReadEvent(getAcceptedDesscriptor()));
+ threads->post(new TestReadEvent(getAcceptedDesscriptor()));
results.countEvent();
- repost(this);
+ reposter.repost();
}
+ private:
+ Reposter<AcceptEvent> reposter;
};
class EventChannelThreadsTest : public CppUnit::TestCase
@@ -207,10 +215,10 @@ class EventChannelThreadsTest : public CppUnit::TestCase
{
Socket listener = Socket::createTcp();
int port = listener.listen();
-
+
// Post looping accept events, will repost nConnections times.
// The accept event will automatically post read events.
- threads->postEvent(new TestAcceptEvent(listener.fd()));
+ threads->post(new TestAcceptEvent(listener.fd()));
// Make connections.
Socket connections[nConnections];
@@ -221,7 +229,7 @@ class EventChannelThreadsTest : public CppUnit::TestCase
// Post looping write events.
for (int i = 0; i < nConnections; ++i) {
- threads->postEvent(new TestWriteEvent(connections[i].fd()));
+ threads->post(new TestWriteEvent(connections[i].fd()));
}
// Wait for all events to be dispatched.
diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am
index 900bf47960..943afde228 100644
--- a/cpp/tests/Makefile.am
+++ b/cpp/tests/Makefile.am
@@ -16,9 +16,10 @@ EXTRA_DIST = \
topicall \
topictest \
qpid_test_plugin.h \
- APRBaseTest.cpp
+ MockSessionHandler.h
-client_tests = \
+
+client_exe_tests = \
client_test \
echo_service \
topic_listener \
@@ -41,7 +42,8 @@ broker_tests = \
TxAckTest \
TxBufferTest \
TxPublishTest \
- ValueTest
+ ValueTest \
+ AcceptorTest
framing_tests = \
BodyHandlerTest \
@@ -54,17 +56,21 @@ misc_tests = \
posix_tests = \
EventChannelTest \
- EventChannelThreadsTest
+ EventChannelThreadsTest \
+ EventChannelConnectionTest
+
+apr_tests = APRBaseTest.cpp
unit_tests = \
$(broker_tests) \
$(framing_tests) \
- $(misc_tests)
+ $(misc_tests) \
+ $(posix_tests)
-noinst_PROGRAMS = $(client_tests)
+noinst_PROGRAMS = $(client_exe_tests)
-TESTS = run-unit-tests run-python-tests
+TESTS = run-unit-tests run-system-tests
EXTRA_DIST += $(TESTS)
include gen.mk
@@ -77,7 +83,7 @@ lib_broker = $(abs_builddir)/../lib/broker/libqpidbroker.la
gen.mk: Makefile.am
( \
- for i in $(client_tests); do \
+ for i in $(client_exe_tests); do \
echo $${i}_SOURCES = $$i.cpp; \
echo $${i}_LDADD = '$$(lib_client) $$(lib_common) $$(extra_libs)'; \
done; \
diff --git a/cpp/tests/run-python-tests b/cpp/tests/run-python-tests
index 57be07ec1c..e69de29bb2 100755
--- a/cpp/tests/run-python-tests
+++ b/cpp/tests/run-python-tests
@@ -1,15 +0,0 @@
-#!/bin/sh
-
-set -e
-log=`pwd`/qpidd.log
-# Start the daemon, recording its PID.
-../src/qpidd > $log 2>&1 & pid=$!
-
-# Arrange to kill the daemon upon any type of termination.
-trap 'status=$?; kill $pid; exit $status' 0
-trap '(exit $?); exit $?' 1 2 13 15
-
-# Run the tests.
-cd ../../python && ./run-tests -v -I cpp_failing.txt
-
-rm -f $log
diff --git a/cpp/tests/topic_publisher.cpp b/cpp/tests/topic_publisher.cpp
index b95abd9d66..6d17b7034f 100644
--- a/cpp/tests/topic_publisher.cpp
+++ b/cpp/tests/topic_publisher.cpp
@@ -138,13 +138,15 @@ int main(int argc, char** argv){
int64_t sum(0);
for(int i = 0; i < batchSize; i++){
if(i > 0 && args.getDelay()) sleep(args.getDelay());
- Time time = publisher.publish(
- args.getMessages(), args.getSubscribers(), args.getSize());
- if(!max || time > max) max = time;
- if(!min || time < min) min = time;
- sum += time;
+ int64_t msecs =
+ publisher.publish(args.getMessages(),
+ args.getSubscribers(),
+ args.getSize()) / TIME_MSEC;
+ if(!max || msecs > max) max = msecs;
+ if(!min || msecs < min) min = msecs;
+ sum += msecs;
std::cout << "Completed " << (i+1) << " of " << batchSize
- << " in " << time/TIME_MSEC << "ms" << std::endl;
+ << " in " << msecs << "ms" << std::endl;
}
publisher.terminate();
int64_t avg = sum / batchSize;
diff --git a/cpp/tests/topictest b/cpp/tests/topictest
index 792f063bea..da3a0c1f92 100755
--- a/cpp/tests/topictest
+++ b/cpp/tests/topictest
@@ -1,42 +1,41 @@
#!/bin/bash
-# Run the c++ or java topic test
+# Run the c++ or topic test
-. `dirname $0`/env
-
-# Edit parameters here:
-
-# Big test:
-# LISTENERS=10
-# MESSAGES=10000
-# BATCHES=20
-
-LISTENERS=10
+# Defaults
+SUBSCRIBERS=10
MESSAGES=2000
BATCHES=10
-cppcmds() {
- LISTEN_CMD=topic_listener
- PUBLISH_CMD="topic_publisher -messages $MESSAGES -batches $BATCHES -subscribers $LISTENERS"
-}
+while getopts "s:m:b:" opt ; do
+ case $opt in
+ s) SUBSCRIBERS=$OPTARG ;;
+ m) MESSAGES=$OPTARG ;;
+ b) BATCHES=$OPTARG ;;
+ ?)
+ echo "Usage: %0 [-l <subscribers>] [-m <messages.] [-b <batches>]"
+ exit 1
+ ;;
+ esac
+done
-javacmds() {
- DEF=-Damqj.logging.level="error"
- LISTEN_CMD="qpid-run $DEF org.apache.qpid.topic.Listener"
- PUBLISH_CMD="qpid-run $DEF org.apache.qpid.topic.Publisher -messages $MESSAGES -batch $BATCHES -clients $LISTENERS"
+subscribe() {
+ ID=$1
+ echo "subscriber $ID"
+ ./topic_listener > subscriber.$ID 2>&1 || {
+ echo "SUBSCRIBER %ID FAILED: " ;
+ cat subscriber.$ID
+ }
+ rm subscriber.$ID
}
-case $1 in
- c) cppcmds ;;
- j) javacmds ;;
- *) cppcmds ;;
-esac
+publish() {
+ ./topic_publisher -messages $MESSAGES -batches $BATCHES -subscribers $SUBSCRIBERS
+}
-for ((i=$LISTENERS ; i--; )); do
- $LISTEN_CMD > /dev/null 2>&1 &
+for ((i=$SUBSCRIBERS ; i--; )); do
+ subscribe $i &
done
sleep 1
-echo $PUBLISH_CMD $OPTIONS
-
STATS=~/bin/topictest.times
echo "---- topictest `date`" >> $STATS
-$PUBLISH_CMD $OPTIONS | tee -a $STATS
+publish | tee -a $STATS