summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-11-29 14:36:08 +0000
committerAlan Conway <aconway@apache.org>2006-11-29 14:36:08 +0000
commitb13e1a24fcca8797b7be5a242f164afbe17ec4f6 (patch)
treeef0362e52c125bc75b07ef3e374dabfa52254e98
parent16d818e749462daf5e0e43079b2e48991646c619 (diff)
downloadqpid-python-b13e1a24fcca8797b7be5a242f164afbe17ec4f6.tar.gz
Posix EventChannel implementation using epoll. Placeholder for kevents.
Dynamic thread pool EventChannelThreads to serve EventChannel. Misc cleanup/enhancements. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480582 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/Makefile27
-rw-r--r--cpp/src/qpid/Exception.cpp20
-rw-r--r--cpp/src/qpid/Exception.h43
-rw-r--r--cpp/src/qpid/ExceptionHolder.cpp32
-rw-r--r--cpp/src/qpid/ExceptionHolder.h60
-rw-r--r--cpp/src/qpid/QpidError.cpp15
-rw-r--r--cpp/src/qpid/QpidError.h50
-rw-r--r--cpp/src/qpid/apr/APRBase.cpp3
-rw-r--r--cpp/src/qpid/apr/LFProcessor.cpp4
-rw-r--r--cpp/src/qpid/apr/Socket.cpp32
-rw-r--r--cpp/src/qpid/broker/AutoDelete.cpp2
-rw-r--r--cpp/src/qpid/broker/AutoDelete.h5
-rw-r--r--cpp/src/qpid/broker/Configuration.cpp2
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp3
-rw-r--r--cpp/src/qpid/broker/Queue.cpp6
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp3
-rw-r--r--cpp/src/qpid/client/Connection.cpp2
-rw-r--r--cpp/src/qpid/client/Connector.cpp17
-rw-r--r--cpp/src/qpid/posix/EpollEventChannel.cpp76
-rw-r--r--cpp/src/qpid/posix/EventChannel.cpp325
-rw-r--r--cpp/src/qpid/posix/EventChannel.h176
-rw-r--r--cpp/src/qpid/posix/EventChannelThreads.cpp119
-rw-r--r--cpp/src/qpid/posix/EventChannelThreads.h92
-rw-r--r--cpp/src/qpid/posix/Socket.cpp58
-rw-r--r--cpp/src/qpid/posix/check.cpp12
-rw-r--r--cpp/src/qpid/posix/check.h31
-rw-r--r--cpp/src/qpid/sys/AtomicCount.h71
-rw-r--r--cpp/src/qpid/sys/EventChannel.h239
-rw-r--r--cpp/src/qpid/sys/Monitor.h153
-rw-r--r--cpp/src/qpid/sys/Mutex.h151
-rw-r--r--cpp/src/qpid/sys/Runnable.cpp (renamed from cpp/src/qpid/posix/EpollEventChannel.h)30
-rw-r--r--cpp/src/qpid/sys/Runnable.h20
-rw-r--r--cpp/src/qpid/sys/Socket.h30
-rw-r--r--cpp/src/qpid/sys/Thread.h42
-rw-r--r--cpp/src/qpid/sys/Time.cpp39
-rw-r--r--cpp/src/qpid/sys/Time.h46
-rw-r--r--cpp/test/client/client_test.cpp4
-rw-r--r--cpp/test/client/echo_service.cpp4
-rw-r--r--cpp/test/client/topic_listener.cpp13
-rw-r--r--cpp/test/client/topic_publisher.cpp12
-rw-r--r--cpp/test/unit/qpid/ExceptionTest.cpp61
-rw-r--r--cpp/test/unit/qpid/posix/EventChannelTest.cpp187
-rw-r--r--cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp247
43 files changed, 1903 insertions, 661 deletions
diff --git a/cpp/Makefile b/cpp/Makefile
index dd7551a648..8f7b6795b2 100644
--- a/cpp/Makefile
+++ b/cpp/Makefile
@@ -72,6 +72,7 @@ $(BUILDDIRS):
## Library rules
+## DONT COMMIT
LIB_common := $(call LIBFILE,common,1.0)
DIRS_common := qpid qpid/framing qpid/sys qpid/$(PLATFORM)
$(LIB_common): $(call OBJECTS, $(DIRS_common))
@@ -79,13 +80,13 @@ $(LIB_common): $(call OBJECTS, $(DIRS_common))
LIB_client := $(call LIBFILE,client,1.0)
DIRS_client := qpid/client
-$(LIB_client): $(call OBJECTS,$(DIRS_client)) $(LIB_common)
- $(LIB_COMMAND)
+$(LIB_client): $(call OBJECTS,$(DIRS_client))
+ $(LIB_COMMAND) $(LIB_common)
LIB_broker := $(call LIBFILE,broker,1.0)
DIRS_broker := qpid/broker
-$(LIB_broker): $(call OBJECTS,$(DIRS_broker)) $(LIB_common)
- $(LIB_COMMAND)
+$(LIB_broker): $(call OBJECTS,$(DIRS_broker))
+ $(LIB_COMMAND) $(LIB_common)
## Daemon executable
$(BINDIR)/qpidd: $(OBJDIR)/qpidd.o $(LIB_common) $(LIB_broker)
@@ -94,12 +95,20 @@ $(BINDIR)/qpidd: $(OBJDIR)/qpidd.o $(LIB_common) $(LIB_broker)
all-nogen: $(BINDIR)/qpidd
## Unit tests.
-UNITTEST_SRC:=$(shell find test/unit -name *Test.cpp)
-UNITTEST_SRC:=$(filter-out test/unit/qpid/$(IGNORE)/%,$(UNITTEST_SRC))
-UNITTESTS:=$(UNITTEST_SRC:test/unit/%.cpp=$(TESTDIR)/%.so)
+define UNITTEST_GROUP
+UNITTEST_SRC_$1 := $(wildcard $(DIRS_$1:%=test/unit/%/*Test.cpp))
+UNITTEST_SO_$1 := $$(UNITTEST_SRC_$1:test/unit/%.cpp=$(TESTDIR)/%.so)
+$$(UNITTEST_SO_$1): $(LIB_$1)
+UNITTESTS := $$(UNITTESTS) $$(UNITTEST_SO_$1)
+endef
-unittest: all
+$(eval $(call UNITTEST_GROUP,common))
+$(eval $(call UNITTEST_GROUP,broker))
+$(eval $(call UNITTEST_GROUP,client))
+
+unittest: $(UNITTESTS)
DllPlugInTester -c -b $(UNITTESTS:.cpp=.so)
+
all-nogen: $(UNITTESTS)
## Run python tests
@@ -143,7 +152,7 @@ all-nogen: $(CLIENT_TEST_EXE)
client: $(CLIENT_TEST_EXE)
## include dependencies
-DEPFILES:=$(wildcard $(OBJDIR)/*.d $(OBJDIR)/*/*.d $(OBJDIR)/*/*/*.d)
+DEPFILES:=$(shell find $(OBJDIR) $(TESTDIR) -name "*.d")
ifdef DEPFILES
-include $(DEPFILES)
endif
diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp
index 1263ba8472..44aad4cb4f 100644
--- a/cpp/src/qpid/Exception.cpp
+++ b/cpp/src/qpid/Exception.cpp
@@ -21,4 +21,22 @@
#include <qpid/Exception.h>
-qpid::Exception::~Exception() throw() {}
+namespace qpid {
+
+Exception::Exception() throw() {}
+
+Exception::Exception(const std::string& str) throw() : whatStr(str) {}
+
+Exception::Exception(const char* str) throw() : whatStr(str) {}
+
+Exception::~Exception() throw() {}
+
+const char* Exception::what() const throw() { return whatStr.c_str(); }
+
+std::string Exception::toString() const throw() { return whatStr; }
+
+Exception* Exception::clone() const throw() { return new Exception(*this); }
+
+void Exception::throwSelf() const { throw *this; }
+
+} // namespace qpid
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h
index 2aba43586d..f35d427bb0 100644
--- a/cpp/src/qpid/Exception.h
+++ b/cpp/src/qpid/Exception.h
@@ -25,6 +25,7 @@
#include <exception>
#include <string>
#include <memory>
+#include <boost/shared_ptr.hpp>
namespace qpid
{
@@ -37,42 +38,24 @@ class Exception : public std::exception
std::string whatStr;
public:
- Exception() throw() {}
- Exception(const std::string& str) throw() : whatStr(str) {}
- Exception(const char* str) throw() : whatStr(str) {}
+ Exception() throw();
+ Exception(const std::string& str) throw();
+ Exception(const char* str) throw();
+ Exception(const std::exception&) throw();
+
virtual ~Exception() throw();
- virtual const char* what() const throw() { return whatStr.c_str(); }
- virtual std::string toString() const throw() { return whatStr; }
+ virtual const char* what() const throw();
+ virtual std::string toString() const throw();
+
+ virtual Exception* clone() const throw();
+ virtual void throwSelf() const;
+
+ typedef boost::shared_ptr<Exception> shared_ptr;
};
-/**
- * Wrapper for heap-allocated exceptions. Use like this:
- * <code>
- * std::auto_ptr<Exception> ex = new SomeEx(...)
- * HeapException hex(ex); // Takes ownership
- * throw hex; // Auto-deletes ex
- * </code>
- */
-class HeapException : public Exception, public std::auto_ptr<Exception>
-{
- public:
- HeapException() {}
- HeapException(std::auto_ptr<Exception> e) : std::auto_ptr<Exception>(e) {}
- HeapException& operator=(std::auto_ptr<Exception>& e) {
- std::auto_ptr<Exception>::operator=(e);
- return *this;
- }
- ~HeapException() throw() {}
-
- virtual const char* what() const throw() { return (*this)->what(); }
- virtual std::string toString() const throw() {
- return (*this)->toString();
- }
-};
-
}
#endif /*!_Exception_*/
diff --git a/cpp/src/qpid/ExceptionHolder.cpp b/cpp/src/qpid/ExceptionHolder.cpp
new file mode 100644
index 0000000000..de8d7b2487
--- /dev/null
+++ b/cpp/src/qpid/ExceptionHolder.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "ExceptionHolder.h"
+
+namespace qpid {
+
+ExceptionHolder::ExceptionHolder(const std::exception& e) {
+ const Exception* ex = dynamic_cast<const Exception*>(&e);
+ if (ex) {
+ reset(ex->clone());
+ } else {
+ reset(new Exception(e.what()));
+ }
+}
+
+}
diff --git a/cpp/src/qpid/ExceptionHolder.h b/cpp/src/qpid/ExceptionHolder.h
new file mode 100644
index 0000000000..c2deca803e
--- /dev/null
+++ b/cpp/src/qpid/ExceptionHolder.h
@@ -0,0 +1,60 @@
+#ifndef _qpid_ExceptionHolder_h
+#define _qpid_ExceptionHolder_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <qpid/Exception.h>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+/**
+ * Holder for a heap-allocated exc eption that can be stack allocated
+ * and thrown safely.
+ *
+ * Basically this is a shared_ptr with the Exception functions added
+ * so the catcher need not be aware that it is a pointer rather than a
+ * reference.
+ *
+ * shared_ptr is chosen over auto_ptr because it has normal
+ * copy semantics.
+ */
+class ExceptionHolder : public Exception, public boost::shared_ptr<Exception>
+{
+ public:
+ typedef boost::shared_ptr<Exception> shared_ptr;
+
+ ExceptionHolder() throw() {}
+ ExceptionHolder(Exception* p) throw() : shared_ptr(p) {}
+ ExceptionHolder(shared_ptr p) throw() : shared_ptr(p) {}
+
+ ExceptionHolder(const Exception& e) throw() : shared_ptr(e.clone()) {}
+ ExceptionHolder(const std::exception& e);
+
+ const char* what() const throw() { return (*this)->what(); }
+ std::string toString() const throw() { return (*this)->toString(); }
+ virtual Exception* clone() const throw() { return (*this)->clone(); }
+ virtual void throwSelf() const { (*this)->throwSelf(); }
+};
+
+} // namespace qpid
+
+
+
+#endif /*!_qpid_ExceptionHolder_h*/
diff --git a/cpp/src/qpid/QpidError.cpp b/cpp/src/qpid/QpidError.cpp
index 0b5135269b..70fff6550a 100644
--- a/cpp/src/qpid/QpidError.cpp
+++ b/cpp/src/qpid/QpidError.cpp
@@ -24,12 +24,21 @@
using namespace qpid;
-QpidError::QpidError(int _code, const std::string& _msg, const std::string& _file, int _line) throw()
- : code(_code), msg(_msg), file(_file), line(_line)
+QpidError::QpidError() : code(0) {}
+
+QpidError::QpidError(int _code, const std::string& _msg,
+ const SrcLine& _loc) throw()
+ : code(_code), msg(_msg), location(_loc)
{
std::ostringstream os;
- os << "QpidError(" << code << ") " << msg << " (" << file << ":" << line << ")";
+ os << "Error [" << code << "] " << msg << " ("
+ << location.file << ":" << location.line << ")";
whatStr = os.str();
}
QpidError::~QpidError() throw() {}
+
+Exception* QpidError::clone() const throw() { return new QpidError(*this); }
+
+void QpidError::throwSelf() const { throw *this; }
+
diff --git a/cpp/src/qpid/QpidError.h b/cpp/src/qpid/QpidError.h
index 79ccd2f579..5d7aa93674 100644
--- a/cpp/src/qpid/QpidError.h
+++ b/cpp/src/qpid/QpidError.h
@@ -21,29 +21,47 @@
*
*/
#include <string>
+#include <memory>
+#include <ostream>
#include <qpid/Exception.h>
namespace qpid {
- class QpidError : public Exception {
- public:
- const int code;
- const std::string msg;
- const std::string file;
- const int line;
+struct SrcLine {
+ public:
+ SrcLine(const std::string& file_="", int line_=0) :
+ file(file_), line(line_) {}
- QpidError(int _code, const std::string& _msg, const std::string& _file, int _line) throw();
- ~QpidError() throw();
- };
+ std::string file;
+ int line;
+};
+
+class QpidError : public Exception {
+ public:
+ const int code;
+ const std::string msg;
+ const SrcLine location;
-#define THROW_QPID_ERROR(A, B) throw QpidError(A, B, __FILE__, __LINE__)
+ QpidError();
+ QpidError(int _code, const std::string& _msg, const SrcLine& _loc) throw();
+ ~QpidError() throw();
+ Exception* clone() const throw();
+ void throwSelf() const;
+};
-}
-#define PROTOCOL_ERROR 10000
-#define APR_ERROR 20000
-#define FRAMING_ERROR 30000
-#define CLIENT_ERROR 40000
-#define INTERNAL_ERROR 50000
+} // namespace qpid
+
+#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__)
+
+#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE)
+
+#define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE)
+
+const int PROTOCOL_ERROR = 10000;
+const int APR_ERROR = 20000;
+const int FRAMING_ERROR = 30000;
+const int CLIENT_ERROR = 40000;
+const int INTERNAL_ERROR = 50000;
#endif
diff --git a/cpp/src/qpid/apr/APRBase.cpp b/cpp/src/qpid/apr/APRBase.cpp
index c3dbda3df0..7a35f098ec 100644
--- a/cpp/src/qpid/apr/APRBase.cpp
+++ b/cpp/src/qpid/apr/APRBase.cpp
@@ -87,7 +87,8 @@ void qpid::sys::check(apr_status_t status, const std::string& file, const int li
const int size = 50;
char tmp[size];
std::string msg(apr_strerror(status, tmp, size));
- throw QpidError(APR_ERROR + ((int) status), msg, file, line);
+ throw QpidError(APR_ERROR + ((int) status), msg,
+ qpid::SrcLine(file, line));
}
}
diff --git a/cpp/src/qpid/apr/LFProcessor.cpp b/cpp/src/qpid/apr/LFProcessor.cpp
index c8a583a34c..0187beab10 100644
--- a/cpp/src/qpid/apr/LFProcessor.cpp
+++ b/cpp/src/qpid/apr/LFProcessor.cpp
@@ -134,8 +134,8 @@ void LFProcessor::run(){
session->stopProcessing();
}
}
- }catch(QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }catch(std::exception e){
+ std::cout << e.what() << std::endl;
}
}
diff --git a/cpp/src/qpid/apr/Socket.cpp b/cpp/src/qpid/apr/Socket.cpp
index 9ef6baeb88..3cf510872f 100644
--- a/cpp/src/qpid/apr/Socket.cpp
+++ b/cpp/src/qpid/apr/Socket.cpp
@@ -27,21 +27,24 @@
using namespace qpid::sys;
-Socket::Socket()
-{
+Socket Socket::createTcp() {
+ Socket s;
CHECK_APR_SUCCESS(
apr_socket_create(
- &socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
+ &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
APRPool::get()));
+ return s;
}
-void Socket::setTimeout(long msecs)
-{
- apr_socket_timeout_set(socket, msecs*1000);
+Socket::Socket(apr_socket_t* s) {
+ socket = s;
}
-void Socket::connect(const std::string& host, int port)
-{
+void Socket::setTimeout(Time interval) {
+ apr_socket_timeout_set(socket, interval/TIME_USEC);
+}
+
+void Socket::connect(const std::string& host, int port) {
apr_sockaddr_t* address;
CHECK_APR_SUCCESS(
apr_sockaddr_info_get(
@@ -50,27 +53,28 @@ void Socket::connect(const std::string& host, int port)
CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
}
-void Socket::close()
-{
+void Socket::close() {
if (socket == 0) return;
CHECK_APR_SUCCESS(apr_socket_close(socket));
socket = 0;
}
-ssize_t Socket::send(const char* data, size_t size)
+ssize_t Socket::send(const void* data, size_t size)
{
apr_size_t sent = size;
- apr_status_t status = apr_socket_send(socket, data, &sent);
+ apr_status_t status =
+ apr_socket_send(socket, reinterpret_cast<const char*>(data), &sent);
if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
if (APR_STATUS_IS_EOF(status)) return SOCKET_EOF;
CHECK_APR_SUCCESS(status);
return sent;
}
-ssize_t Socket::recv(char* data, size_t size)
+ssize_t Socket::recv(void* data, size_t size)
{
apr_size_t received = size;
- apr_status_t status = apr_socket_recv(socket, data, &received);
+ apr_status_t status =
+ apr_socket_recv(socket, reinterpret_cast<char*>(data), &received);
if (APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
CHECK_APR_SUCCESS(status);
return received;
diff --git a/cpp/src/qpid/broker/AutoDelete.cpp b/cpp/src/qpid/broker/AutoDelete.cpp
index 6c7ad432db..45232f154c 100644
--- a/cpp/src/qpid/broker/AutoDelete.cpp
+++ b/cpp/src/qpid/broker/AutoDelete.cpp
@@ -63,7 +63,7 @@ void AutoDelete::run(){
Monitor::ScopedLock l(monitor);
while(!stopped){
process();
- monitor.wait(period * Time::NSEC_PER_MSEC);
+ monitor.wait(period*TIME_MSEC);
}
}
diff --git a/cpp/src/qpid/broker/AutoDelete.h b/cpp/src/qpid/broker/AutoDelete.h
index b706246900..a49014314d 100644
--- a/cpp/src/qpid/broker/AutoDelete.h
+++ b/cpp/src/qpid/broker/AutoDelete.h
@@ -1,3 +1,5 @@
+#ifndef _AutoDelete_
+#define _AutoDelete_
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,9 +20,6 @@
* under the License.
*
*/
-#ifndef _AutoDelete_
-#define _AutoDelete_
-
#include <iostream>
#include <queue>
#include <qpid/sys/Monitor.h>
diff --git a/cpp/src/qpid/broker/Configuration.cpp b/cpp/src/qpid/broker/Configuration.cpp
index 39f5c23ee6..1fce1acf4c 100644
--- a/cpp/src/qpid/broker/Configuration.cpp
+++ b/cpp/src/qpid/broker/Configuration.cpp
@@ -52,7 +52,7 @@ void Configuration::parse(int argc, char** argv){
matched = (*i)->parse(position, argv, argc);
}
if(!matched){
- std::cout << "Warning: skipping unrecognised option " << argv[position] << std::endl;
+ std::cout<< "Warning: skipping unrecognised option " << argv[position] << std::endl;
position++;
}
}
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 0f3223d3a6..932038c4da 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -43,7 +43,8 @@ void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
if (i != bindings.end()) {
bindings.erase(i);
- // TODO aconway 2006-09-14: What about the ExchangeBinding object? Don't we have to verify routingKey/args match?
+ // TODO aconway 2006-09-14: What about the ExchangeBinding object?
+ // Don't we have to verify routingKey/args match?
}
}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 1150d055cb..00b0a844ab 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -42,7 +42,7 @@ Queue::Queue(const string& _name, u_int32_t _autodelete,
exclusive(0),
persistenceId(0)
{
- if(autodelete) lastUsed = Time::now().msecs();
+ if(autodelete) lastUsed = now()/TIME_MSEC;
}
Queue::~Queue(){
@@ -137,7 +137,7 @@ void Queue::consume(Consumer* c, bool requestExclusive){
void Queue::cancel(Consumer* c){
Mutex::ScopedLock locker(lock);
consumers.erase(find(consumers.begin(), consumers.end(), c));
- if(autodelete && consumers.empty()) lastUsed = Time::now().msecs();
+ if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC;
if(exclusive == c) exclusive = 0;
}
@@ -170,7 +170,7 @@ u_int32_t Queue::getConsumerCount() const{
bool Queue::canAutoDelete() const{
Mutex::ScopedLock locker(lock);
- return lastUsed && (Time::now().msecs() - lastUsed > autodelete);
+ return lastUsed && (now()*TIME_MSEC - lastUsed > autodelete);
}
void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 34fb25781e..938548d091 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -82,7 +82,8 @@ void TopicPattern::normalize() {
namespace {
// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
-// Need more efficient Tokens impl that can operate on a string in place.
+// Need StringRef class that operates on a string in place witout copy.
+// Should be applied everywhere strings are extracted from frames.
//
bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end)
{
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index de324fdab4..0b520d169d 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -216,7 +216,7 @@ void Connection::error(int code, const string& msg, int classid, int methodid){
}
void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
- std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.file << ":" << e.line << ")" << std::endl;
+ std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl;
int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500;
string msg = e.msg;
if(method == 0){
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 86fbdc062c..116ea74193 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -44,6 +44,7 @@ Connector::Connector(bool _debug, u_int32_t buffer_size) :
Connector::~Connector(){ }
void Connector::connect(const std::string& host, int port){
+ socket = Socket::createTcp();
socket.connect(host, port);
closed = false;
receiver = Thread(this);
@@ -92,7 +93,7 @@ void Connector::writeToSocket(char* data, size_t available){
while(written < available && !closed){
ssize_t sent = socket.send(data + written, available-written);
if(sent > 0) {
- lastOut = Time::now().msecs();
+ lastOut = now() * TIME_MSEC;
written += sent;
}
}
@@ -106,17 +107,17 @@ void Connector::handleClosed(){
void Connector::checkIdle(ssize_t status){
if(timeoutHandler){
- int64_t now = Time::now().msecs();
+ Time t = now() * TIME_MSEC;
if(status == Socket::SOCKET_TIMEOUT) {
- if(idleIn && (now - lastIn > idleIn)){
+ if(idleIn && (t - lastIn > idleIn)){
timeoutHandler->idleIn();
}
}else if(status == Socket::SOCKET_EOF){
handleClosed();
}else{
- lastIn = now;
+ lastIn = t;
}
- if(idleOut && (now - lastOut > idleOut)){
+ if(idleOut && (t - lastOut > idleOut)){
timeoutHandler->idleOut();
}
}
@@ -140,7 +141,7 @@ void Connector::setWriteTimeout(u_int16_t t){
}
void Connector::setSocketTimeout(){
- socket.setTimeout(timeout);
+ socket.setTimeout(timeout*TIME_MSEC);
}
void Connector::setTimeoutHandler(TimeoutHandler* handler){
@@ -171,7 +172,9 @@ void Connector::run(){
}
}
}catch(QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ std::cout << "Error [" << error.code << "] " << error.msg
+ << " (" << error.location.file << ":" << error.location.line
+ << ")" << std::endl;
handleClosed();
}
}
diff --git a/cpp/src/qpid/posix/EpollEventChannel.cpp b/cpp/src/qpid/posix/EpollEventChannel.cpp
deleted file mode 100644
index 7dce4bc58c..0000000000
--- a/cpp/src/qpid/posix/EpollEventChannel.cpp
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <qpid/sys/EventChannel.h>
-#include <sys/epoll.h>
-#include "EpollEventChannel.h"
-
-namespace qpid {
-namespace sys {
-
-EventChannel::shared_ptr EventChannel::create()
-{
- return EventChannel::shared_ptr(new EpollEventChannel());
-}
-
-EpollEventChannel::EpollEventChannel()
-{
- // TODO aconway 2006-11-13: How to choose size parameter?
- static const size_t estimatedFdsForEpoll = 1000;
- epollFd = epoll_create(estimatedFdsForEpoll);
-}
-
-EpollEventChannel::~EpollEventChannel() { }
-
-void
-EpollEventChannel::post(ReadEvent& /*event*/)
-{
-}
-
-void
-EpollEventChannel::post(WriteEvent& /*event*/)
-{
-}
-
-void
-EpollEventChannel::post(AcceptEvent& /*event*/)
-{
-}
-
-void
-EpollEventChannel::post(NotifyEvent& /*event*/)
-{
-}
-
-inline void
-EpollEventChannel::post(Event& /*event*/)
-{
-}
-
-Event*
-EpollEventChannel::getEvent()
-{
- return 0;
-}
-
-void
-EpollEventChannel::dispose(void* /*buffer*/, size_t)
-{
-}
-
-}}
diff --git a/cpp/src/qpid/posix/EventChannel.cpp b/cpp/src/qpid/posix/EventChannel.cpp
new file mode 100644
index 0000000000..9d0819f206
--- /dev/null
+++ b/cpp/src/qpid/posix/EventChannel.cpp
@@ -0,0 +1,325 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <mqueue.h>
+#include <string.h>
+#include <iostream>
+
+#include <sys/errno.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+
+#include <typeinfo>
+#include <iostream>
+#include <queue>
+
+#include <boost/ptr_container/ptr_map.hpp>
+#include <boost/current_function.hpp>
+
+#include <qpid/QpidError.h>
+#include <qpid/sys/Monitor.h>
+
+#include "check.h"
+#include "EventChannel.h"
+
+using namespace std;
+
+
+// Convenience template to zero out a struct.
+template <class S> struct ZeroStruct : public S {
+ ZeroStruct() { memset(this, 0, sizeof(*this)); }
+};
+
+namespace qpid {
+namespace sys {
+
+
+/**
+ * EventHandler wraps an epoll file descriptor. Acts as private
+ * interface between EventChannel and subclasses.
+ *
+ * Also implements Event interface for events that are not associated
+ * with a file descriptor and are passed via the message queue.
+ */
+class EventHandler : public Event, private Monitor
+{
+ public:
+ EventHandler(int epollSize = 256);
+ ~EventHandler();
+
+ int getEpollFd() { return epollFd; }
+ void epollAdd(int fd, uint32_t epollEvents, Event* event);
+ void epollMod(int fd, uint32_t epollEvents, Event* event);
+ void epollDel(int fd);
+
+ void mqPut(Event* event);
+ Event* mqGet();
+
+ protected:
+ // Should never be called, only complete.
+ void prepare(EventHandler&) { assert(0); }
+ Event* complete(EventHandler& eh);
+
+ private:
+ int epollFd;
+ std::string mqName;
+ int mqFd;
+ std::queue<Event*> mqEvents;
+};
+
+EventHandler::EventHandler(int epollSize)
+{
+ epollFd = epoll_create(epollSize);
+ if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
+
+ // Create a POSIX message queue for non-fd events.
+ // We write one byte and never read it is always ready for read
+ // when we add it to epoll.
+ //
+ ZeroStruct<struct mq_attr> attr;
+ attr.mq_maxmsg = 1;
+ attr.mq_msgsize = 1;
+ do {
+ char tmpnam[L_tmpnam];
+ tmpnam_r(tmpnam);
+ mqName = tmpnam + 4; // Skip "tmp/"
+ mqFd = mq_open(
+ mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
+ if (mqFd < 0) throw QPID_POSIX_ERROR(errno);
+ } while (mqFd == EEXIST); // Name already taken, try again.
+
+ static char zero = '\0';
+ mq_send(mqFd, &zero, 1, 0);
+ epollAdd(mqFd, 0, this);
+}
+
+EventHandler::~EventHandler() {
+ mq_close(mqFd);
+ mq_unlink(mqName.c_str());
+}
+
+void EventHandler::mqPut(Event* event) {
+ ScopedLock l(*this);
+ assert(event != 0);
+ mqEvents.push(event);
+ epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+}
+
+Event* EventHandler::mqGet() {
+ ScopedLock l(*this);
+ if (mqEvents.empty())
+ return 0;
+ Event* event = mqEvents.front();
+ mqEvents.pop();
+ if(!mqEvents.empty())
+ epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+ return event;
+}
+
+void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
+{
+ ZeroStruct<struct epoll_event> ee;
+ ee.data.ptr = event;
+ ee.events = epollEvents;
+ if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
+{
+ ZeroStruct<struct epoll_event> ee;
+ ee.data.ptr = event;
+ ee.events = epollEvents;
+ if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+void EventHandler::epollDel(int fd) {
+ if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+Event* EventHandler::complete(EventHandler& eh)
+{
+ assert(&eh == this);
+ Event* event = mqGet();
+ return event==0 ? 0 : event->complete(eh);
+}
+
+// ================================================================
+// EventChannel
+
+EventChannel::shared_ptr EventChannel::create() {
+ return shared_ptr(new EventChannel());
+}
+
+EventChannel::EventChannel() : handler(new EventHandler()) {}
+
+EventChannel::~EventChannel() {}
+
+void EventChannel::postEvent(Event& e)
+{
+ e.prepare(*handler);
+}
+
+Event* EventChannel::getEvent()
+{
+ static const int infiniteTimeout = -1;
+ ZeroStruct<struct epoll_event> epollEvent;
+
+ // Loop until we can complete the event. Some events may re-post
+ // themselves and return 0 from complete, e.g. partial reads. //
+ Event* event = 0;
+ while (event == 0) {
+ int eventCount = epoll_wait(handler->getEpollFd(),
+ &epollEvent, 1, infiniteTimeout);
+ if (eventCount < 0) {
+ if (errno != EINTR) {
+ // TODO aconway 2006-11-28: Proper handling/logging of errors.
+ cerr << BOOST_CURRENT_FUNCTION << " ignoring error "
+ << PosixError::getMessage(errno) << endl;
+ assert(0);
+ }
+ }
+ else if (eventCount == 1) {
+ event = reinterpret_cast<Event*>(epollEvent.data.ptr);
+ assert(event != 0);
+ try {
+ event = event->complete(*handler);
+ }
+ catch (const Exception& e) {
+ if (event)
+ event->setError(e);
+ }
+ catch (const std::exception& e) {
+ if (event)
+ event->setError(e);
+ }
+ }
+ }
+ return event;
+}
+
+Event::~Event() {}
+
+void Event::prepare(EventHandler& handler)
+{
+ handler.mqPut(this);
+}
+
+bool Event::hasError() const {
+ return error;
+}
+
+void Event::throwIfError() throw (Exception) {
+ if (hasError())
+ error.throwSelf();
+}
+
+Event* Event::complete(EventHandler&)
+{
+ return this;
+}
+
+void Event::dispatch()
+{
+ try {
+ if (!callback.empty())
+ callback();
+ } catch (const std::exception&) {
+ throw;
+ } catch (...) {
+ throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
+ }
+}
+
+void Event::setError(const ExceptionHolder& e) {
+ error = e;
+}
+
+void ReadEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+}
+
+ssize_t ReadEvent::doRead() {
+ ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received,
+ size - received);
+ if (n > 0) received += n;
+ return n;
+}
+
+Event* ReadEvent::complete(EventHandler& handler)
+{
+ // Read as much as possible without blocking.
+ ssize_t n = doRead();
+ while (n > 0 && received < size) doRead();
+
+ if (received == size) {
+ handler.epollDel(descriptor);
+ received = 0; // Reset for re-use.
+ return this;
+ }
+ else if (n <0 && (errno == EAGAIN)) {
+ // Keep polling for more.
+ handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
+ return 0;
+ }
+ else {
+ // Unexpected EOF or error. Throw ENODATA for EOF.
+ handler.epollDel(descriptor);
+ received = 0; // Reset for re-use.
+ throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA);
+ }
+}
+
+void WriteEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+}
+
+Event* WriteEvent::complete(EventHandler& handler)
+{
+ ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written,
+ size - written);
+ if (n < 0) throw QPID_POSIX_ERROR(errno);
+ written += n;
+ if(written < size) {
+ // Keep polling.
+ handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+ return 0;
+ }
+ written = 0; // Reset for re-use.
+ handler.epollDel(descriptor);
+ return this;
+}
+
+void AcceptEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+}
+
+Event* AcceptEvent::complete(EventHandler& handler)
+{
+ handler.epollDel(descriptor);
+ accepted = ::accept(descriptor, 0, 0);
+ if (accepted < 0) throw QPID_POSIX_ERROR(errno);
+ return this;
+}
+
+}}
diff --git a/cpp/src/qpid/posix/EventChannel.h b/cpp/src/qpid/posix/EventChannel.h
new file mode 100644
index 0000000000..55fd2ad135
--- /dev/null
+++ b/cpp/src/qpid/posix/EventChannel.h
@@ -0,0 +1,176 @@
+#ifndef _sys_EventChannel_h
+#define _sys_EventChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <qpid/SharedObject.h>
+#include <qpid/ExceptionHolder.h>
+#include <boost/function.hpp>
+#include <memory>
+
+namespace qpid {
+namespace sys {
+
+class Event;
+class EventHandler;
+class EventChannel;
+
+/**
+ * Base class for all Events.
+ */
+class Event
+{
+ public:
+ /** Type for callback when event is dispatched */
+ typedef boost::function0<void> Callback;
+
+ /**
+ * Create an event with optional callback.
+ * Instances of Event are sent directly through the channel.
+ * Derived classes define additional waiting behaviour.
+ *@param cb A callback functor that is invoked when dispatch() is called.
+ */
+ Event(Callback cb = 0) : callback(cb) {}
+
+ virtual ~Event();
+
+ /** Call the callback provided to the constructor, if any. */
+ void dispatch();
+
+ /** True if there was an error processing this event */
+ bool hasError() const;
+
+ /** If hasError() throw the corresponding exception. */
+ void throwIfError() throw(Exception);
+
+ protected:
+ virtual void prepare(EventHandler&);
+ virtual Event* complete(EventHandler&);
+ void setError(const ExceptionHolder& e);
+
+ Callback callback;
+ ExceptionHolder error;
+
+ friend class EventChannel;
+ friend class EventHandler;
+};
+
+template <class BufT>
+class IOEvent : public Event {
+ public:
+ void getDescriptor() const { return descriptor; }
+ size_t getSize() const { return size; }
+ BufT getBuffer() const { return buffer; }
+
+ protected:
+ IOEvent(int fd, Callback cb, size_t sz, BufT buf) :
+ Event(cb), descriptor(fd), buffer(buf), size(sz) {}
+
+ int descriptor;
+ BufT buffer;
+ size_t size;
+};
+
+/** Asynchronous read event */
+class ReadEvent : public IOEvent<void*>
+{
+ public:
+ explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) :
+ IOEvent<void*>(fd, cb, sz, buf), received(0) {}
+
+ private:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+ ssize_t doRead();
+
+ size_t received;
+};
+
+/** Asynchronous write event */
+class WriteEvent : public IOEvent<const void*>
+{
+ public:
+ explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0,
+ Callback cb=0) :
+ IOEvent<const void*>(fd, cb, sz, buf), written(0) {}
+
+ protected:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+
+ private:
+ ssize_t doWrite();
+ size_t written;
+};
+
+/** Asynchronous socket accept event */
+class AcceptEvent : public Event
+{
+ public:
+ /** Accept a connection on fd. */
+ explicit AcceptEvent(int fd=-1, Callback cb=0) :
+ Event(cb), descriptor(fd), accepted(0) {}
+
+ /** Get descriptor for server socket */
+ int getAcceptedDesscriptor() const { return accepted; }
+
+ private:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+
+ int descriptor;
+ int accepted;
+};
+
+
+class QueueSet;
+
+/**
+ * Channel to post and wait for events.
+ */
+class EventChannel : public qpid::SharedObject<EventChannel>
+{
+ public:
+ static shared_ptr create();
+
+ ~EventChannel();
+
+ /** Post an event to the channel. */
+ void postEvent(Event& event);
+
+ /** Post an event to the channel. Must not be 0. */
+ void postEvent(Event* event) { postEvent(*event); }
+
+ /**
+ * Wait for the next complete event.
+ *@return Pointer to event. Will never return 0.
+ */
+ Event* getEvent();
+
+ private:
+ EventChannel();
+ boost::shared_ptr<EventHandler> handler;
+};
+
+
+}}
+
+
+
+#endif /*!_sys_EventChannel_h*/
diff --git a/cpp/src/qpid/posix/EventChannelThreads.cpp b/cpp/src/qpid/posix/EventChannelThreads.cpp
new file mode 100644
index 0000000000..f97efd17e8
--- /dev/null
+++ b/cpp/src/qpid/posix/EventChannelThreads.cpp
@@ -0,0 +1,119 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "EventChannelThreads.h"
+#include <qpid/sys/Runnable.h>
+#include <iostream>
+using namespace std;
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace sys {
+
+EventChannelThreads::shared_ptr EventChannelThreads::create(
+ EventChannel::shared_ptr ec)
+{
+ return EventChannelThreads::shared_ptr(new EventChannelThreads(ec));
+}
+
+EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) :
+ channel(ec), nWaiting(0), state(RUNNING)
+{
+ // TODO aconway 2006-11-15: Estimate initial threads based on CPUs.
+ addThread();
+}
+
+EventChannelThreads::~EventChannelThreads() {
+ shutdown();
+ join();
+}
+
+void EventChannelThreads::shutdown()
+{
+ ScopedLock lock(*this);
+ if (state != RUNNING) // Already shutting down.
+ return;
+ for (size_t i = 0; i < workers.size(); ++i) {
+ channel->postEvent(terminate);
+ }
+ state = TERMINATE_SENT;
+ notify(); // Wake up one join() thread.
+}
+
+void EventChannelThreads::join()
+{
+ {
+ ScopedLock lock(*this);
+ while (state == RUNNING) // Wait for shutdown to start.
+ wait();
+ if (state == SHUTDOWN) // Shutdown is complete
+ return;
+ if (state == JOINING) {
+ // Someone else is doing the join.
+ while (state != SHUTDOWN)
+ wait();
+ return;
+ }
+ // I'm the joining thread
+ assert(state == TERMINATE_SENT);
+ state = JOINING;
+ } // Drop the lock.
+
+ for (size_t i = 0; i < workers.size(); ++i) {
+ assert(state == JOINING); // Only this thread can change JOINING.
+ workers[i].join();
+ }
+ state = SHUTDOWN;
+ notifyAll(); // Notify other join() threaeds.
+}
+
+void EventChannelThreads::addThread() {
+ ScopedLock l(*this);
+ workers.push_back(Thread(*this));
+}
+
+void EventChannelThreads::run()
+{
+ // Start life waiting. Decrement on exit.
+ AtomicCount::ScopedIncrement inc(nWaiting);
+ try {
+ while (true) {
+ Event* e = channel->getEvent();
+ assert(e != 0);
+ if (e == &terminate) {
+ return;
+ }
+ AtomicCount::ScopedDecrement dec(nWaiting);
+ // I'm no longer waiting, make sure someone is.
+ if (dec == 0)
+ addThread();
+ e->dispatch();
+ }
+ }
+ catch (const std::exception& e) {
+ // TODO aconway 2006-11-15: need better logging across the board.
+ std::cerr << "EventChannelThreads::run() caught: " << e.what()
+ << std::endl;
+ }
+ catch (...) {
+ std::cerr << "EventChannelThreads::run() caught unknown exception."
+ << std::endl;
+ }
+}
+
+}}
diff --git a/cpp/src/qpid/posix/EventChannelThreads.h b/cpp/src/qpid/posix/EventChannelThreads.h
new file mode 100644
index 0000000000..ae172ae752
--- /dev/null
+++ b/cpp/src/qpid/posix/EventChannelThreads.h
@@ -0,0 +1,92 @@
+#ifndef _posix_EventChannelThreads_h
+#define _sys_EventChannelThreads_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <vector>
+
+#include <qpid/Exception.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/Monitor.h>
+#include <qpid/sys/Thread.h>
+#include <qpid/sys/AtomicCount.h>
+#include "EventChannel.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ Dynamic thread pool serving an EventChannel.
+
+ Threads run a loop { e = getEvent(); e->dispatch(); }
+ The size of the thread pool is automatically adjusted to optimal size.
+*/
+class EventChannelThreads :
+ public qpid::SharedObject<EventChannelThreads>,
+ public sys::Monitor, private sys::Runnable
+{
+ public:
+ /** Create the thread pool and start initial threads. */
+ static EventChannelThreads::shared_ptr create(
+ EventChannel::shared_ptr channel
+ );
+
+ ~EventChannelThreads();
+
+ /** Post event to the underlying channel */
+ void postEvent(Event& event) { channel->postEvent(event); }
+
+ /** Post event to the underlying channel Must not be 0. */
+ void postEvent(Event* event) { channel->postEvent(event); }
+
+ /**
+ * Terminate all threads.
+ *
+ * Returns immediately, use join() to wait till all threads are
+ * shut down.
+ */
+ void shutdown();
+
+ /** Wait for all threads to terminate. */
+ void join();
+
+ private:
+ typedef std::vector<sys::Thread> Threads;
+ typedef enum {
+ RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN
+ } State;
+
+ EventChannelThreads(EventChannel::shared_ptr underlyingChannel);
+ void addThread();
+
+ void run();
+ bool keepRunning();
+ void adjustThreads();
+
+ EventChannel::shared_ptr channel;
+ Threads workers;
+ sys::AtomicCount nWaiting;
+ State state;
+ Event terminate;
+};
+
+
+}}
+
+
+#endif /*!_sys_EventChannelThreads_h*/
diff --git a/cpp/src/qpid/posix/Socket.cpp b/cpp/src/qpid/posix/Socket.cpp
index 1321ae6b0d..6d47c64b32 100644
--- a/cpp/src/qpid/posix/Socket.cpp
+++ b/cpp/src/qpid/posix/Socket.cpp
@@ -20,6 +20,7 @@
*/
#include <sys/socket.h>
+#include <sys/errno.h>
#include <netinet/in.h>
#include <netdb.h>
@@ -31,60 +32,87 @@
using namespace qpid::sys;
-Socket::Socket() : socket(::socket (PF_INET, SOCK_STREAM, 0))
+Socket Socket::createTcp()
{
- CHECKNN(socket == 0);
+ int s = ::socket (PF_INET, SOCK_STREAM, 0);
+ if (s < 0) throw QPID_POSIX_ERROR(errno);
+ return s;
}
-void
-Socket::setTimeout(long msecs)
+Socket::Socket(int descriptor) : socket(descriptor) {}
+
+void Socket::setTimeout(Time interval)
{
struct timeval tv;
- tv.tv_sec = msecs / 1000;
- tv.tv_usec = (msecs % 1000)*1000;
+ tv.tv_sec = interval/TIME_SEC;
+ tv.tv_usec = (interval%TIME_SEC)/TIME_USEC;
setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
}
-void
-Socket::connect(const std::string& host, int port)
+void Socket::connect(const std::string& host, int port)
{
struct sockaddr_in name;
name.sin_family = AF_INET;
name.sin_port = htons(port);
struct hostent* hp = gethostbyname ( host.c_str() );
- if (hp == 0) CHECK0(-1); // TODO aconway 2006-11-09: error message?
+ if (hp == 0) throw QPID_POSIX_ERROR(errno);
memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length);
- CHECK0(::connect(socket, (struct sockaddr*)(&name), sizeof(name)));
+ if (::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0)
+ throw QPID_POSIX_ERROR(errno);
}
void
Socket::close()
{
if (socket == 0) return;
- CHECK0(::close(socket));
+ if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno);
socket = 0;
}
ssize_t
-Socket::send(const char* data, size_t size)
+Socket::send(const void* data, size_t size)
{
ssize_t sent = ::send(socket, data, size, 0);
if (sent < 0) {
if (errno == ECONNRESET) return SOCKET_EOF;
if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
- CHECK0(sent);
+ throw QPID_POSIX_ERROR(errno);
}
return sent;
}
ssize_t
-Socket::recv(char* data, size_t size)
+Socket::recv(void* data, size_t size)
{
ssize_t received = ::recv(socket, data, size, 0);
if (received < 0) {
if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
- CHECK0(received);
+ throw QPID_POSIX_ERROR(errno);
}
return received;
}
+
+int Socket::listen(int port, int backlog)
+{
+ struct sockaddr_in name;
+ name.sin_family = AF_INET;
+ name.sin_port = htons(port);
+ name.sin_addr.s_addr = 0;
+ if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0)
+ throw QPID_POSIX_ERROR(errno);
+ if (::listen(socket, backlog) < 0)
+ throw QPID_POSIX_ERROR(errno);
+
+ socklen_t namelen = sizeof(name);
+ if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
+ throw QPID_POSIX_ERROR(errno);
+
+ return ntohs(name.sin_port);
+}
+
+
+int Socket::fd()
+{
+ return socket;
+}
diff --git a/cpp/src/qpid/posix/check.cpp b/cpp/src/qpid/posix/check.cpp
index 2ef52f68b7..408679caa8 100644
--- a/cpp/src/qpid/posix/check.cpp
+++ b/cpp/src/qpid/posix/check.cpp
@@ -19,15 +19,21 @@
*
*/
-#include <qpid/QpidError.h>
+#include <cerrno>
#include "check.h"
namespace qpid {
namespace sys {
-std::string errnoToString() {
+std::string
+PosixError::getMessage(int errNo)
+{
char buf[512];
- return strerror_r(errno, buf, sizeof(buf));
+ return std::string(strerror_r(errNo, buf, sizeof(buf)));
}
+PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw()
+ : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc)
+{ }
+
}}
diff --git a/cpp/src/qpid/posix/check.h b/cpp/src/qpid/posix/check.h
index 666637b1c2..2d3a8d30e3 100644
--- a/cpp/src/qpid/posix/check.h
+++ b/cpp/src/qpid/posix/check.h
@@ -22,18 +22,41 @@
*
*/
-#include <errno.h>
+#include <cerrno>
#include <string>
#include <qpid/QpidError.h>
namespace qpid {
namespace sys {
-std::string errnoToString();
+/**
+ * Exception with message from errno.
+ */
+class PosixError : public qpid::QpidError
+{
+ public:
+ static std::string getMessage(int errNo);
+
+ PosixError(int errNo, const qpid::SrcLine& location) throw();
+
+ ~PosixError() throw() {}
+
+ int getErrNo() { return errNo; }
+
+ Exception* clone() const throw() { return new PosixError(*this); }
+
+ void throwSelf() { throw *this; }
+
+ private:
+ int errNo;
+};
-#define CHECK0(N) if ((N)!=0) THROW_QPID_ERROR(INTERNAL_ERROR, errnoToString())
-#define CHECKNN(N) if ((N)<0) THROW_QPID_ERROR(INTERNAL_ERROR, errnoToString())
}}
+/** Create a PosixError for the current file/line and errno. */
+#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE)
+/** Throw a posix error if errNo is non-zero */
+#define QPID_POSIX_THROW_IF(ERRNO) \
+ if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO))
#endif /*!_posix_check_h*/
diff --git a/cpp/src/qpid/sys/AtomicCount.h b/cpp/src/qpid/sys/AtomicCount.h
new file mode 100644
index 0000000000..b625b2c9b0
--- /dev/null
+++ b/cpp/src/qpid/sys/AtomicCount.h
@@ -0,0 +1,71 @@
+#ifndef _posix_AtomicCount_h
+#define _posix_AtomicCount_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/detail/atomic_count.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Atomic counter.
+ */
+class AtomicCount : boost::noncopyable {
+ public:
+ class ScopedDecrement : boost::noncopyable {
+ public:
+ /** Decrement counter in constructor and increment in destructor. */
+ ScopedDecrement(AtomicCount& c) : count(c) { value = --count; }
+ ~ScopedDecrement() { ++count; }
+ /** Return the value returned by the decrement. */
+ operator long() { return value; }
+ private:
+ AtomicCount& count;
+ long value;
+ };
+
+ class ScopedIncrement : boost::noncopyable {
+ public:
+ /** Increment counter in constructor and increment in destructor. */
+ ScopedIncrement(AtomicCount& c) : count(c) { ++count; }
+ ~ScopedIncrement() { --count; }
+ private:
+ AtomicCount& count;
+ };
+
+ AtomicCount(long value = 0) : count(value) {}
+
+ void operator++() { ++count ; }
+
+ long operator--() { return --count; }
+
+ operator long() const { return count; }
+
+
+ private:
+ boost::detail::atomic_count count;
+};
+
+
+}}
+
+
+#endif // _posix_AtomicCount_h
diff --git a/cpp/src/qpid/sys/EventChannel.h b/cpp/src/qpid/sys/EventChannel.h
deleted file mode 100644
index dd857c02c7..0000000000
--- a/cpp/src/qpid/sys/EventChannel.h
+++ /dev/null
@@ -1,239 +0,0 @@
-#ifndef _sys_EventChannel_h
-#define _sys_EventChannel_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <qpid/SharedObject.h>
-#include <qpid/Exception.h>
-#include <qpid/sys/Time.h>
-
-namespace qpid {
-namespace sys {
-
-class EventChannel;
-
-class Event;
-class ReadEvent;
-class WriteEvent;
-class AcceptEvent;
-class NotifyEvent;
-
-/**
- Active event channel. Events represent async IO requests or
- inter-task synchronization. Posting an Event registers interest in
- the IO or sync event. When it occurs the posted Event is
- corresponding IO or sync event occurs they are returned to one
- of the threads waiting on the channel. For more details see
- the Event hierarchy.
-*/
-class EventChannel : public qpid::SharedObject<EventChannel>
-{
- public:
- static EventChannel::shared_ptr create();
-
- virtual ~EventChannel() {}
-
- virtual void post(ReadEvent& event) = 0;
- virtual void post(WriteEvent& event) = 0;
- virtual void post(AcceptEvent& event) = 0;
- virtual void post(NotifyEvent& event) = 0;
-
- inline void post(Event& event);
-
- /**
- * Wait for the next completed event.
- * @return An Event or 0 to indicate the calling thread should shut down.
- */
- virtual Event* getEvent() = 0;
-
- /** Dispose of a system-allocated buffer. Called by ReadEvent */
- virtual void dispose(void* buffer, size_t size) = 0;
-
- protected:
- EventChannel() {}
-};
-
-
-/**
- * Base class for all events. There are two possible styles of use:
- *
- * Task style: the event is allocated as a local variable on the initiating
- * task, which blocks in wait(). Event::dispatch() resumes that task
- * with the event data available.
- *
- * Proactor style: Threads post events but do not
- * wait. Event::dispatch() processes the event in the dispatching
- * thread and then deletes itself.
- *
- * Tasks give less kernel context switching and blocking AND simpler
- * coding. Tasks can call any number of pseudo-blocking opereations
- * that are actually event post/wait pairs. At each such point the
- * current thread can continue with the task or switch to another task
- * to minimise blocking.
- *
- * With Proactor style dispatch() is an atomic unit of work as far as
- * the EventChannel is concerned. To avoid such blocking the
- * application has to be written as a collection of non-blocking
- * dispatch() callbacks, which is more complex than tasks that can
- * call pseudo-blocking operations.
- */
-class Event : private boost::noncopyable
-{
- public:
- virtual ~Event() {}
-
- /** Post this event to the channel */
- virtual void post(EventChannel& channel) = 0;
-
- /**
- * Block till the event is delivered.
- * At most one task can wait on an event.
- */
- virtual void wait() const = 0;
-
- /**
- * Dispatch the event. Runs some event-specific code, may switch
- * context to resume a waiting task.
- */
- virtual void dispatch() = 0;
-};
-
-
-/**
- * Base class for asynchronous request events, provides exception
- * handling.
- */
-class RequestEvent : public Event
-{
- public:
- /** True if the async request failed */
- bool hasException() const { return ex.get(); }
-
- const qpid::Exception& getException() const { return *ex; }
-
- void setException(std::auto_ptr<qpid::Exception>& e) { ex = e; }
-
- /** If the event has an exception throw it, else do nothing */
- void verify() const { if (ex.get()) throw *ex; }
-
- void post(EventChannel& channel) { channel.post(*this); }
-
- private:
- qpid::HeapException ex;
-};
-
-
-/** An asynchronous read event. */
-class ReadEvent : public RequestEvent {
- public:
- /**
- * Read data from fd.
- */
- ReadEvent(int fileDescriptor, void* buffer, size_t bytesToRead) :
- fd(fileDescriptor), data(buffer), size(bytesToRead) {}
-
- /** Number of bytes read. */
- size_t getBytesRead() const { verify(); return size; }
-
- /**
- * If the system supports direct access to DMA buffers then
- * it may provide a direct pointer to such a buffer to avoid
- * a copy into the user buffer.
- * @return true if getData() is returning a system-supplied buffer.
- */
- bool isSystemData() const { verify(); return channel != 0; }
-
- /**
- * Pointer to data read. Note if isSystemData() is true then this
- * is NOT the same buffer that was supplied to the constructor.
- * The user buffer is untouched. See dispose().
- */
- void* getData() const { verify(); return data; }
-
- /** Called by the event channel on completion. */
- void complete(EventChannel::shared_ptr ec, void* _data, size_t _size) {
- if (data != _data) channel = ec; data = _data; size = _size;
- }
-
- /**
- * Dispose of system-provided data buffer, if any. This is
- * automatically called by the destructor.
- */
- void dispose() { if(channel && data) channel->dispose(data,size); data=0; }
-
- ~ReadEvent() { dispose(); }
-
- void post(EventChannel& channel) { channel.post(*this); }
-
- private:
- int fd;
- void* data;
- size_t size;
- EventChannel::shared_ptr channel;
-};
-
-/** Asynchronous write event */
-class WriteEvent : public RequestEvent {
- public:
- WriteEvent(int fileDescriptor, void* buffer, size_t bytesToWrite) :
- fd(fileDescriptor), data(buffer), size(bytesToWrite) {}
-
- /** Number of bytes written */
- size_t getBytesWritten() const { verify(); return size; }
-
- void post(EventChannel& channel) { channel.post(*this); }
-
- private:
- int fd;
- void* data;
- size_t size;
-};
-
-/** Asynchronous socket accept event */
-class AcceptEvent : public RequestEvent {
- public:
- /** Accept a connection on listeningFd */
- AcceptEvent(int listeningFd) : listen(listeningFd) {}
-
- /** Get accepted file descriptor */
- int getAcceptedFd() const { verify(); return accepted; }
-
- void post(EventChannel& channel) { channel.post(*this); }
-
- private:
- int listen;
- int accepted;
-};
-
-/**
- * NotifyEvent is delievered immediately to be dispatched by an
- * EventChannel thread.
- */
-class NotifyEvent : public RequestEvent {
- public:
- void post(EventChannel& channel) { channel.post(*this); }
-};
-
-
-inline void EventChannel::post(Event& event) { event.post(*this); }
-
-}}
-
-
-#endif /*!_sys_EventChannel_h*/
diff --git a/cpp/src/qpid/sys/Monitor.h b/cpp/src/qpid/sys/Monitor.h
index 59e1e74b57..bbe126cecb 100644
--- a/cpp/src/qpid/sys/Monitor.h
+++ b/cpp/src/qpid/sys/Monitor.h
@@ -22,60 +22,28 @@
*
*/
+#include <sys/errno.h>
#include <boost/noncopyable.hpp>
+#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Time.h>
#ifdef USE_APR
-# include <apr_thread_mutex.h>
-# include <apr_thread_cond.h>
-# include <qpid/apr/APRBase.h>
-# include <qpid/apr/APRPool.h>
-#else
-# include <pthread.h>
-# include <qpid/sys/Time.h>
-# include <qpid/posix/check.h>
+# include <apr-1/apr_thread_cond.h>
#endif
namespace qpid {
namespace sys {
-template <class L>
-class ScopedLock
-{
- public:
- ScopedLock(L& l) : mutex(l) { l.lock(); }
- ~ScopedLock() { mutex.unlock(); }
- private:
- L& mutex;
-};
-
-
-class Mutex : private boost::noncopyable
-{
- public:
- typedef ScopedLock<Mutex> ScopedLock;
-
- inline Mutex();
- inline ~Mutex();
- inline void lock();
- inline void unlock();
- inline void trylock();
-
- protected:
-#ifdef USE_APR
- apr_thread_mutex_t* mutex;
-#else
- pthread_mutex_t mutex;
-#endif
-};
-
-/** A condition variable and a mutex */
+/**
+ * A monitor is a condition variable and a mutex
+ */
class Monitor : public Mutex
{
public:
inline Monitor();
inline ~Monitor();
inline void wait();
- inline bool wait(int64_t nsecs);
+ inline bool wait(const Time& absoluteTime);
inline void notify();
inline void notifyAll();
@@ -91,25 +59,6 @@ class Monitor : public Mutex
// APR ================================================================
#ifdef USE_APR
-Mutex::Mutex() {
- CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
-}
-
-Mutex::~Mutex(){
- CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
-}
-
-void Mutex::lock() {
- CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
-}
-void Mutex::unlock() {
- CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
-}
-
-void Mutex::trylock() {
- CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex));
-}
-
Monitor::Monitor() {
CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
}
@@ -122,10 +71,10 @@ void Monitor::wait() {
CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
}
-bool Monitor::wait(int64_t nsecs){
+bool Monitor::wait(const Time& absoluteTime){
// APR uses microseconds.
- apr_status_t status = apr_thread_cond_timedwait(
- condition, mutex, nsecs/1000);
+ apr_status_t status =
+ apr_thread_cond_timedwait(condition, mutex, absoluteTime/TIME_USEC);
if(status != APR_TIMEUP) CHECK_APR_SUCCESS(status);
return status == 0;
}
@@ -138,93 +87,41 @@ void Monitor::notifyAll(){
CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
}
-
-}}
-
-
-// POSIX ================================================================
#else
-/**
- * PODMutex is a POD, can be static-initialized with
- * PODMutex m = QPID_PODMUTEX_INITIALIZER
- */
-struct PODMutex
-{
- typedef ScopedLock<PODMutex> ScopedLock;
-
- inline void lock();
- inline void unlock();
- inline void trylock();
-
- // Must be public to be a POD:
- pthread_mutex_t mutex;
-};
-
-#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER }
-
-
-void PODMutex::lock() {
- CHECK0(pthread_mutex_lock(&mutex));
-}
-void PODMutex::unlock() {
- CHECK0(pthread_mutex_unlock(&mutex));
-}
-
-void PODMutex::trylock() {
- CHECK0(pthread_mutex_trylock(&mutex));
-}
-
-
-Mutex::Mutex() {
- CHECK0(pthread_mutex_init(&mutex, 0));
-}
-
-Mutex::~Mutex(){
- CHECK0(pthread_mutex_destroy(&mutex));
-}
-
-void Mutex::lock() {
- CHECK0(pthread_mutex_lock(&mutex));
-}
-void Mutex::unlock() {
- CHECK0(pthread_mutex_unlock(&mutex));
-}
-
-void Mutex::trylock() {
- CHECK0(pthread_mutex_trylock(&mutex));
-}
+// POSIX ================================================================
Monitor::Monitor() {
- CHECK0(pthread_cond_init(&condition, 0));
+ QPID_POSIX_THROW_IF(pthread_cond_init(&condition, 0));
}
Monitor::~Monitor() {
- CHECK0(pthread_cond_destroy(&condition));
+ QPID_POSIX_THROW_IF(pthread_cond_destroy(&condition));
}
void Monitor::wait() {
- CHECK0(pthread_cond_wait(&condition, &mutex));
+ QPID_POSIX_THROW_IF(pthread_cond_wait(&condition, &mutex));
}
-bool Monitor::wait(int64_t nsecs){
- Time t(nsecs);
- int status = pthread_cond_timedwait(&condition, &mutex, &t.getTimespec());
- if(status != 0) {
- if (errno == ETIMEDOUT) return false;
- CHECK0(status);
+bool Monitor::wait(const Time& absoluteTime){
+ struct timespec ts;
+ toTimespec(ts, absoluteTime);
+ int status = pthread_cond_timedwait(&condition, &mutex, &ts);
+ if (status != 0) {
+ if (status == ETIMEDOUT) return false;
+ throw QPID_POSIX_ERROR(status);
}
return true;
}
void Monitor::notify(){
- CHECK0(pthread_cond_signal(&condition));
+ QPID_POSIX_THROW_IF(pthread_cond_signal(&condition));
}
void Monitor::notifyAll(){
- CHECK0(pthread_cond_broadcast(&condition));
+ QPID_POSIX_THROW_IF(pthread_cond_broadcast(&condition));
}
+#endif /*USE_APR*/
}}
-#endif /*USE_APR*/
#endif /*!_sys_Monitor_h*/
diff --git a/cpp/src/qpid/sys/Mutex.h b/cpp/src/qpid/sys/Mutex.h
new file mode 100644
index 0000000000..3ada2e98b7
--- /dev/null
+++ b/cpp/src/qpid/sys/Mutex.h
@@ -0,0 +1,151 @@
+#ifndef _sys_Mutex_h
+#define _sys_Mutex_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifdef USE_APR
+# include <apr-1/apr_thread_mutex.h>
+# include <qpid/apr/APRBase.h>
+# include <qpid/apr/APRPool.h>
+#else
+# include <pthread.h>
+# include <qpid/posix/check.h>
+#endif
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Scoped lock template: calls lock() in ctor, unlock() in dtor.
+ * L can be any class with lock() and unlock() functions.
+ */
+template <class L>
+class ScopedLock
+{
+ public:
+ ScopedLock(L& l) : mutex(l) { l.lock(); }
+ ~ScopedLock() { mutex.unlock(); }
+ private:
+ L& mutex;
+};
+
+/**
+ * Mutex lock.
+ */
+class Mutex : private boost::noncopyable {
+ public:
+ typedef ScopedLock<Mutex> ScopedLock;
+
+ inline Mutex();
+ inline ~Mutex();
+ inline void lock();
+ inline void unlock();
+ inline void trylock();
+
+ protected:
+#ifdef USE_APR
+ apr_thread_mutex_t* mutex;
+#else
+ pthread_mutex_t mutex;
+#endif
+};
+
+#ifdef USE_APR
+// APR ================================================================
+
+Mutex::Mutex() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
+}
+
+Mutex::~Mutex(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+}
+
+void Mutex::lock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+}
+void Mutex::unlock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+}
+
+void Mutex::trylock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex));
+}
+
+#else
+// POSIX ================================================================
+
+/**
+ * PODMutex is a POD, can be static-initialized with
+ * PODMutex m = QPID_PODMUTEX_INITIALIZER
+ */
+struct PODMutex
+{
+ typedef ScopedLock<PODMutex> ScopedLock;
+
+ inline void lock();
+ inline void unlock();
+ inline void trylock();
+
+ // Must be public to be a POD:
+ pthread_mutex_t mutex;
+};
+
+#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER }
+
+
+void PODMutex::lock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+}
+void PODMutex::unlock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+}
+
+void PODMutex::trylock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex));
+}
+
+
+Mutex::Mutex() {
+ QPID_POSIX_THROW_IF(pthread_mutex_init(&mutex, 0));
+}
+
+Mutex::~Mutex(){
+ QPID_POSIX_THROW_IF(pthread_mutex_destroy(&mutex));
+}
+
+void Mutex::lock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
+}
+void Mutex::unlock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
+}
+
+void Mutex::trylock() {
+ QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex));
+}
+
+#endif // USE_APR
+
+}}
+
+
+
+#endif /*!_sys_Mutex_h*/
diff --git a/cpp/src/qpid/posix/EpollEventChannel.h b/cpp/src/qpid/sys/Runnable.cpp
index 8128d5276f..30122c682f 100644
--- a/cpp/src/qpid/posix/EpollEventChannel.h
+++ b/cpp/src/qpid/sys/Runnable.cpp
@@ -16,33 +16,17 @@
*
*/
-#include <qpid/sys/EventChannel.h>
+#include "Runnable.h"
+#include <boost/bind.hpp>
namespace qpid {
namespace sys {
-/** Epoll-based implementation of the event channel */
-class EpollEventChannel : public EventChannel
-{
- public:
-
- EpollEventChannel();
- ~EpollEventChannel();
-
- virtual void post(ReadEvent& event);
- virtual void post(WriteEvent& event);
- virtual void post(AcceptEvent& event);
- virtual void post(NotifyEvent& event);
-
- inline void post(Event& event);
+Runnable::~Runnable() {}
- virtual Event* getEvent();
-
- virtual void dispose(void* buffer, size_t size);
-
- private:
- int epollFd;
-
-};
+Runnable::Functor Runnable::functor()
+{
+ return boost::bind(&Runnable::run, this);
+}
}}
diff --git a/cpp/src/qpid/sys/Runnable.h b/cpp/src/qpid/sys/Runnable.h
index 8379afb2f9..fb3927c612 100644
--- a/cpp/src/qpid/sys/Runnable.h
+++ b/cpp/src/qpid/sys/Runnable.h
@@ -1,3 +1,5 @@
+#ifndef _Runnable_
+#define _Runnable_
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,18 +20,28 @@
* under the License.
*
*/
-#ifndef _Runnable_
-#define _Runnable_
+
+#include <boost/function.hpp>
namespace qpid {
namespace sys {
-/** Base class for classes that run in a thread. */
+/**
+ * Interface for objects that can be run, e.g. in a thread.
+ */
class Runnable
{
public:
- virtual ~Runnable() {}
+ /** Type to represent a runnable as a Functor */
+ typedef boost::function0<void> Functor;
+
+ virtual ~Runnable();
+
+ /** Derived classes override run(). */
virtual void run() = 0;
+
+ /** Create a functor object that will call this->run(). */
+ Functor functor();
};
}}
diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h
index cf757e7a27..d3e8c1af48 100644
--- a/cpp/src/qpid/sys/Socket.h
+++ b/cpp/src/qpid/sys/Socket.h
@@ -23,6 +23,7 @@
*/
#include <string>
+#include <qpid/sys/Time.h>
#ifdef USE_APR
# include <apr_network_io.h>
@@ -34,10 +35,18 @@ namespace sys {
class Socket
{
public:
- Socket();
+ /** Create an initialized TCP socket */
+ static Socket createTcp();
+ /** Create a socket wrapper for descriptor. */
+#ifdef USE_APR
+ Socket(apr_socket_t* descriptor = 0);
+#else
+ Socket(int descriptor = 0);
+#endif
+
/** Set timeout for read and write */
- void setTimeout(long msecs);
+ void setTimeout(Time interval);
void connect(const std::string& host, int port);
@@ -46,19 +55,30 @@ class Socket
enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode;
/** Returns bytes sent or an ErrorCode value < 0. */
- ssize_t send(const char* data, size_t size);
+ ssize_t send(const void* data, size_t size);
/**
* Returns bytes received, an ErrorCode value < 0 or 0
* if the connection closed in an orderly manner.
*/
- ssize_t recv(char* data, size_t size);
+ ssize_t recv(void* data, size_t size);
+
+ /** Bind to a port and start listening.
+ *@param port 0 means choose an available port.
+ *@param backlog maximum number of pending connections.
+ *@return The bound port.
+ */
+ int listen(int port = 0, int backlog = 10);
+ /** Get file descriptor */
+ int fd();
+
private:
#ifdef USE_APR
apr_socket_t* socket;
#else
- int socket;
+ void init() const;
+ mutable int socket; // Initialized on demand.
#endif
};
diff --git a/cpp/src/qpid/sys/Thread.h b/cpp/src/qpid/sys/Thread.h
index 2aad7c24d7..37f714dd6c 100644
--- a/cpp/src/qpid/sys/Thread.h
+++ b/cpp/src/qpid/sys/Thread.h
@@ -40,11 +40,17 @@ namespace sys {
class Thread
{
public:
+ inline static Thread current();
+ inline static void yield();
+
inline Thread();
inline explicit Thread(qpid::sys::Runnable*);
+ inline explicit Thread(qpid::sys::Runnable&);
+
inline void join();
- inline static Thread current();
+ inline long id();
+
private:
#ifdef USE_APR
static void* APR_THREAD_FUNC runRunnable(apr_thread_t* thread, void *data);
@@ -68,12 +74,21 @@ Thread::Thread(Runnable* runnable) {
apr_thread_create(&thread, 0, runRunnable, runnable, APRPool::get()));
}
+Thread::Thread(Runnable& runnable) {
+ CHECK_APR_SUCCESS(
+ apr_thread_create(&thread, 0, runRunnable, &runnable, APRPool::get()));
+}
+
void Thread::join(){
apr_status_t status;
if (thread != 0)
CHECK_APR_SUCCESS(apr_thread_join(&status, thread));
}
+long Thread::id() {
+ return long(thread);
+}
+
Thread::Thread(apr_thread_t* t) : thread(t) {}
Thread Thread::current(){
@@ -83,15 +98,29 @@ Thread Thread::current(){
return Thread(thr);
}
+void Thread::yield()
+{
+ apr_thread_yield();
+}
+
+
// POSIX ================================================================
#else
Thread::Thread(Runnable* runnable) {
- CHECK0(pthread_create(&thread, NULL, runRunnable, runnable));
+ QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, runnable));
+}
+
+Thread::Thread(Runnable& runnable) {
+ QPID_POSIX_THROW_IF(pthread_create(&thread, NULL, runRunnable, &runnable));
}
void Thread::join(){
- if (thread != 0) CHECK0(pthread_join(thread, 0));
+ QPID_POSIX_THROW_IF(pthread_join(thread, 0));
+}
+
+long Thread::id() {
+ return long(thread);
}
Thread::Thread(pthread_t thr) : thread(thr) {}
@@ -99,6 +128,13 @@ Thread::Thread(pthread_t thr) : thread(thr) {}
Thread Thread::current() {
return Thread(pthread_self());
}
+
+void Thread::yield()
+{
+ QPID_POSIX_THROW_IF(pthread_yield());
+}
+
+
#endif
}}
diff --git a/cpp/src/qpid/sys/Time.cpp b/cpp/src/qpid/sys/Time.cpp
index 3971297ec2..ad6185b966 100644
--- a/cpp/src/qpid/sys/Time.cpp
+++ b/cpp/src/qpid/sys/Time.cpp
@@ -27,37 +27,34 @@ namespace sys {
// APR ================================================================
#if USE_APR
-Time Time::now() {
- return Time(apr_time_now(), NSEC_PER_USEC);
-}
-
-void Time::set(int64_t ticks, long nsec_per_tick) {
- time = (ticks * nsec_per_tick) / NSEC_PER_USEC;
-}
-
-int64_t Time::nsecs() const {
- return time * NSEC_PER_USEC;
-}
+Time now() { return apr_time_now() * TIME_USEC; }
// POSIX================================================================
#else
-Time Time::now() {
- Time t;
- clock_gettime(CLOCK_REALTIME, &t.time);
- return t;
+Time now() {
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME, &ts);
+ return toTime(ts);
+}
+
+struct timespec toTimespec(const Time& t) {
+ struct timespec ts;
+ toTimespec(ts, t);
+ return ts;
}
-void Time::set(int64_t ticks, long nsec_per_tick) {
- int64_t ns = ticks * nsec_per_tick;
- time.tv_sec = ns / NSEC_PER_SEC;
- time.tv_nsec = ns % NSEC_PER_SEC;
+struct timespec& toTimespec(struct timespec& ts, const Time& t) {
+ ts.tv_sec = t / TIME_SEC;
+ ts.tv_nsec = t % TIME_SEC;
+ return ts;
}
-int64_t Time::nsecs() const {
- return time.tv_sec * NSEC_PER_SEC + time.tv_nsec;
+Time toTime(const struct timespec& ts) {
+ return ts.tv_sec*TIME_SEC + ts.tv_nsec;
}
+
#endif
}}
diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h
index a569c90780..3dd46741d8 100644
--- a/cpp/src/qpid/sys/Time.h
+++ b/cpp/src/qpid/sys/Time.h
@@ -33,43 +33,25 @@
namespace qpid {
namespace sys {
-class Time
-{
- public:
- static Time now();
-
- enum {
- NSEC_PER_SEC=1000*1000*1000,
- NSEC_PER_MSEC=1000*1000,
- NSEC_PER_USEC=1000
- };
+/** Time in nanoseconds */
+typedef int64_t Time;
- inline Time(int64_t ticks=0, long nsec_per_tick=1);
+Time now();
- void set(int64_t ticks, long nsec_per_tick=1);
-
- inline int64_t msecs() const;
- inline int64_t usecs() const;
- int64_t nsecs() const;
+/** Nanoseconds per second. */
+const Time TIME_SEC = 1000*1000*1000;
+/** Nanoseconds per millisecond */
+const Time TIME_MSEC = 1000*1000;
+/** Nanoseconds per microseconds. */
+const Time TIME_USEC = 1000;
+/** Nanoseconds per nanosecond. */
+const Time TIME_NSEC = 1;
#ifndef USE_APR
- const struct timespec& getTimespec() const { return time; }
- struct timespec& getTimespec() { return time; }
+struct timespec toTimespec(const Time& t);
+struct timespec& toTimespec(struct timespec& ts, const Time& t);
+Time toTime(const struct timespec& ts);
#endif
-
- private:
-#ifdef USE_APR
- apr_time_t time;
-#else
- struct timespec time;
-#endif
-};
-
-Time::Time(int64_t ticks, long nsec_per_tick) { set(ticks, nsec_per_tick); }
-
-int64_t Time::msecs() const { return nsecs() / NSEC_PER_MSEC; }
-
-int64_t Time::usecs() const { return nsecs() / NSEC_PER_USEC; }
}}
diff --git a/cpp/test/client/client_test.cpp b/cpp/test/client/client_test.cpp
index 18b162ec8a..0a3c300f4a 100644
--- a/cpp/test/client/client_test.cpp
+++ b/cpp/test/client/client_test.cpp
@@ -90,7 +90,9 @@ int main(int argc, char**)
con.close();
std::cout << "Closed connection." << std::endl;
}catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ std::cout << "Error [" << error.code << "] " << error.msg << " ("
+ << error.location.file << ":" << error.location.line
+ << ")" << std::endl;
return 1;
}
return 0;
diff --git a/cpp/test/client/echo_service.cpp b/cpp/test/client/echo_service.cpp
index f0aa49fd4b..3df3da0b86 100644
--- a/cpp/test/client/echo_service.cpp
+++ b/cpp/test/client/echo_service.cpp
@@ -107,7 +107,7 @@ int main(int argc, char** argv){
connection.close();
} catch(qpid::QpidError error) {
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ std::cout << error.what() << std::endl;
}
} else {
try {
@@ -133,7 +133,7 @@ int main(int argc, char** argv){
connection.close();
} catch(qpid::QpidError error) {
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ std::cout << error.what() << std::endl;
}
}
}
diff --git a/cpp/test/client/topic_listener.cpp b/cpp/test/client/topic_listener.cpp
index 413d482361..bd7cfdc62c 100644
--- a/cpp/test/client/topic_listener.cpp
+++ b/cpp/test/client/topic_listener.cpp
@@ -38,7 +38,7 @@ class Listener : public MessageListener{
const bool transactional;
bool init;
int count;
- int64_t start;
+ Time start;
void shutdown();
void report();
@@ -96,7 +96,7 @@ int main(int argc, char** argv){
channel.run();
connection.close();
}catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ std::cout << error.what() << std::endl;
}
}
}
@@ -106,7 +106,7 @@ Listener::Listener(Channel* _channel, const std::string& _responseq, bool tx) :
void Listener::received(Message& message){
if(!init){
- start = Time::now().msecs();
+ start = now();
count = 0;
init = true;
}
@@ -128,10 +128,11 @@ void Listener::shutdown(){
}
void Listener::report(){
- int64_t finish = Time::now().msecs();
- int64_t time = finish - start;
+ Time finish = now();
+ Time time = finish - start;
std::stringstream reportstr;
- reportstr << "Received " << count << " messages in " << time << " ms.";
+ reportstr << "Received " << count << " messages in "
+ << time/TIME_MSEC << " ms.";
Message msg;
msg.setData(reportstr.str());
channel->publish(msg, string(), responseQueue);
diff --git a/cpp/test/client/topic_publisher.cpp b/cpp/test/client/topic_publisher.cpp
index d9f271e2f0..97d589c1d1 100644
--- a/cpp/test/client/topic_publisher.cpp
+++ b/cpp/test/client/topic_publisher.cpp
@@ -114,11 +114,13 @@ int main(int argc, char** argv){
int64_t sum(0);
for(int i = 0; i < batchSize; i++){
if(i > 0 && args.getDelay()) sleep(args.getDelay());
- int64_t time = publisher.publish(args.getMessages(), args.getSubscribers(), args.getSize());
+ Time time = publisher.publish(
+ args.getMessages(), args.getSubscribers(), args.getSize());
if(!max || time > max) max = time;
if(!min || time < min) min = time;
sum += time;
- std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << time << "ms" << std::endl;
+ std::cout << "Completed " << (i+1) << " of " << batchSize
+ << " in " << time/TIME_MSEC << "ms" << std::endl;
}
publisher.terminate();
int64_t avg = sum / batchSize;
@@ -129,7 +131,7 @@ int main(int argc, char** argv){
channel.close();
connection.close();
}catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ std::cout << error.what() << std::endl;
}
}
}
@@ -153,7 +155,7 @@ void Publisher::waitForCompletion(int msgs){
int64_t Publisher::publish(int msgs, int listeners, int size){
Message msg;
msg.setData(generateData(size));
- int64_t start = Time::now().msecs();
+ Time start = now();
{
Monitor::ScopedLock l(monitor);
for(int i = 0; i < msgs; i++){
@@ -170,7 +172,7 @@ int64_t Publisher::publish(int msgs, int listeners, int size){
waitForCompletion(listeners);
}
- int64_t finish(Time::now().msecs());
+ Time finish = now();
return finish - start;
}
diff --git a/cpp/test/unit/qpid/ExceptionTest.cpp b/cpp/test/unit/qpid/ExceptionTest.cpp
new file mode 100644
index 0000000000..7c3261dc29
--- /dev/null
+++ b/cpp/test/unit/qpid/ExceptionTest.cpp
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <qpid/Exception.h>
+#include <qpid_test_plugin.h>
+
+using namespace qpid;
+
+struct CountDestroyedException : public Exception {
+ int& count;
+ static int staticCount;
+ CountDestroyedException() : count(staticCount) { }
+ CountDestroyedException(int& n) : count(n) {}
+ ~CountDestroyedException() throw() { count++; }
+ void throwSelf() const { throw *this; }
+};
+
+int CountDestroyedException::staticCount = 0;
+
+
+class ExceptionTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(ExceptionTest);
+ CPPUNIT_TEST(testHeapException);
+ CPPUNIT_TEST_SUITE_END();
+ public:
+ // Verify proper memory management for heap-allocated exceptions.
+ void testHeapException() {
+ int count = 0;
+ try {
+ std::auto_ptr<Exception> p(
+ new CountDestroyedException(count));
+ p.release()->throwSelf();
+ CPPUNIT_FAIL("Expected CountDestroyedException.");
+ } catch (const CountDestroyedException& e) {
+ CPPUNIT_ASSERT(&e.count == &count);
+ }
+ CPPUNIT_ASSERT_EQUAL(1, count);
+ }
+};
+
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ExceptionTest);
+
diff --git a/cpp/test/unit/qpid/posix/EventChannelTest.cpp b/cpp/test/unit/qpid/posix/EventChannelTest.cpp
new file mode 100644
index 0000000000..8846a0e340
--- /dev/null
+++ b/cpp/test/unit/qpid/posix/EventChannelTest.cpp
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/posix/EventChannel.h>
+#include <qpid/posix/check.h>
+#include <qpid/sys/Runnable.h>
+#include <qpid/sys/Socket.h>
+#include <qpid/sys/Thread.h>
+#include <qpid_test_plugin.h>
+
+#include <sys/socket.h>
+#include <signal.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <iostream>
+
+using namespace qpid::sys;
+
+
+const char hello[] = "hello";
+const size_t size = sizeof(hello);
+
+struct RunMe : public Runnable
+{
+ bool ran;
+ RunMe() : ran(false) {}
+ void run() { ran = true; }
+};
+
+class EventChannelTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(EventChannelTest);
+ CPPUNIT_TEST(testEvent);
+ CPPUNIT_TEST(testRead);
+ CPPUNIT_TEST(testFailedRead);
+ CPPUNIT_TEST(testWrite);
+ CPPUNIT_TEST(testFailedWrite);
+ CPPUNIT_TEST(testReadWrite);
+ CPPUNIT_TEST(testAccept);
+ CPPUNIT_TEST_SUITE_END();
+
+ private:
+ EventChannel::shared_ptr ec;
+ int pipe[2];
+ char readBuf[size];
+
+ public:
+
+ void setUp()
+ {
+ memset(readBuf, size, 0);
+ ec = EventChannel::create();
+ if (::pipe(pipe) != 0) throw QPID_POSIX_ERROR(errno);
+ // Ignore SIGPIPE, otherwise we will crash writing to broken pipe.
+ signal(SIGPIPE, SIG_IGN);
+ }
+
+ // Verify that calling getEvent returns event.
+ template <class T> bool isNextEvent(T& event)
+ {
+ return &event == dynamic_cast<T*>(ec->getEvent());
+ }
+
+ template <class T> bool isNextEventOk(T& event)
+ {
+ Event* next = ec->getEvent();
+ if (next) next->throwIfError();
+ return &event == next;
+ }
+
+ void testEvent()
+ {
+ RunMe runMe;
+ CPPUNIT_ASSERT(!runMe.ran);
+ // Instances of Event just pass thru the channel immediately.
+ Event e(runMe.functor());
+ ec->postEvent(e);
+ CPPUNIT_ASSERT(isNextEventOk(e));
+ e.dispatch();
+ CPPUNIT_ASSERT(runMe.ran);
+ }
+
+ void testRead() {
+ ReadEvent re(pipe[0], readBuf, size);
+ ec->postEvent(re);
+ CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::write(pipe[1], hello, size));
+ CPPUNIT_ASSERT(isNextEventOk(re));
+ CPPUNIT_ASSERT_EQUAL(size, re.getSize());
+ CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+ }
+
+ void testFailedRead()
+ {
+ ReadEvent re(pipe[0], readBuf, size);
+ ec->postEvent(re);
+
+ // EOF before all data read.
+ ::close(pipe[1]);
+ CPPUNIT_ASSERT(isNextEvent(re));
+ CPPUNIT_ASSERT(re.hasError());
+ try {
+ re.throwIfError();
+ CPPUNIT_FAIL("Expected QpidError.");
+ }
+ catch (const qpid::QpidError&) { }
+
+ // Bad file descriptor. Note in this case we fail
+ // in postEvent and throw immediately.
+ try {
+ ReadEvent bad;
+ ec->postEvent(bad);
+ CPPUNIT_FAIL("Expected QpidError.");
+ }
+ catch (const qpid::QpidError&) { }
+ }
+
+ void testWrite() {
+ WriteEvent wr(pipe[1], hello, size);
+ ec->postEvent(wr);
+ CPPUNIT_ASSERT(isNextEventOk(wr));
+ CPPUNIT_ASSERT_EQUAL(ssize_t(size), ::read(pipe[0], readBuf, size));;
+ CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+ }
+
+ void testFailedWrite() {
+ WriteEvent wr(pipe[1], hello, size);
+ ::close(pipe[0]);
+ ec->postEvent(wr);
+ CPPUNIT_ASSERT(isNextEvent(wr));
+ CPPUNIT_ASSERT(wr.hasError());
+ }
+
+ void testReadWrite()
+ {
+ ReadEvent re(pipe[0], readBuf, size);
+ WriteEvent wr(pipe[1], hello, size);
+ ec->postEvent(re);
+ ec->postEvent(wr);
+ ec->getEvent();
+ ec->getEvent();
+ CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+ }
+
+ void testAccept() {
+ Socket s = Socket::createTcp();
+ int port = s.listen(0, 10);
+ CPPUNIT_ASSERT(port != 0);
+
+ AcceptEvent ae(s.fd());
+ ec->postEvent(ae);
+ Socket client = Socket::createTcp();
+ client.connect("localhost", port);
+ CPPUNIT_ASSERT(isNextEvent(ae));
+ ae.dispatch();
+
+ // Verify client writes are read by the accepted descriptor.
+ char readBuf[size];
+ ReadEvent re(ae.getAcceptedDesscriptor(), readBuf, size);
+ ec->postEvent(re);
+ CPPUNIT_ASSERT_EQUAL(ssize_t(size), client.send(hello, sizeof(hello)));
+ CPPUNIT_ASSERT(isNextEvent(re));
+ re.dispatch();
+ CPPUNIT_ASSERT_EQUAL(std::string(hello), std::string(readBuf));
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelTest);
+
diff --git a/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp b/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp
new file mode 100644
index 0000000000..5c467880be
--- /dev/null
+++ b/cpp/test/unit/qpid/posix/EventChannelThreadsTest.cpp
@@ -0,0 +1,247 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <boost/bind.hpp>
+
+#include <qpid/sys/Socket.h>
+#include <qpid/posix/EventChannelThreads.h>
+#include <qpid_test_plugin.h>
+
+
+using namespace std;
+
+using namespace qpid::sys;
+
+const int nConnections = 5;
+const int nMessages = 10; // Messages read/written per connection.
+
+
+// Accepts + reads + writes.
+const int totalEvents = nConnections+2*nConnections*nMessages;
+
+/**
+ * Messages are numbered 0..nMessages.
+ * We count the total number of events, and the
+ * number of reads and writes for each message number.
+ */
+class TestResults : public Monitor {
+ public:
+ TestResults() : isShutdown(false), nEventsRemaining(totalEvents) {}
+
+ void countEvent() {
+ if (--nEventsRemaining == 0)
+ shutdown();
+ }
+
+ void countRead(int messageNo) {
+ ++reads[messageNo];
+ countEvent();
+ }
+
+ void countWrite(int messageNo) {
+ ++writes[messageNo];
+ countEvent();
+ }
+
+ void shutdown(const std::string& exceptionMsg = std::string()) {
+ ScopedLock lock(*this);
+ exception = exceptionMsg;
+ isShutdown = true;
+ notifyAll();
+ }
+
+ void wait() {
+ ScopedLock lock(*this);
+ Time deadline = now() + 10*TIME_SEC;
+ while (!isShutdown) {
+ CPPUNIT_ASSERT(Monitor::wait(deadline));
+ }
+ }
+
+ bool isShutdown;
+ std::string exception;
+ AtomicCount reads[nMessages];
+ AtomicCount writes[nMessages];
+ AtomicCount nEventsRemaining;
+};
+
+TestResults results;
+
+EventChannelThreads::shared_ptr threads;
+
+// Functor to wrap callbacks in try/catch.
+class SafeCallback {
+ public:
+ SafeCallback(Runnable& r) : callback(r.functor()) {}
+ SafeCallback(Event::Callback cb) : callback(cb) {}
+
+ void operator()() {
+ std::string exception;
+ try {
+ callback();
+ return;
+ }
+ catch (const std::exception& e) {
+ exception = e.what();
+ }
+ catch (...) {
+ exception = "Unknown exception.";
+ }
+ results.shutdown(exception);
+ }
+
+ private:
+ Event::Callback callback;
+};
+
+/** Repost an event N times. */
+class Repost {
+ public:
+ Repost(int n) : count (n) {}
+ virtual ~Repost() {}
+
+ void repost(Event* event) {
+ if (--count==0) {
+ delete event;
+ } else {
+ threads->postEvent(event);
+ }
+ }
+ private:
+ int count;
+};
+
+
+
+/** Repeating read event. */
+class TestReadEvent : public ReadEvent, public Runnable, private Repost {
+ public:
+ explicit TestReadEvent(int fd=-1) :
+ ReadEvent(fd, &value, sizeof(value), SafeCallback(*this)),
+ Repost(nMessages)
+ {}
+
+ void run() {
+ CPPUNIT_ASSERT_EQUAL(sizeof(value), getSize());
+ CPPUNIT_ASSERT(0 <= value);
+ CPPUNIT_ASSERT(value < nMessages);
+ results.countRead(value);
+ repost(this);
+ }
+
+ private:
+ int value;
+ ReadEvent original;
+};
+
+
+/** Fire and forget write event */
+class TestWriteEvent : public WriteEvent, public Runnable, private Repost {
+ public:
+ TestWriteEvent(int fd=-1) :
+ WriteEvent(fd, &value, sizeof(value), SafeCallback(*this)),
+ Repost(nMessages),
+ value(0)
+ {}
+
+ void run() {
+ CPPUNIT_ASSERT_EQUAL(sizeof(int), getSize());
+ results.countWrite(value++);
+ repost(this);
+ }
+
+ private:
+ int value;
+};
+
+/** Fire-and-forget Accept event, posts reads on the accepted connection. */
+class TestAcceptEvent : public AcceptEvent, public Runnable, private Repost {
+ public:
+ TestAcceptEvent(int fd=-1) :
+ AcceptEvent(fd, SafeCallback(*this)),
+ Repost(nConnections)
+ {}
+
+ void run() {
+ threads->postEvent(new TestReadEvent(getAcceptedDesscriptor()));
+ results.countEvent();
+ repost(this);
+ }
+};
+
+class EventChannelThreadsTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(EventChannelThreadsTest);
+ CPPUNIT_TEST(testThreads);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ void setUp() {
+ threads = EventChannelThreads::create(EventChannel::create());
+ }
+
+ void tearDown() {
+ threads.reset();
+ }
+
+ void testThreads()
+ {
+ Socket listener = Socket::createTcp();
+ int port = listener.listen();
+
+ // Post looping accept events, will repost nConnections times.
+ // The accept event will automatically post read events.
+ threads->postEvent(new TestAcceptEvent(listener.fd()));
+
+ // Make connections.
+ Socket connections[nConnections];
+ for (int i = 0; i < nConnections; ++i) {
+ connections[i] = Socket::createTcp();
+ connections[i].connect("localhost", port);
+ }
+
+ // Post looping write events.
+ for (int i = 0; i < nConnections; ++i) {
+ threads->postEvent(new TestWriteEvent(connections[i].fd()));
+ }
+
+ // Wait for all events to be dispatched.
+ results.wait();
+
+ if (!results.exception.empty()) CPPUNIT_FAIL(results.exception);
+ CPPUNIT_ASSERT_EQUAL(0, int(results.nEventsRemaining));
+
+ // Expect a read and write for each messageNo from each connection.
+ for (int messageNo = 0; messageNo < nMessages; ++messageNo) {
+ CPPUNIT_ASSERT_EQUAL(nConnections, int(results.reads[messageNo]));
+ CPPUNIT_ASSERT_EQUAL(nConnections, int(results.writes[messageNo]));
+ }
+
+ threads->shutdown();
+ threads->join();
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelThreadsTest);
+