summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/posix
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/posix')
-rw-r--r--cpp/src/qpid/posix/EpollEventChannel.cpp76
-rw-r--r--cpp/src/qpid/posix/EpollEventChannel.h48
-rw-r--r--cpp/src/qpid/posix/EventChannel.cpp325
-rw-r--r--cpp/src/qpid/posix/EventChannel.h176
-rw-r--r--cpp/src/qpid/posix/EventChannelThreads.cpp119
-rw-r--r--cpp/src/qpid/posix/EventChannelThreads.h92
-rw-r--r--cpp/src/qpid/posix/Socket.cpp58
-rw-r--r--cpp/src/qpid/posix/check.cpp12
-rw-r--r--cpp/src/qpid/posix/check.h31
9 files changed, 791 insertions, 146 deletions
diff --git a/cpp/src/qpid/posix/EpollEventChannel.cpp b/cpp/src/qpid/posix/EpollEventChannel.cpp
deleted file mode 100644
index 7dce4bc58c..0000000000
--- a/cpp/src/qpid/posix/EpollEventChannel.cpp
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <qpid/sys/EventChannel.h>
-#include <sys/epoll.h>
-#include "EpollEventChannel.h"
-
-namespace qpid {
-namespace sys {
-
-EventChannel::shared_ptr EventChannel::create()
-{
- return EventChannel::shared_ptr(new EpollEventChannel());
-}
-
-EpollEventChannel::EpollEventChannel()
-{
- // TODO aconway 2006-11-13: How to choose size parameter?
- static const size_t estimatedFdsForEpoll = 1000;
- epollFd = epoll_create(estimatedFdsForEpoll);
-}
-
-EpollEventChannel::~EpollEventChannel() { }
-
-void
-EpollEventChannel::post(ReadEvent& /*event*/)
-{
-}
-
-void
-EpollEventChannel::post(WriteEvent& /*event*/)
-{
-}
-
-void
-EpollEventChannel::post(AcceptEvent& /*event*/)
-{
-}
-
-void
-EpollEventChannel::post(NotifyEvent& /*event*/)
-{
-}
-
-inline void
-EpollEventChannel::post(Event& /*event*/)
-{
-}
-
-Event*
-EpollEventChannel::getEvent()
-{
- return 0;
-}
-
-void
-EpollEventChannel::dispose(void* /*buffer*/, size_t)
-{
-}
-
-}}
diff --git a/cpp/src/qpid/posix/EpollEventChannel.h b/cpp/src/qpid/posix/EpollEventChannel.h
deleted file mode 100644
index 8128d5276f..0000000000
--- a/cpp/src/qpid/posix/EpollEventChannel.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <qpid/sys/EventChannel.h>
-
-namespace qpid {
-namespace sys {
-
-/** Epoll-based implementation of the event channel */
-class EpollEventChannel : public EventChannel
-{
- public:
-
- EpollEventChannel();
- ~EpollEventChannel();
-
- virtual void post(ReadEvent& event);
- virtual void post(WriteEvent& event);
- virtual void post(AcceptEvent& event);
- virtual void post(NotifyEvent& event);
-
- inline void post(Event& event);
-
- virtual Event* getEvent();
-
- virtual void dispose(void* buffer, size_t size);
-
- private:
- int epollFd;
-
-};
-
-}}
diff --git a/cpp/src/qpid/posix/EventChannel.cpp b/cpp/src/qpid/posix/EventChannel.cpp
new file mode 100644
index 0000000000..9d0819f206
--- /dev/null
+++ b/cpp/src/qpid/posix/EventChannel.cpp
@@ -0,0 +1,325 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <mqueue.h>
+#include <string.h>
+#include <iostream>
+
+#include <sys/errno.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+
+#include <typeinfo>
+#include <iostream>
+#include <queue>
+
+#include <boost/ptr_container/ptr_map.hpp>
+#include <boost/current_function.hpp>
+
+#include <qpid/QpidError.h>
+#include <qpid/sys/Monitor.h>
+
+#include "check.h"
+#include "EventChannel.h"
+
+using namespace std;
+
+
+// Convenience template to zero out a struct.
+template <class S> struct ZeroStruct : public S {
+ ZeroStruct() { memset(this, 0, sizeof(*this)); }
+};
+
+namespace qpid {
+namespace sys {
+
+
+/**
+ * EventHandler wraps an epoll file descriptor. Acts as private
+ * interface between EventChannel and subclasses.
+ *
+ * Also implements Event interface for events that are not associated
+ * with a file descriptor and are passed via the message queue.
+ */
+class EventHandler : public Event, private Monitor
+{
+ public:
+ EventHandler(int epollSize = 256);
+ ~EventHandler();
+
+ int getEpollFd() { return epollFd; }
+ void epollAdd(int fd, uint32_t epollEvents, Event* event);
+ void epollMod(int fd, uint32_t epollEvents, Event* event);
+ void epollDel(int fd);
+
+ void mqPut(Event* event);
+ Event* mqGet();
+
+ protected:
+ // Should never be called, only complete.
+ void prepare(EventHandler&) { assert(0); }
+ Event* complete(EventHandler& eh);
+
+ private:
+ int epollFd;
+ std::string mqName;
+ int mqFd;
+ std::queue<Event*> mqEvents;
+};
+
+EventHandler::EventHandler(int epollSize)
+{
+ epollFd = epoll_create(epollSize);
+ if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
+
+ // Create a POSIX message queue for non-fd events.
+ // We write one byte and never read it is always ready for read
+ // when we add it to epoll.
+ //
+ ZeroStruct<struct mq_attr> attr;
+ attr.mq_maxmsg = 1;
+ attr.mq_msgsize = 1;
+ do {
+ char tmpnam[L_tmpnam];
+ tmpnam_r(tmpnam);
+ mqName = tmpnam + 4; // Skip "tmp/"
+ mqFd = mq_open(
+ mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
+ if (mqFd < 0) throw QPID_POSIX_ERROR(errno);
+ } while (mqFd == EEXIST); // Name already taken, try again.
+
+ static char zero = '\0';
+ mq_send(mqFd, &zero, 1, 0);
+ epollAdd(mqFd, 0, this);
+}
+
+EventHandler::~EventHandler() {
+ mq_close(mqFd);
+ mq_unlink(mqName.c_str());
+}
+
+void EventHandler::mqPut(Event* event) {
+ ScopedLock l(*this);
+ assert(event != 0);
+ mqEvents.push(event);
+ epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+}
+
+Event* EventHandler::mqGet() {
+ ScopedLock l(*this);
+ if (mqEvents.empty())
+ return 0;
+ Event* event = mqEvents.front();
+ mqEvents.pop();
+ if(!mqEvents.empty())
+ epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
+ return event;
+}
+
+void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
+{
+ ZeroStruct<struct epoll_event> ee;
+ ee.data.ptr = event;
+ ee.events = epollEvents;
+ if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
+{
+ ZeroStruct<struct epoll_event> ee;
+ ee.data.ptr = event;
+ ee.events = epollEvents;
+ if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+void EventHandler::epollDel(int fd) {
+ if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
+ throw QPID_POSIX_ERROR(errno);
+}
+
+Event* EventHandler::complete(EventHandler& eh)
+{
+ assert(&eh == this);
+ Event* event = mqGet();
+ return event==0 ? 0 : event->complete(eh);
+}
+
+// ================================================================
+// EventChannel
+
+EventChannel::shared_ptr EventChannel::create() {
+ return shared_ptr(new EventChannel());
+}
+
+EventChannel::EventChannel() : handler(new EventHandler()) {}
+
+EventChannel::~EventChannel() {}
+
+void EventChannel::postEvent(Event& e)
+{
+ e.prepare(*handler);
+}
+
+Event* EventChannel::getEvent()
+{
+ static const int infiniteTimeout = -1;
+ ZeroStruct<struct epoll_event> epollEvent;
+
+ // Loop until we can complete the event. Some events may re-post
+ // themselves and return 0 from complete, e.g. partial reads. //
+ Event* event = 0;
+ while (event == 0) {
+ int eventCount = epoll_wait(handler->getEpollFd(),
+ &epollEvent, 1, infiniteTimeout);
+ if (eventCount < 0) {
+ if (errno != EINTR) {
+ // TODO aconway 2006-11-28: Proper handling/logging of errors.
+ cerr << BOOST_CURRENT_FUNCTION << " ignoring error "
+ << PosixError::getMessage(errno) << endl;
+ assert(0);
+ }
+ }
+ else if (eventCount == 1) {
+ event = reinterpret_cast<Event*>(epollEvent.data.ptr);
+ assert(event != 0);
+ try {
+ event = event->complete(*handler);
+ }
+ catch (const Exception& e) {
+ if (event)
+ event->setError(e);
+ }
+ catch (const std::exception& e) {
+ if (event)
+ event->setError(e);
+ }
+ }
+ }
+ return event;
+}
+
+Event::~Event() {}
+
+void Event::prepare(EventHandler& handler)
+{
+ handler.mqPut(this);
+}
+
+bool Event::hasError() const {
+ return error;
+}
+
+void Event::throwIfError() throw (Exception) {
+ if (hasError())
+ error.throwSelf();
+}
+
+Event* Event::complete(EventHandler&)
+{
+ return this;
+}
+
+void Event::dispatch()
+{
+ try {
+ if (!callback.empty())
+ callback();
+ } catch (const std::exception&) {
+ throw;
+ } catch (...) {
+ throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
+ }
+}
+
+void Event::setError(const ExceptionHolder& e) {
+ error = e;
+}
+
+void ReadEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+}
+
+ssize_t ReadEvent::doRead() {
+ ssize_t n = ::read(descriptor, static_cast<char*>(buffer) + received,
+ size - received);
+ if (n > 0) received += n;
+ return n;
+}
+
+Event* ReadEvent::complete(EventHandler& handler)
+{
+ // Read as much as possible without blocking.
+ ssize_t n = doRead();
+ while (n > 0 && received < size) doRead();
+
+ if (received == size) {
+ handler.epollDel(descriptor);
+ received = 0; // Reset for re-use.
+ return this;
+ }
+ else if (n <0 && (errno == EAGAIN)) {
+ // Keep polling for more.
+ handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
+ return 0;
+ }
+ else {
+ // Unexpected EOF or error. Throw ENODATA for EOF.
+ handler.epollDel(descriptor);
+ received = 0; // Reset for re-use.
+ throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA);
+ }
+}
+
+void WriteEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+}
+
+Event* WriteEvent::complete(EventHandler& handler)
+{
+ ssize_t n = write(descriptor, static_cast<const char*>(buffer) + written,
+ size - written);
+ if (n < 0) throw QPID_POSIX_ERROR(errno);
+ written += n;
+ if(written < size) {
+ // Keep polling.
+ handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
+ return 0;
+ }
+ written = 0; // Reset for re-use.
+ handler.epollDel(descriptor);
+ return this;
+}
+
+void AcceptEvent::prepare(EventHandler& handler)
+{
+ handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this);
+}
+
+Event* AcceptEvent::complete(EventHandler& handler)
+{
+ handler.epollDel(descriptor);
+ accepted = ::accept(descriptor, 0, 0);
+ if (accepted < 0) throw QPID_POSIX_ERROR(errno);
+ return this;
+}
+
+}}
diff --git a/cpp/src/qpid/posix/EventChannel.h b/cpp/src/qpid/posix/EventChannel.h
new file mode 100644
index 0000000000..55fd2ad135
--- /dev/null
+++ b/cpp/src/qpid/posix/EventChannel.h
@@ -0,0 +1,176 @@
+#ifndef _sys_EventChannel_h
+#define _sys_EventChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <qpid/SharedObject.h>
+#include <qpid/ExceptionHolder.h>
+#include <boost/function.hpp>
+#include <memory>
+
+namespace qpid {
+namespace sys {
+
+class Event;
+class EventHandler;
+class EventChannel;
+
+/**
+ * Base class for all Events.
+ */
+class Event
+{
+ public:
+ /** Type for callback when event is dispatched */
+ typedef boost::function0<void> Callback;
+
+ /**
+ * Create an event with optional callback.
+ * Instances of Event are sent directly through the channel.
+ * Derived classes define additional waiting behaviour.
+ *@param cb A callback functor that is invoked when dispatch() is called.
+ */
+ Event(Callback cb = 0) : callback(cb) {}
+
+ virtual ~Event();
+
+ /** Call the callback provided to the constructor, if any. */
+ void dispatch();
+
+ /** True if there was an error processing this event */
+ bool hasError() const;
+
+ /** If hasError() throw the corresponding exception. */
+ void throwIfError() throw(Exception);
+
+ protected:
+ virtual void prepare(EventHandler&);
+ virtual Event* complete(EventHandler&);
+ void setError(const ExceptionHolder& e);
+
+ Callback callback;
+ ExceptionHolder error;
+
+ friend class EventChannel;
+ friend class EventHandler;
+};
+
+template <class BufT>
+class IOEvent : public Event {
+ public:
+ void getDescriptor() const { return descriptor; }
+ size_t getSize() const { return size; }
+ BufT getBuffer() const { return buffer; }
+
+ protected:
+ IOEvent(int fd, Callback cb, size_t sz, BufT buf) :
+ Event(cb), descriptor(fd), buffer(buf), size(sz) {}
+
+ int descriptor;
+ BufT buffer;
+ size_t size;
+};
+
+/** Asynchronous read event */
+class ReadEvent : public IOEvent<void*>
+{
+ public:
+ explicit ReadEvent(int fd=-1, void* buf=0, size_t sz=0, Callback cb=0) :
+ IOEvent<void*>(fd, cb, sz, buf), received(0) {}
+
+ private:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+ ssize_t doRead();
+
+ size_t received;
+};
+
+/** Asynchronous write event */
+class WriteEvent : public IOEvent<const void*>
+{
+ public:
+ explicit WriteEvent(int fd=-1, const void* buf=0, size_t sz=0,
+ Callback cb=0) :
+ IOEvent<const void*>(fd, cb, sz, buf), written(0) {}
+
+ protected:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+
+ private:
+ ssize_t doWrite();
+ size_t written;
+};
+
+/** Asynchronous socket accept event */
+class AcceptEvent : public Event
+{
+ public:
+ /** Accept a connection on fd. */
+ explicit AcceptEvent(int fd=-1, Callback cb=0) :
+ Event(cb), descriptor(fd), accepted(0) {}
+
+ /** Get descriptor for server socket */
+ int getAcceptedDesscriptor() const { return accepted; }
+
+ private:
+ void prepare(EventHandler&);
+ Event* complete(EventHandler&);
+
+ int descriptor;
+ int accepted;
+};
+
+
+class QueueSet;
+
+/**
+ * Channel to post and wait for events.
+ */
+class EventChannel : public qpid::SharedObject<EventChannel>
+{
+ public:
+ static shared_ptr create();
+
+ ~EventChannel();
+
+ /** Post an event to the channel. */
+ void postEvent(Event& event);
+
+ /** Post an event to the channel. Must not be 0. */
+ void postEvent(Event* event) { postEvent(*event); }
+
+ /**
+ * Wait for the next complete event.
+ *@return Pointer to event. Will never return 0.
+ */
+ Event* getEvent();
+
+ private:
+ EventChannel();
+ boost::shared_ptr<EventHandler> handler;
+};
+
+
+}}
+
+
+
+#endif /*!_sys_EventChannel_h*/
diff --git a/cpp/src/qpid/posix/EventChannelThreads.cpp b/cpp/src/qpid/posix/EventChannelThreads.cpp
new file mode 100644
index 0000000000..f97efd17e8
--- /dev/null
+++ b/cpp/src/qpid/posix/EventChannelThreads.cpp
@@ -0,0 +1,119 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "EventChannelThreads.h"
+#include <qpid/sys/Runnable.h>
+#include <iostream>
+using namespace std;
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace sys {
+
+EventChannelThreads::shared_ptr EventChannelThreads::create(
+ EventChannel::shared_ptr ec)
+{
+ return EventChannelThreads::shared_ptr(new EventChannelThreads(ec));
+}
+
+EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) :
+ channel(ec), nWaiting(0), state(RUNNING)
+{
+ // TODO aconway 2006-11-15: Estimate initial threads based on CPUs.
+ addThread();
+}
+
+EventChannelThreads::~EventChannelThreads() {
+ shutdown();
+ join();
+}
+
+void EventChannelThreads::shutdown()
+{
+ ScopedLock lock(*this);
+ if (state != RUNNING) // Already shutting down.
+ return;
+ for (size_t i = 0; i < workers.size(); ++i) {
+ channel->postEvent(terminate);
+ }
+ state = TERMINATE_SENT;
+ notify(); // Wake up one join() thread.
+}
+
+void EventChannelThreads::join()
+{
+ {
+ ScopedLock lock(*this);
+ while (state == RUNNING) // Wait for shutdown to start.
+ wait();
+ if (state == SHUTDOWN) // Shutdown is complete
+ return;
+ if (state == JOINING) {
+ // Someone else is doing the join.
+ while (state != SHUTDOWN)
+ wait();
+ return;
+ }
+ // I'm the joining thread
+ assert(state == TERMINATE_SENT);
+ state = JOINING;
+ } // Drop the lock.
+
+ for (size_t i = 0; i < workers.size(); ++i) {
+ assert(state == JOINING); // Only this thread can change JOINING.
+ workers[i].join();
+ }
+ state = SHUTDOWN;
+ notifyAll(); // Notify other join() threaeds.
+}
+
+void EventChannelThreads::addThread() {
+ ScopedLock l(*this);
+ workers.push_back(Thread(*this));
+}
+
+void EventChannelThreads::run()
+{
+ // Start life waiting. Decrement on exit.
+ AtomicCount::ScopedIncrement inc(nWaiting);
+ try {
+ while (true) {
+ Event* e = channel->getEvent();
+ assert(e != 0);
+ if (e == &terminate) {
+ return;
+ }
+ AtomicCount::ScopedDecrement dec(nWaiting);
+ // I'm no longer waiting, make sure someone is.
+ if (dec == 0)
+ addThread();
+ e->dispatch();
+ }
+ }
+ catch (const std::exception& e) {
+ // TODO aconway 2006-11-15: need better logging across the board.
+ std::cerr << "EventChannelThreads::run() caught: " << e.what()
+ << std::endl;
+ }
+ catch (...) {
+ std::cerr << "EventChannelThreads::run() caught unknown exception."
+ << std::endl;
+ }
+}
+
+}}
diff --git a/cpp/src/qpid/posix/EventChannelThreads.h b/cpp/src/qpid/posix/EventChannelThreads.h
new file mode 100644
index 0000000000..ae172ae752
--- /dev/null
+++ b/cpp/src/qpid/posix/EventChannelThreads.h
@@ -0,0 +1,92 @@
+#ifndef _posix_EventChannelThreads_h
+#define _sys_EventChannelThreads_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <vector>
+
+#include <qpid/Exception.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/Monitor.h>
+#include <qpid/sys/Thread.h>
+#include <qpid/sys/AtomicCount.h>
+#include "EventChannel.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ Dynamic thread pool serving an EventChannel.
+
+ Threads run a loop { e = getEvent(); e->dispatch(); }
+ The size of the thread pool is automatically adjusted to optimal size.
+*/
+class EventChannelThreads :
+ public qpid::SharedObject<EventChannelThreads>,
+ public sys::Monitor, private sys::Runnable
+{
+ public:
+ /** Create the thread pool and start initial threads. */
+ static EventChannelThreads::shared_ptr create(
+ EventChannel::shared_ptr channel
+ );
+
+ ~EventChannelThreads();
+
+ /** Post event to the underlying channel */
+ void postEvent(Event& event) { channel->postEvent(event); }
+
+ /** Post event to the underlying channel Must not be 0. */
+ void postEvent(Event* event) { channel->postEvent(event); }
+
+ /**
+ * Terminate all threads.
+ *
+ * Returns immediately, use join() to wait till all threads are
+ * shut down.
+ */
+ void shutdown();
+
+ /** Wait for all threads to terminate. */
+ void join();
+
+ private:
+ typedef std::vector<sys::Thread> Threads;
+ typedef enum {
+ RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN
+ } State;
+
+ EventChannelThreads(EventChannel::shared_ptr underlyingChannel);
+ void addThread();
+
+ void run();
+ bool keepRunning();
+ void adjustThreads();
+
+ EventChannel::shared_ptr channel;
+ Threads workers;
+ sys::AtomicCount nWaiting;
+ State state;
+ Event terminate;
+};
+
+
+}}
+
+
+#endif /*!_sys_EventChannelThreads_h*/
diff --git a/cpp/src/qpid/posix/Socket.cpp b/cpp/src/qpid/posix/Socket.cpp
index 1321ae6b0d..6d47c64b32 100644
--- a/cpp/src/qpid/posix/Socket.cpp
+++ b/cpp/src/qpid/posix/Socket.cpp
@@ -20,6 +20,7 @@
*/
#include <sys/socket.h>
+#include <sys/errno.h>
#include <netinet/in.h>
#include <netdb.h>
@@ -31,60 +32,87 @@
using namespace qpid::sys;
-Socket::Socket() : socket(::socket (PF_INET, SOCK_STREAM, 0))
+Socket Socket::createTcp()
{
- CHECKNN(socket == 0);
+ int s = ::socket (PF_INET, SOCK_STREAM, 0);
+ if (s < 0) throw QPID_POSIX_ERROR(errno);
+ return s;
}
-void
-Socket::setTimeout(long msecs)
+Socket::Socket(int descriptor) : socket(descriptor) {}
+
+void Socket::setTimeout(Time interval)
{
struct timeval tv;
- tv.tv_sec = msecs / 1000;
- tv.tv_usec = (msecs % 1000)*1000;
+ tv.tv_sec = interval/TIME_SEC;
+ tv.tv_usec = (interval%TIME_SEC)/TIME_USEC;
setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
}
-void
-Socket::connect(const std::string& host, int port)
+void Socket::connect(const std::string& host, int port)
{
struct sockaddr_in name;
name.sin_family = AF_INET;
name.sin_port = htons(port);
struct hostent* hp = gethostbyname ( host.c_str() );
- if (hp == 0) CHECK0(-1); // TODO aconway 2006-11-09: error message?
+ if (hp == 0) throw QPID_POSIX_ERROR(errno);
memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length);
- CHECK0(::connect(socket, (struct sockaddr*)(&name), sizeof(name)));
+ if (::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0)
+ throw QPID_POSIX_ERROR(errno);
}
void
Socket::close()
{
if (socket == 0) return;
- CHECK0(::close(socket));
+ if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno);
socket = 0;
}
ssize_t
-Socket::send(const char* data, size_t size)
+Socket::send(const void* data, size_t size)
{
ssize_t sent = ::send(socket, data, size, 0);
if (sent < 0) {
if (errno == ECONNRESET) return SOCKET_EOF;
if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
- CHECK0(sent);
+ throw QPID_POSIX_ERROR(errno);
}
return sent;
}
ssize_t
-Socket::recv(char* data, size_t size)
+Socket::recv(void* data, size_t size)
{
ssize_t received = ::recv(socket, data, size, 0);
if (received < 0) {
if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
- CHECK0(received);
+ throw QPID_POSIX_ERROR(errno);
}
return received;
}
+
+int Socket::listen(int port, int backlog)
+{
+ struct sockaddr_in name;
+ name.sin_family = AF_INET;
+ name.sin_port = htons(port);
+ name.sin_addr.s_addr = 0;
+ if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0)
+ throw QPID_POSIX_ERROR(errno);
+ if (::listen(socket, backlog) < 0)
+ throw QPID_POSIX_ERROR(errno);
+
+ socklen_t namelen = sizeof(name);
+ if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
+ throw QPID_POSIX_ERROR(errno);
+
+ return ntohs(name.sin_port);
+}
+
+
+int Socket::fd()
+{
+ return socket;
+}
diff --git a/cpp/src/qpid/posix/check.cpp b/cpp/src/qpid/posix/check.cpp
index 2ef52f68b7..408679caa8 100644
--- a/cpp/src/qpid/posix/check.cpp
+++ b/cpp/src/qpid/posix/check.cpp
@@ -19,15 +19,21 @@
*
*/
-#include <qpid/QpidError.h>
+#include <cerrno>
#include "check.h"
namespace qpid {
namespace sys {
-std::string errnoToString() {
+std::string
+PosixError::getMessage(int errNo)
+{
char buf[512];
- return strerror_r(errno, buf, sizeof(buf));
+ return std::string(strerror_r(errNo, buf, sizeof(buf)));
}
+PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw()
+ : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc)
+{ }
+
}}
diff --git a/cpp/src/qpid/posix/check.h b/cpp/src/qpid/posix/check.h
index 666637b1c2..2d3a8d30e3 100644
--- a/cpp/src/qpid/posix/check.h
+++ b/cpp/src/qpid/posix/check.h
@@ -22,18 +22,41 @@
*
*/
-#include <errno.h>
+#include <cerrno>
#include <string>
#include <qpid/QpidError.h>
namespace qpid {
namespace sys {
-std::string errnoToString();
+/**
+ * Exception with message from errno.
+ */
+class PosixError : public qpid::QpidError
+{
+ public:
+ static std::string getMessage(int errNo);
+
+ PosixError(int errNo, const qpid::SrcLine& location) throw();
+
+ ~PosixError() throw() {}
+
+ int getErrNo() { return errNo; }
+
+ Exception* clone() const throw() { return new PosixError(*this); }
+
+ void throwSelf() { throw *this; }
+
+ private:
+ int errNo;
+};
-#define CHECK0(N) if ((N)!=0) THROW_QPID_ERROR(INTERNAL_ERROR, errnoToString())
-#define CHECKNN(N) if ((N)<0) THROW_QPID_ERROR(INTERNAL_ERROR, errnoToString())
}}
+/** Create a PosixError for the current file/line and errno. */
+#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE)
+/** Throw a posix error if errNo is non-zero */
+#define QPID_POSIX_THROW_IF(ERRNO) \
+ if ((ERRNO) != 0) throw QPID_POSIX_ERROR((ERRNO))
#endif /*!_posix_check_h*/