summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/posix
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/posix')
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp655
-rw-r--r--qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp264
-rw-r--r--qpid/cpp/src/qpid/sys/posix/BSDSocket.h113
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Condition.cpp45
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Condition.h82
-rwxr-xr-xqpid/cpp/src/qpid/sys/posix/FileSysDir.cpp80
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Fork.cpp129
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Fork.h82
-rw-r--r--qpid/cpp/src/qpid/sys/posix/IOHandle.cpp29
-rwxr-xr-xqpid/cpp/src/qpid/sys/posix/LockFile.cpp107
-rw-r--r--qpid/cpp/src/qpid/sys/posix/MemStat.cpp38
-rw-r--r--qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp125
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Mutex.cpp46
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Mutex.h158
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Path.cpp60
-rw-r--r--qpid/cpp/src/qpid/sys/posix/PidFile.h62
-rwxr-xr-xqpid/cpp/src/qpid/sys/posix/PipeHandle.cpp64
-rw-r--r--qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp118
-rw-r--r--qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp793
-rw-r--r--qpid/cpp/src/qpid/sys/posix/PrivatePosix.h65
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Shlib.cpp60
-rw-r--r--qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp353
-rw-r--r--qpid/cpp/src/qpid/sys/posix/StrError.cpp41
-rwxr-xr-xqpid/cpp/src/qpid/sys/posix/SystemInfo.cpp201
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Thread.cpp88
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Time.cpp162
-rwxr-xr-xqpid/cpp/src/qpid/sys/posix/Time.h34
-rw-r--r--qpid/cpp/src/qpid/sys/posix/check.h53
28 files changed, 4107 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
new file mode 100644
index 0000000000..7d04d2214d
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -0,0 +1,655 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Probes.h"
+#include "qpid/sys/DispatchHandle.h"
+#include "qpid/sys/Time.h"
+#include "qpid/log/Statement.h"
+
+// TODO The basic algorithm here is not really POSIX specific and with a
+// bit more abstraction could (should) be promoted to be platform portable
+// - The POSIX specific code here is ignoring SIGPIPE which should really
+// be part of the socket code.
+// - And checking errno to detect specific read/write conditions.
+//
+#include <errno.h>
+#include <signal.h>
+
+#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/shared_array.hpp>
+
+namespace qpid {
+namespace sys {
+namespace posix {
+
+namespace {
+
+struct StaticInit {
+ StaticInit() {
+ /**
+ * Make *process* not generate SIGPIPE when writing to closed
+ * pipe/socket (necessary as default action is to terminate process)
+ */
+ ::signal(SIGPIPE, SIG_IGN);
+ };
+} init;
+
+/*
+ * We keep per thread state to avoid locking overhead. The assumption is that
+ * on average all the connections are serviced by all the threads so the state
+ * recorded in each thread is about the same. If this turns out not to be the
+ * case we could rebalance the info occasionally.
+ */
+__thread int threadReadTotal = 0;
+__thread int threadReadCount = 0;
+__thread int threadWriteTotal = 0;
+__thread int threadWriteCount = 0;
+__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms
+}
+
+/*
+ * Asynch Acceptor
+ */
+class AsynchAcceptor : public qpid::sys::AsynchAcceptor {
+public:
+ AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback);
+ ~AsynchAcceptor();
+ void start(Poller::shared_ptr poller);
+
+private:
+ void readable(DispatchHandle& handle);
+
+private:
+ AsynchAcceptor::Callback acceptedCallback;
+ DispatchHandle handle;
+ const Socket& socket;
+
+};
+
+AsynchAcceptor::AsynchAcceptor(const Socket& s,
+ AsynchAcceptor::Callback callback) :
+ acceptedCallback(callback),
+ handle((const IOHandle&)s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0),
+ socket(s) {
+
+ s.setNonblocking();
+}
+
+AsynchAcceptor::~AsynchAcceptor() {
+ handle.stopWatch();
+}
+
+void AsynchAcceptor::start(Poller::shared_ptr poller) {
+ handle.startWatch(poller);
+}
+
+/*
+ * We keep on accepting as long as there is something to accept
+ */
+void AsynchAcceptor::readable(DispatchHandle& h) {
+ Socket* s;
+ do {
+ errno = 0;
+ // TODO: Currently we ignore the peers address, perhaps we should
+ // log it or use it for connection acceptance.
+ try {
+ s = socket.accept();
+ if (s) {
+ acceptedCallback(*s);
+ } else {
+ break;
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Could not accept socket: " << e.what());
+ break;
+ }
+ } while (true);
+
+ h.rewatch();
+}
+
+/*
+ * POSIX version of AsynchIO TCP socket connector.
+ *
+ * The class is implemented in terms of DispatchHandle to allow it to be
+ * deleted by deleting the contained DispatchHandle.
+ */
+class AsynchConnector : public qpid::sys::AsynchConnector,
+ private DispatchHandle {
+
+private:
+ void connComplete(DispatchHandle& handle);
+ void requestedCall(RequestCallback rCb);
+
+private:
+ ConnectedCallback connCallback;
+ FailedCallback failCallback;
+ const Socket& socket;
+ SocketAddress sa;
+
+public:
+ AsynchConnector(const Socket& socket,
+ const std::string& hostname,
+ const std::string& port,
+ ConnectedCallback connCb,
+ FailedCallback failCb);
+ void start(Poller::shared_ptr poller);
+ void stop();
+ void requestCallback(RequestCallback rCb);
+};
+
+AsynchConnector::AsynchConnector(const Socket& s,
+ const std::string& hostname,
+ const std::string& port,
+ ConnectedCallback connCb,
+ FailedCallback failCb) :
+ DispatchHandle((const IOHandle&)s,
+ 0,
+ boost::bind(&AsynchConnector::connComplete, this, _1),
+ boost::bind(&AsynchConnector::connComplete, this, _1)),
+ connCallback(connCb),
+ failCallback(failCb),
+ socket(s),
+ sa(hostname, port)
+{
+ socket.setNonblocking();
+
+ // Note, not catching any exceptions here, also has effect of destructing
+ QPID_LOG(info, "Connecting: " << sa.asString());
+ socket.connect(sa);
+}
+
+void AsynchConnector::start(Poller::shared_ptr poller)
+{
+ startWatch(poller);
+}
+
+void AsynchConnector::stop()
+{
+ stopWatch();
+}
+
+void AsynchConnector::requestCallback(RequestCallback callback) {
+ // TODO creating a function object every time isn't all that
+ // efficient - if this becomes heavily used do something better (what?)
+ assert(callback);
+ DispatchHandle::call(boost::bind(&AsynchConnector::requestedCall, this, callback));
+}
+
+void AsynchConnector::requestedCall(RequestCallback callback) {
+ assert(callback);
+ callback(*this);
+}
+
+void AsynchConnector::connComplete(DispatchHandle& h)
+{
+ int errCode = socket.getError();
+ if (errCode == 0) {
+ h.stopWatch();
+ try {
+ socket.finishConnect(sa);
+ } catch (const std::exception& e) {
+ failCallback(socket, 0, e.what());
+ DispatchHandle::doDelete();
+ return;
+ }
+ connCallback(socket);
+ } else {
+ // Retry while we cause an immediate exception
+ // (asynch failure will be handled by re-entering here at the top)
+ while (sa.nextAddress()) {
+ try {
+ // Try next address without deleting ourselves
+ QPID_LOG(debug, "Ignored socket connect error: " << strError(errCode));
+ QPID_LOG(info, "Retrying connect: " << sa.asString());
+ socket.connect(sa);
+ return;
+ } catch (const std::exception& e) {
+ QPID_LOG(debug, "Ignored socket connect exception: " << e.what());
+ }
+ errCode = socket.getError();
+ }
+ h.stopWatch();
+ failCallback(socket, errCode, strError(errCode));
+ }
+ DispatchHandle::doDelete();
+}
+
+/*
+ * POSIX version of AsynchIO reader/writer
+ *
+ * The class is implemented in terms of DispatchHandle to allow it to be
+ * deleted by deleting the contained DispatchHandle.
+ */
+class AsynchIO : public qpid::sys::AsynchIO, private DispatchHandle {
+
+public:
+ AsynchIO(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0);
+
+ // Methods inherited from qpid::sys::AsynchIO
+
+ virtual void queueForDeletion();
+
+ virtual void start(Poller::shared_ptr poller);
+ virtual void createBuffers(uint32_t size);
+ virtual void queueReadBuffer(BufferBase* buff);
+ virtual void unread(BufferBase* buff);
+ virtual void queueWrite(BufferBase* buff);
+ virtual void notifyPendingWrite();
+ virtual void queueWriteClose();
+ virtual bool writeQueueEmpty();
+ virtual void requestCallback(RequestCallback);
+ virtual BufferBase* getQueuedBuffer();
+ virtual SecuritySettings getSecuritySettings();
+
+private:
+ ~AsynchIO();
+
+ // Methods that are callback targets from Dispatcher.
+ void readable(DispatchHandle& handle);
+ void writeable(DispatchHandle& handle);
+ void disconnected(DispatchHandle& handle);
+ void requestedCall(RequestCallback);
+ void close(DispatchHandle& handle);
+
+private:
+ ReadCallback readCallback;
+ EofCallback eofCallback;
+ DisconnectCallback disCallback;
+ ClosedCallback closedCallback;
+ BuffersEmptyCallback emptyCallback;
+ IdleCallback idleCallback;
+ const Socket& socket;
+ std::deque<BufferBase*> bufferQueue;
+ std::deque<BufferBase*> writeQueue;
+ std::vector<BufferBase> buffers;
+ boost::shared_array<char> bufferMemory;
+ bool queuedClose;
+ /**
+ * This flag is used to detect and handle concurrency between
+ * calls to notifyPendingWrite() (which can be made from any thread) and
+ * the execution of the writeable() method (which is always on the
+ * thread processing this handle.
+ */
+ volatile bool writePending;
+};
+
+AsynchIO::AsynchIO(const Socket& s,
+ ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
+ ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
+
+ DispatchHandle((const IOHandle&)s,
+ boost::bind(&AsynchIO::readable, this, _1),
+ boost::bind(&AsynchIO::writeable, this, _1),
+ boost::bind(&AsynchIO::disconnected, this, _1)),
+ readCallback(rCb),
+ eofCallback(eofCb),
+ disCallback(disCb),
+ closedCallback(cCb),
+ emptyCallback(eCb),
+ idleCallback(iCb),
+ socket(s),
+ queuedClose(false),
+ writePending(false) {
+
+ s.setNonblocking();
+}
+
+AsynchIO::~AsynchIO() {
+}
+
+void AsynchIO::queueForDeletion() {
+ DispatchHandle::doDelete();
+}
+
+void AsynchIO::start(Poller::shared_ptr poller) {
+ DispatchHandle::startWatch(poller);
+}
+
+void AsynchIO::createBuffers(uint32_t size) {
+ // Allocate all the buffer memory at once
+ bufferMemory.reset(new char[size*BufferCount]);
+
+ // Create the Buffer structs in a vector
+ // And push into the buffer queue
+ buffers.reserve(BufferCount);
+ for (uint32_t i = 0; i < BufferCount; i++) {
+ buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+ queueReadBuffer(&buffers[i]);
+ }
+}
+
+void AsynchIO::queueReadBuffer(BufferBase* buff) {
+ assert(buff);
+ buff->dataStart = 0;
+ buff->dataCount = 0;
+
+ bool queueWasEmpty = bufferQueue.empty();
+ bufferQueue.push_back(buff);
+ if (queueWasEmpty)
+ DispatchHandle::rewatchRead();
+}
+
+void AsynchIO::unread(BufferBase* buff) {
+ assert(buff);
+ buff->squish();
+
+ bool queueWasEmpty = bufferQueue.empty();
+ bufferQueue.push_front(buff);
+ if (queueWasEmpty)
+ DispatchHandle::rewatchRead();
+}
+
+void AsynchIO::queueWrite(BufferBase* buff) {
+ assert(buff);
+ // If we've already closed the socket then throw the write away
+ if (queuedClose) {
+ queueReadBuffer(buff);
+ return;
+ } else {
+ writeQueue.push_front(buff);
+ }
+ writePending = false;
+ DispatchHandle::rewatchWrite();
+}
+
+// This can happen outside the callback context
+void AsynchIO::notifyPendingWrite() {
+ writePending = true;
+ DispatchHandle::rewatchWrite();
+}
+
+void AsynchIO::queueWriteClose() {
+ queuedClose = true;
+ DispatchHandle::rewatchWrite();
+}
+
+bool AsynchIO::writeQueueEmpty() {
+ return writeQueue.empty();
+}
+
+void AsynchIO::requestCallback(RequestCallback callback) {
+ // TODO creating a function object every time isn't all that
+ // efficient - if this becomes heavily used do something better (what?)
+ assert(callback);
+ DispatchHandle::call(boost::bind(&AsynchIO::requestedCall, this, callback));
+}
+
+void AsynchIO::requestedCall(RequestCallback callback) {
+ assert(callback);
+ callback(*this);
+}
+
+/** Return a queued buffer if there are enough
+ * to spare
+ */
+AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
+ BufferBase* buff = bufferQueue.empty() ? 0 : bufferQueue.back();
+ // An "unread" buffer is reserved for future read operations (which
+ // take from the front of the queue).
+ if (!buff || (buff->dataCount && bufferQueue.size() == 1)) {
+ QPID_LOG(error, "No IO buffers available");
+ return 0;
+ }
+ assert(buff->dataCount == 0);
+ bufferQueue.pop_back();
+ return buff;
+}
+
+/*
+ * We keep on reading as long as we have something to read, a buffer
+ * to put it in and reading is not stopped by flow control.
+ */
+void AsynchIO::readable(DispatchHandle& h) {
+ AbsTime readStartTime = AbsTime::now();
+ size_t total = 0;
+ int readCalls = 0;
+ do {
+ // (Try to) get a buffer
+ if (!bufferQueue.empty()) {
+ // Read into buffer
+ BufferBase* buff = bufferQueue.front();
+ assert(buff);
+ bufferQueue.pop_front();
+ errno = 0;
+ int readCount = buff->byteCount-buff->dataCount;
+ int rc = socket.read(buff->bytes + buff->dataCount, readCount);
+ ++readCalls;
+ if (rc > 0) {
+ buff->dataCount += rc;
+ threadReadTotal += rc;
+ total += rc;
+
+ readCallback(*this, buff);
+ int64_t duration = Duration(readStartTime, AbsTime::now());
+ if (rc != readCount) {
+ // If we didn't fill the read buffer then time to stop reading
+ QPID_PROBE4(asynchio_read_finished_done, &h, duration, total, readCalls);
+ break;
+ }
+
+ // Stop reading if we've overrun our timeslot
+ if ( duration > threadMaxIoTimeNs) {
+ QPID_PROBE4(asynchio_read_finished_maxtime, &h, duration, total, readCalls);
+ break;
+ }
+
+ } else {
+ // Put buffer back (at front so it doesn't interfere with unread buffers)
+ bufferQueue.push_front(buff);
+ assert(buff);
+
+ QPID_PROBE5(asynchio_read_finished_error, &h, Duration(readStartTime, AbsTime::now()), total, readCalls, errno);
+ // Eof or other side has gone away
+ if (rc == 0 || errno == ECONNRESET) {
+ eofCallback(*this);
+ h.unwatchRead();
+ break;
+ } else if (errno == EAGAIN) {
+ // We have just put a buffer back so we know
+ // we can carry on watching for reads
+ break;
+ } else {
+ // Report error then just treat as a socket disconnect
+ QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(errno) << "(" << errno << ")" );
+ eofCallback(*this);
+ h.unwatchRead();
+ break;
+ }
+ }
+ } else {
+ // Something to read but no buffer
+ if (emptyCallback) {
+ emptyCallback(*this);
+ }
+ // If we still have no buffers we can't do anything more
+ if (bufferQueue.empty()) {
+ h.unwatchRead();
+ QPID_PROBE4(asynchio_read_finished_nobuffers, &h, Duration(readStartTime, AbsTime::now()), total, readCalls);
+ break;
+ }
+
+ }
+ } while (true);
+
+ ++threadReadCount;
+ return;
+}
+
+/*
+ * We carry on writing whilst we have data to write and we can write
+ */
+void AsynchIO::writeable(DispatchHandle& h) {
+ AbsTime writeStartTime = AbsTime::now();
+ size_t total = 0;
+ int writeCalls = 0;
+ do {
+ // See if we've got something to write
+ if (!writeQueue.empty()) {
+ // Write buffer
+ BufferBase* buff = writeQueue.back();
+ writeQueue.pop_back();
+ errno = 0;
+ assert(buff->dataStart+buff->dataCount <= buff->byteCount);
+ int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount);
+ int64_t duration = Duration(writeStartTime, AbsTime::now());
+ ++writeCalls;
+ if (rc >= 0) {
+ threadWriteTotal += rc;
+ total += rc;
+
+ // If we didn't write full buffer put rest back
+ if (rc != buff->dataCount) {
+ buff->dataStart += rc;
+ buff->dataCount -= rc;
+ writeQueue.push_back(buff);
+ QPID_PROBE4(asynchio_write_finished_done, &h, duration, total, writeCalls);
+ break;
+ }
+
+ // Recycle the buffer
+ queueReadBuffer(buff);
+
+ // Stop writing if we've overrun our timeslot
+ if (duration > threadMaxIoTimeNs) {
+ QPID_PROBE4(asynchio_write_finished_maxtime, &h, duration, total, writeCalls);
+ break;
+ }
+ } else {
+ // Put buffer back
+ writeQueue.push_back(buff);
+ QPID_PROBE5(asynchio_write_finished_error, &h, duration, total, writeCalls, errno);
+
+ if (errno == ECONNRESET || errno == EPIPE) {
+ // Just stop watching for write here - we'll get a
+ // disconnect callback soon enough
+ h.unwatchWrite();
+ break;
+ } else if (errno == EAGAIN) {
+ // We have just put a buffer back so we know
+ // we can carry on watching for writes
+ break;
+ } else {
+ // Report error then just treat as a socket disconnect
+ QPID_LOG(error, "Error writing socket: " << qpid::sys::strError(errno) << "(" << errno << ")" );
+ h.unwatchWrite();
+ break;
+ }
+ }
+ } else {
+ int64_t duration = Duration(writeStartTime, AbsTime::now());
+ (void) duration; // force duration to be used if no probes are compiled
+
+ // If we're waiting to close the socket then can do it now as there is nothing to write
+ if (queuedClose) {
+ close(h);
+ QPID_PROBE4(asynchio_write_finished_closed, &h, duration, total, writeCalls);
+ break;
+ }
+ // Fd is writable, but nothing to write
+ if (idleCallback) {
+ writePending = false;
+ idleCallback(*this);
+ }
+ // If we still have no buffers to write we can't do anything more
+ if (writeQueue.empty() && !writePending && !queuedClose) {
+ h.unwatchWrite();
+ // The following handles the case where writePending is
+ // set to true after the test above; in this case its
+ // possible that the unwatchWrite overwrites the
+ // desired rewatchWrite so we correct that here
+ if (writePending)
+ h.rewatchWrite();
+ QPID_PROBE4(asynchio_write_finished_nodata, &h, duration, total, writeCalls);
+ break;
+ }
+ }
+ } while (true);
+
+ ++threadWriteCount;
+ return;
+}
+
+void AsynchIO::disconnected(DispatchHandle& h) {
+ // If we have not already queued close then call disconnected callback before closing
+ if (!queuedClose && disCallback) disCallback(*this);
+ close(h);
+}
+
+/*
+ * Close the socket and callback to say we've done it
+ */
+void AsynchIO::close(DispatchHandle& h) {
+ h.stopWatch();
+ socket.close();
+ if (closedCallback) {
+ closedCallback(*this, socket);
+ }
+}
+
+SecuritySettings AsynchIO::getSecuritySettings() {
+ SecuritySettings settings;
+ settings.ssf = socket.getKeyLen();
+ settings.authid = socket.getClientAuthId();
+ return settings;
+}
+
+} // namespace posix
+
+AsynchAcceptor* AsynchAcceptor::create(const Socket& s,
+ Callback callback)
+{
+ return new posix::AsynchAcceptor(s, callback);
+}
+
+AsynchConnector* AsynchConnector::create(const Socket& s,
+ const std::string& hostname,
+ const std::string& port,
+ ConnectedCallback connCb,
+ FailedCallback failCb)
+{
+ return new posix::AsynchConnector(s, hostname, port, connCb, failCb);
+}
+
+AsynchIO* AsynchIO::create(const Socket& s,
+ AsynchIO::ReadCallback rCb,
+ AsynchIO::EofCallback eofCb,
+ AsynchIO::DisconnectCallback disCb,
+ AsynchIO::ClosedCallback cCb,
+ AsynchIO::BuffersEmptyCallback eCb,
+ AsynchIO::IdleCallback iCb)
+{
+ return new posix::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb);
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp b/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp
new file mode 100644
index 0000000000..7c31b13ae9
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/posix/BSDSocket.h"
+
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/posix/check.h"
+#include "qpid/sys/posix/PrivatePosix.h"
+
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/errno.h>
+#include <unistd.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <cstdlib>
+#include <string.h>
+
+namespace qpid {
+namespace sys {
+
+namespace {
+std::string getName(int fd, bool local)
+{
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
+
+ if (local) {
+ QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) );
+ } else {
+ QPID_POSIX_CHECK( ::getpeername(fd, name, &namelen) );
+ }
+
+ return SocketAddress::asString(name, namelen);
+}
+
+uint16_t getLocalPort(int fd)
+{
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
+
+ QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) );
+
+ return SocketAddress::getPort(name);
+}
+}
+
+BSDSocket::BSDSocket() :
+ fd(-1),
+ handle(new IOHandle),
+ nonblocking(false),
+ nodelay(false)
+{}
+
+Socket* createSocket()
+{
+ return new BSDSocket;
+}
+
+BSDSocket::BSDSocket(int fd0) :
+ fd(fd0),
+ handle(new IOHandle(fd)),
+ nonblocking(false),
+ nodelay(false)
+{}
+
+BSDSocket::~BSDSocket()
+{}
+
+BSDSocket::operator const IOHandle&() const
+{
+ return *handle;
+}
+
+void BSDSocket::createSocket(const SocketAddress& sa) const
+{
+ int& socket = fd;
+ if (socket != -1) BSDSocket::close();
+ int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0);
+ if (s < 0) throw QPID_POSIX_ERROR(errno);
+ socket = s;
+ *handle = IOHandle(s);
+
+ try {
+ if (nonblocking) setNonblocking();
+ if (nodelay) setTcpNoDelay();
+ if (getAddrInfo(sa).ai_family == AF_INET6) {
+ int flag = 1;
+ int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag));
+ QPID_POSIX_CHECK(result);
+ }
+ } catch (std::exception&) {
+ ::close(s);
+ socket = -1;
+ *handle = IOHandle();
+ throw;
+ }
+}
+
+void BSDSocket::setNonblocking() const {
+ int& socket = fd;
+ nonblocking = true;
+ if (socket != -1) {
+ QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK));
+ }
+}
+
+void BSDSocket::setTcpNoDelay() const
+{
+ int& socket = fd;
+ nodelay = true;
+ if (socket != -1) {
+ int flag = 1;
+ int result = ::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
+ QPID_POSIX_CHECK(result);
+ }
+}
+
+void BSDSocket::connect(const SocketAddress& addr) const
+{
+ // The display name for an outbound connection needs to be the name that was specified
+ // for the address rather than a resolved IP address as we don't know which of
+ // the IP addresses is actually the one that will be connected to.
+ peername = addr.asString(false);
+
+ // However the string we compare with the local port must be numeric or it might not
+ // match when it should as getLocalAddress() will always be numeric
+ std::string connectname = addr.asString();
+
+ createSocket(addr);
+
+ const int& socket = fd;
+ // TODO the correct thing to do here is loop on failure until you've used all the returned addresses
+ if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) &&
+ (errno != EINPROGRESS)) {
+ throw Exception(QPID_MSG(strError(errno) << ": " << peername));
+ }
+ // When connecting to a port on the same host which no longer has
+ // a process associated with it, the OS occasionally chooses the
+ // remote port (which is unoccupied) as the port to bind the local
+ // end of the socket, resulting in a "circular" connection.
+ //
+ // Raise an error if we see such a connection, since we know there is
+ // no listener on the peer address.
+ //
+ if (getLocalAddress() == connectname) {
+ close();
+ throw Exception(QPID_MSG("Connection refused: " << peername));
+ }
+}
+
+void BSDSocket::finishConnect(const SocketAddress&) const
+{
+}
+
+void
+BSDSocket::close() const
+{
+ int& socket = fd;
+ if (socket == -1) return;
+ if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno);
+ socket = -1;
+ *handle = IOHandle();
+}
+
+int BSDSocket::listen(const SocketAddress& sa, int backlog) const
+{
+ createSocket(sa);
+
+ const int& socket = fd;
+ int yes=1;
+ QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
+
+ if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
+ throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno)));
+ if (::listen(socket, backlog) < 0)
+ throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno)));
+
+ return getLocalPort(socket);
+}
+
+Socket* BSDSocket::accept() const
+{
+ int afd = ::accept(fd, 0, 0);
+ if ( afd >= 0) {
+ BSDSocket* s = new BSDSocket(afd);
+ s->localname = localname;
+ return s;
+ }
+ else if (errno == EAGAIN)
+ return 0;
+ else throw QPID_POSIX_ERROR(errno);
+}
+
+int BSDSocket::read(void *buf, size_t count) const
+{
+ return ::read(fd, buf, count);
+}
+
+int BSDSocket::write(const void *buf, size_t count) const
+{
+ return ::write(fd, buf, count);
+}
+
+std::string BSDSocket::getPeerAddress() const
+{
+ if (peername.empty()) {
+ peername = getName(fd, false);
+ }
+ return peername;
+}
+
+std::string BSDSocket::getLocalAddress() const
+{
+ if (localname.empty()) {
+ localname = getName(fd, true);
+ }
+ return localname;
+}
+
+int BSDSocket::getError() const
+{
+ int result;
+ socklen_t rSize = sizeof (result);
+
+ if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0)
+ throw QPID_POSIX_ERROR(errno);
+
+ return result;
+}
+
+int BSDSocket::getKeyLen() const
+{
+ return 0;
+}
+
+std::string BSDSocket::getClientAuthId() const
+{
+ return std::string();
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/BSDSocket.h b/qpid/cpp/src/qpid/sys/posix/BSDSocket.h
new file mode 100644
index 0000000000..ae73718d55
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/BSDSocket.h
@@ -0,0 +1,113 @@
+#ifndef QPID_SYS_BSDSOCKET_H
+#define QPID_SYS_BSDSOCKET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/CommonImportExport.h"
+#include <string>
+
+#include <boost/scoped_ptr.hpp>
+
+namespace qpid {
+namespace sys {
+
+class Duration;
+class IOHandle;
+class SocketAddress;
+
+namespace ssl {
+class SslMuxSocket;
+}
+
+class QPID_COMMON_CLASS_EXTERN BSDSocket : public Socket
+{
+public:
+ /** Create a socket wrapper for descriptor. */
+ QPID_COMMON_EXTERN BSDSocket();
+
+ /** Construct socket with existing fd (posix specific and not in Socket interface) */
+ QPID_COMMON_EXTERN BSDSocket(int fd);
+
+ QPID_COMMON_EXTERN ~BSDSocket();
+
+ QPID_COMMON_EXTERN operator const IOHandle&() const;
+
+ /** Set socket non blocking */
+ QPID_COMMON_EXTERN virtual void setNonblocking() const;
+
+ QPID_COMMON_EXTERN virtual void setTcpNoDelay() const;
+
+ QPID_COMMON_EXTERN virtual void connect(const SocketAddress&) const;
+ QPID_COMMON_EXTERN virtual void finishConnect(const SocketAddress&) const;
+
+ QPID_COMMON_EXTERN virtual void close() const;
+
+ /** Bind to a port and start listening.
+ *@return The bound port number
+ */
+ QPID_COMMON_EXTERN virtual int listen(const SocketAddress&, int backlog = 10) const;
+
+ /**
+ * Returns an address (host and port) for the remote end of the
+ * socket
+ */
+ QPID_COMMON_EXTERN std::string getPeerAddress() const;
+ /**
+ * Returns an address (host and port) for the local end of the
+ * socket
+ */
+ QPID_COMMON_EXTERN std::string getLocalAddress() const;
+
+ /**
+ * Returns the error code stored in the socket. This may be used
+ * to determine the result of a non-blocking connect.
+ */
+ QPID_COMMON_EXTERN int getError() const;
+
+ /** Accept a connection from a socket that is already listening
+ * and has an incoming connection
+ */
+ QPID_COMMON_EXTERN virtual Socket* accept() const;
+
+ // TODO The following are raw operations, maybe they need better wrapping?
+ QPID_COMMON_EXTERN virtual int read(void *buf, size_t count) const;
+ QPID_COMMON_EXTERN virtual int write(const void *buf, size_t count) const;
+
+ QPID_COMMON_EXTERN int getKeyLen() const;
+ QPID_COMMON_EXTERN std::string getClientAuthId() const;
+
+protected:
+ /** Create socket */
+ void createSocket(const SocketAddress&) const;
+
+ mutable int fd;
+ mutable boost::scoped_ptr<IOHandle> handle;
+ mutable std::string localname;
+ mutable std::string peername;
+ mutable bool nonblocking;
+ mutable bool nodelay;
+};
+
+}}
+#endif /*!QPID_SYS_BSDSOCKET_H*/
diff --git a/qpid/cpp/src/qpid/sys/posix/Condition.cpp b/qpid/cpp/src/qpid/sys/posix/Condition.cpp
new file mode 100644
index 0000000000..f629e50cd7
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/Condition.cpp
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Condition.h"
+
+namespace qpid {
+namespace sys {
+
+namespace {
+
+struct ClockMonotonicAttr {
+ ::pthread_condattr_t attr;
+
+ ClockMonotonicAttr() {
+ QPID_POSIX_ASSERT_THROW_IF(pthread_condattr_init(&attr));
+ QPID_POSIX_ASSERT_THROW_IF(pthread_condattr_setclock(&attr, CLOCK_MONOTONIC));
+ }
+};
+
+}
+
+Condition::Condition() {
+ static ClockMonotonicAttr attr;
+ QPID_POSIX_ASSERT_THROW_IF(pthread_cond_init(&condition, &attr.attr));
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/sys/posix/Condition.h b/qpid/cpp/src/qpid/sys/posix/Condition.h
new file mode 100644
index 0000000000..66f95d5fc8
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/Condition.h
@@ -0,0 +1,82 @@
+#ifndef _sys_posix_Condition_h
+#define _sys_posix_Condition_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/posix/PrivatePosix.h"
+
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
+
+#include <time.h>
+#include <sys/errno.h>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A condition variable for thread synchronization.
+ */
+class Condition
+{
+ public:
+ Condition();
+ ~Condition();
+ void wait(Mutex&);
+ bool wait(Mutex&, const AbsTime& absoluteTime);
+ void notify();
+ void notifyAll();
+
+ private:
+ pthread_cond_t condition;
+};
+
+inline Condition::~Condition() {
+ QPID_POSIX_ABORT_IF(pthread_cond_destroy(&condition));
+}
+
+inline void Condition::wait(Mutex& mutex) {
+ QPID_POSIX_ASSERT_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex));
+}
+
+inline bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){
+ struct timespec ts;
+ toTimespec(ts, absoluteTime);
+ int status = pthread_cond_timedwait(&condition, &mutex.mutex, &ts);
+ if (status != 0) {
+ if (status == ETIMEDOUT) return false;
+ throw QPID_POSIX_ERROR(status);
+ }
+ return true;
+}
+
+inline void Condition::notify(){
+ QPID_POSIX_ASSERT_THROW_IF(pthread_cond_signal(&condition));
+}
+
+inline void Condition::notifyAll(){
+ QPID_POSIX_ASSERT_THROW_IF(pthread_cond_broadcast(&condition));
+}
+
+}}
+#endif /*!_sys_posix_Condition_h*/
diff --git a/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp b/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp
new file mode 100755
index 0000000000..cec580164d
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/sys/FileSysDir.h"
+#include "qpid/sys/StrError.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Exception.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <cerrno>
+#include <unistd.h>
+#include <dirent.h>
+#include <stdlib.h>
+
+namespace qpid {
+namespace sys {
+
+bool FileSysDir::exists (void) const
+{
+ const char *cpath = dirPath.c_str ();
+ struct stat s;
+ if (::stat(cpath, &s)) {
+ if (errno == ENOENT) {
+ return false;
+ }
+ throw qpid::Exception (strError(errno) +
+ ": Can't check directory: " + dirPath);
+ }
+ if (S_ISDIR(s.st_mode))
+ return true;
+ throw qpid::Exception(dirPath + " is not a directory");
+}
+
+void FileSysDir::mkdir(void)
+{
+ if (::mkdir(dirPath.c_str(), 0755))
+ throw Exception ("Can't create directory: " + dirPath);
+}
+
+void FileSysDir::forEachFile(Callback cb) const {
+
+ ::dirent** namelist;
+
+ int n = scandir(dirPath.c_str(), &namelist, 0, alphasort);
+ if (n == -1) throw Exception (strError(errno) + ": Can't scan directory: " + dirPath);
+
+ for (int i = 0; i<n; ++i) {
+ std::string fullpath = dirPath + "/" + namelist[i]->d_name;
+ // Filter out non files/stat problems etc.
+ struct ::stat s;
+ // Can't throw here without leaking memory, so just do nothing with
+ // entries for which stat() fails.
+ if (!::stat(fullpath.c_str(), &s)) {
+ if (S_ISREG(s.st_mode)) {
+ cb(fullpath);
+ }
+ }
+ ::free(namelist[i]);
+ }
+ ::free(namelist);
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/Fork.cpp b/qpid/cpp/src/qpid/sys/posix/Fork.cpp
new file mode 100644
index 0000000000..a0d404a16e
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/Fork.cpp
@@ -0,0 +1,129 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/sys/Fork.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Exception.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/select.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace qpid {
+namespace sys {
+
+using namespace std;
+
+namespace {
+
+void writeStr(int fd, const std::string& str) {
+ const char* WRITE_ERR = "Error writing to parent process";
+ int size = str.size();
+ if (int(sizeof(size)) > ::write(fd, &size, sizeof(size))) throw ErrnoException(WRITE_ERR);
+ if (size > ::write(fd, str.data(), size)) throw ErrnoException(WRITE_ERR);
+}
+
+string readStr(int fd) {
+ string value;
+ const char* READ_ERR = "Error reading from forked process";
+ int size;
+ if (int(sizeof(size)) > ::read(fd, &size, sizeof(size))) throw ErrnoException(READ_ERR);
+ if (size > 0) { // Read string message
+ value.resize(size);
+ if (size > ::read(fd, const_cast<char*>(value.data()), size)) throw ErrnoException(READ_ERR);
+ }
+ return value;
+}
+
+} // namespace
+
+Fork::Fork() {}
+Fork::~Fork() {}
+
+void Fork::fork() {
+ pid_t pid = ::fork();
+ if (pid < 0) throw ErrnoException("Failed to fork the process");
+ if (pid == 0) child();
+ else parent(pid);
+}
+
+ForkWithMessage::ForkWithMessage() {
+ pipeFds[0] = pipeFds[1] = -1;
+}
+
+struct AutoCloseFd {
+ int fd;
+ AutoCloseFd(int d) : fd(d) {}
+ ~AutoCloseFd() { ::close(fd); }
+};
+
+void ForkWithMessage::fork() {
+ if(::pipe(pipeFds) < 0) throw ErrnoException("Can't create pipe");
+ pid_t pid = ::fork();
+ if(pid < 0) throw ErrnoException("Fork fork failed");
+ if (pid == 0) { // Child
+ AutoCloseFd ac(pipeFds[1]); // Write side.
+ ::close(pipeFds[0]); // Read side
+ try {
+ child();
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, "Error in forked child: " << e.what());
+ std::string msg = e.what();
+ if (msg.empty()) msg = " "; // Make sure we send a non-empty error string.
+ writeStr(pipeFds[1], msg);
+ }
+ }
+ else { // Parent
+ close(pipeFds[1]); // Write side.
+ AutoCloseFd ac(pipeFds[0]); // Read side
+ parent(pid);
+ }
+}
+
+string ForkWithMessage::wait(int timeout) { // parent waits for child.
+ errno = 0;
+ struct timeval tv;
+ tv.tv_sec = timeout;
+ tv.tv_usec = 0;
+
+ fd_set fds;
+ FD_ZERO(&fds);
+ FD_SET(pipeFds[0], &fds);
+ int n=select(FD_SETSIZE, &fds, 0, 0, &tv);
+ if(n<0) throw ErrnoException("Error waiting for fork");
+ if (n==0) throw Exception("Timed out waiting for fork");
+
+ string error = readStr(pipeFds[0]);
+ if (error.empty()) return readStr(pipeFds[0]);
+ else throw Exception("Error in forked process: " + error);
+}
+
+// Write empty error string followed by value string to pipe.
+void ForkWithMessage::ready(const string& value) { // child
+ // Write empty string for error followed by value.
+ writeStr(pipeFds[1], string()); // No error
+ writeStr(pipeFds[1], value);
+}
+
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/Fork.h b/qpid/cpp/src/qpid/sys/posix/Fork.h
new file mode 100644
index 0000000000..698c61ed30
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/Fork.h
@@ -0,0 +1,82 @@
+#ifndef QPID_SYS_POSIX_FORK_H
+#define QPID_SYS_POSIX_FORK_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <string>
+#include <sys/types.h>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Fork the process. Call parent() in parent and child() in child.
+ */
+class Fork {
+ public:
+ Fork();
+ virtual ~Fork();
+
+ /**
+ * Fork the process.
+ * Calls parent() in the parent process, child() in the child.
+ */
+ virtual void fork();
+
+ protected:
+
+ /** Called in parent process.
+ *@child pid of child process
+ */
+ virtual void parent(pid_t child) = 0;
+
+ /** Called in child process */
+ virtual void child() = 0;
+};
+
+/**
+ * Like Fork but also allows the child to send a string message
+ * or throw an exception to the parent.
+ */
+class ForkWithMessage : public Fork {
+ public:
+ ForkWithMessage();
+ void fork();
+
+ protected:
+ /** Call from parent(): wait for child to send a value or throw exception.
+ * @timeout in seconds to wait for response.
+ * @return value passed by child to ready().
+ */
+ std::string wait(int timeout);
+
+ /** Call from child(): Send a value to the parent.
+ *@param value returned by parent call to wait().
+ */
+ void ready(const std::string& value);
+
+ private:
+ int pipeFds[2];
+};
+
+}} // namespace qpid::sys
+
+
+
+#endif /*!QPID_SYS_POSIX_FORK_H*/
diff --git a/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp b/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp
new file mode 100644
index 0000000000..d3f502a63c
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/posix/PrivatePosix.h"
+
+namespace qpid {
+namespace sys {
+
+NullIOHandle DummyIOHandle;
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/LockFile.cpp b/qpid/cpp/src/qpid/sys/posix/LockFile.cpp
new file mode 100755
index 0000000000..9fdf83f1bd
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/LockFile.cpp
@@ -0,0 +1,107 @@
+/*
+ *
+ * Copyright (c) 2008 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/sys/LockFile.h"
+#include "qpid/sys/posix/PidFile.h"
+
+#include <string>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "qpid/sys/posix/check.h"
+
+namespace qpid {
+namespace sys {
+
+class LockFilePrivate {
+ friend class LockFile;
+ friend class PidFile;
+
+ int fd;
+
+public:
+ LockFilePrivate(int f) : fd(f) {}
+};
+
+LockFile::LockFile(const std::string& path_, bool create)
+ : path(path_), created(create) {
+
+ errno = 0;
+ int flags=create ? O_WRONLY|O_CREAT|O_NOFOLLOW : O_RDWR;
+ int fd = ::open(path.c_str(), flags, 0644);
+ if (fd < 0) throw ErrnoException("Cannot open lock file " + path, errno);
+ if (::lockf(fd, F_TLOCK, 0) < 0) {
+ ::close(fd);
+ throw ErrnoException("Cannot lock " + path, errno);
+ }
+ impl.reset(new LockFilePrivate(fd));
+}
+
+LockFile::~LockFile() {
+ if (impl) {
+ int f = impl->fd;
+ if (f >= 0) {
+ if(::lockf(f, F_ULOCK, 0)) {} // Suppress warnings about ignoring return value.
+ ::close(f);
+ impl->fd = -1;
+ }
+ }
+}
+
+int LockFile::read(void* bytes, size_t len) const {
+ if (!impl)
+ throw Exception("Lock file not open: " + path);
+
+ ssize_t rc = ::read(impl->fd, bytes, len);
+ if ((ssize_t)len > rc) {
+ throw Exception("Cannot read lock file: " + path);
+ }
+ return rc;
+}
+
+int LockFile::write(void* bytes, size_t len) const {
+ if (!impl)
+ throw Exception("Lock file not open: " + path);
+
+ ssize_t rc = ::write(impl->fd, bytes, len);
+ if ((ssize_t)len > rc) {
+ throw Exception("Cannot write lock file: " + path);
+ }
+ return rc;
+}
+
+PidFile::PidFile(const std::string& path_, bool create):
+ LockFile(path_, create)
+{}
+
+pid_t PidFile::readPid(void) const {
+ pid_t pid;
+ int desired_read = sizeof(pid_t);
+ read(&pid, desired_read);
+ return pid;
+}
+
+void PidFile::writePid(void) {
+ pid_t pid = getpid();
+ int desired_write = sizeof(pid_t);
+ write(&pid, desired_write);
+}
+
+}} /* namespace qpid::sys */
diff --git a/qpid/cpp/src/qpid/sys/posix/MemStat.cpp b/qpid/cpp/src/qpid/sys/posix/MemStat.cpp
new file mode 100644
index 0000000000..2fbf119cab
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/MemStat.cpp
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/MemStat.h"
+
+#include <malloc.h>
+
+void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory* object)
+{
+ struct mallinfo info(mallinfo());
+
+ object->set_malloc_arena(info.arena);
+ object->set_malloc_ordblks(info.ordblks);
+ object->set_malloc_hblks(info.hblks);
+ object->set_malloc_hblkhd(info.hblkhd);
+ object->set_malloc_uordblks(info.uordblks);
+ object->set_malloc_fordblks(info.fordblks);
+ object->set_malloc_keepcost(info.keepcost);
+}
+
diff --git a/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp b/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp
new file mode 100644
index 0000000000..b4292aa4bc
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/MemoryMappedFile.h"
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+namespace qpid {
+namespace sys {
+namespace {
+const std::string PAGEFILE_PREFIX("pf_");
+const std::string PATH_SEPARATOR("/");
+const std::string ESCAPE("%");
+const std::string VALID("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.");
+std::string getFileName(const std::string& name, const std::string& dir)
+{
+ std::stringstream filename;
+ if (dir.size()) filename << dir << PATH_SEPARATOR << PAGEFILE_PREFIX;
+ size_t start = 0;
+ while (true) {
+ size_t i = name.find_first_not_of(VALID, start);
+ if (i == std::string::npos) {
+ filename << name.substr(start);
+ return filename.str();
+ } else {
+ if (i > start) filename << name.substr(start, i-start);
+ filename << ESCAPE << (int) name.at(i);
+ start = i+1;
+ }
+ }
+
+}
+}
+
+class MemoryMappedFilePrivate
+{
+ friend class MemoryMappedFile;
+ std::string path;
+ int fd;
+ MemoryMappedFilePrivate() : fd(0) {}
+};
+MemoryMappedFile::MemoryMappedFile() : state(new MemoryMappedFilePrivate) {}
+MemoryMappedFile::~MemoryMappedFile() { delete state; }
+
+void MemoryMappedFile::open(const std::string& name, const std::string& directory)
+{
+ // Ensure directory exists
+ if ( ::mkdir(directory.c_str(), S_IRWXU | S_IRGRP | S_IXGRP )!=0 && errno!=EEXIST ) {
+ throw qpid::Exception(QPID_MSG("Failed to create memory mapped file directory " << directory << ": " << qpid::sys::strError(errno)));
+ }
+
+ state->path = getFileName(name, directory);
+
+ int flags = O_CREAT | O_TRUNC | O_RDWR;
+ int fd = ::open(state->path.c_str(), flags, S_IRUSR | S_IWUSR);
+ if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << state->path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]"));
+ state->fd = fd;
+}
+
+void MemoryMappedFile::close()
+{
+ ::close(state->fd);
+ ::unlink(state->path.c_str());
+}
+
+size_t MemoryMappedFile::getPageSize()
+{
+ return ::sysconf(_SC_PAGE_SIZE);
+}
+
+char* MemoryMappedFile::map(size_t offset, size_t size)
+{
+ int protection = PROT_READ | PROT_WRITE;
+ char* region = (char*) ::mmap(0, size, protection, MAP_SHARED, state->fd, offset);
+ if (region == MAP_FAILED) {
+ throw qpid::Exception(QPID_MSG("Failed to map page into memory: " << qpid::sys::strError(errno)));
+ }
+ return region;
+
+}
+
+void MemoryMappedFile::unmap(char* region, size_t size)
+{
+ ::munmap(region, size);
+}
+
+void MemoryMappedFile::flush(char* region, size_t size)
+{
+ ::msync(region, size, MS_ASYNC);
+}
+
+void MemoryMappedFile::expand(size_t offset)
+{
+ if ((::lseek(state->fd, offset - 1, SEEK_SET) == -1) || (::write(state->fd, "", 1) == -1)) {
+ throw qpid::Exception(QPID_MSG("Failed to expand paged queue file: " << qpid::sys::strError(errno)));
+ }
+}
+
+bool MemoryMappedFile::isSupported()
+{
+ return true;
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/Mutex.cpp b/qpid/cpp/src/qpid/sys/posix/Mutex.cpp
new file mode 100644
index 0000000000..0e1f0d30c2
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/Mutex.cpp
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Initialise a recursive mutex attr for use in creating mutexes later
+ * (we use pthread_once to make sure it is initialised exactly once)
+ */
+
+namespace {
+pthread_once_t onceControl = PTHREAD_ONCE_INIT;
+pthread_mutexattr_t mutexattr;
+
+void initMutexattr() {
+ pthread_mutexattr_init(&mutexattr);
+ pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
+}
+}
+
+const pthread_mutexattr_t* Mutex::getAttribute() {
+ pthread_once(&onceControl, initMutexattr);
+ return &mutexattr;
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/Mutex.h b/qpid/cpp/src/qpid/sys/posix/Mutex.h
new file mode 100644
index 0000000000..e2b21b5a56
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/Mutex.h
@@ -0,0 +1,158 @@
+#ifndef _sys_posix_Mutex_h
+#define _sys_posix_Mutex_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/sys/posix/check.h"
+
+#include <pthread.h>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+class Condition;
+
+/**
+ * Mutex lock.
+ */
+class Mutex : private boost::noncopyable {
+ friend class Condition;
+ static const pthread_mutexattr_t* getAttribute();
+
+public:
+ typedef ::qpid::sys::ScopedLock<Mutex> ScopedLock;
+ typedef ::qpid::sys::ScopedUnlock<Mutex> ScopedUnlock;
+
+ inline Mutex();
+ inline ~Mutex();
+ inline void lock();
+ inline void unlock();
+ inline bool trylock();
+
+
+protected:
+ pthread_mutex_t mutex;
+};
+
+/**
+ * RW lock.
+ */
+class RWlock : private boost::noncopyable {
+ friend class Condition;
+
+public:
+ typedef ::qpid::sys::ScopedRlock<RWlock> ScopedRlock;
+ typedef ::qpid::sys::ScopedWlock<RWlock> ScopedWlock;
+
+ inline RWlock();
+ inline ~RWlock();
+ inline void wlock(); // will write-lock
+ inline void rlock(); // will read-lock
+ inline void unlock();
+ inline void trywlock(); // will write-try
+ inline void tryrlock(); // will read-try
+
+protected:
+ pthread_rwlock_t rwlock;
+};
+
+
+/**
+ * PODMutex is a POD, can be static-initialized with
+ * PODMutex m = QPID_PODMUTEX_INITIALIZER
+ */
+struct PODMutex
+{
+ typedef ::qpid::sys::ScopedLock<PODMutex> ScopedLock;
+
+ inline void lock();
+ inline void unlock();
+ inline bool trylock();
+
+ // Must be public to be a POD:
+ pthread_mutex_t mutex;
+};
+
+#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER }
+
+void PODMutex::lock() {
+ QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_lock(&mutex));
+}
+
+void PODMutex::unlock() {
+ QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_unlock(&mutex));
+}
+
+bool PODMutex::trylock() {
+ return pthread_mutex_trylock(&mutex) == 0;
+}
+
+Mutex::Mutex() {
+ QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_init(&mutex, getAttribute()));
+}
+
+Mutex::~Mutex(){
+ QPID_POSIX_ABORT_IF(pthread_mutex_destroy(&mutex));
+}
+
+void Mutex::lock() {
+ QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_lock(&mutex));
+}
+
+void Mutex::unlock() {
+ QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_unlock(&mutex));
+}
+
+bool Mutex::trylock() {
+ return pthread_mutex_trylock(&mutex) == 0;
+}
+
+
+RWlock::RWlock() {
+ QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_init(&rwlock, NULL));
+}
+
+RWlock::~RWlock(){
+ QPID_POSIX_ABORT_IF(pthread_rwlock_destroy(&rwlock));
+}
+
+void RWlock::wlock() {
+ QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_wrlock(&rwlock));
+}
+
+void RWlock::rlock() {
+ QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_rdlock(&rwlock));
+}
+
+void RWlock::unlock() {
+ QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_unlock(&rwlock));
+}
+
+void RWlock::trywlock() {
+ QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_trywrlock(&rwlock));
+}
+
+void RWlock::tryrlock() {
+ QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_tryrdlock(&rwlock));
+}
+
+
+}}
+#endif /*!_sys_posix_Mutex_h*/
diff --git a/qpid/cpp/src/qpid/sys/posix/Path.cpp b/qpid/cpp/src/qpid/sys/posix/Path.cpp
new file mode 100644
index 0000000000..063e3cfc51
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/Path.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/sys/Path.h"
+#include "qpid/sys/StrError.h"
+#include "qpid/Exception.h"
+
+#include <sys/stat.h>
+#include <errno.h>
+
+#include <sys/types.h>
+
+
+
+namespace qpid {
+namespace sys {
+
+const std::string Path::separator("/");
+
+namespace {
+// Return true for success, false for ENOENT, throw otherwise.
+bool getStat(const std::string& path, struct ::stat& s) {
+ if (::stat(path.c_str(), &s)) {
+ if (errno == ENOENT) return false;
+ throw Exception(strError(errno) + ": Invalid path: " + path);
+ }
+ return true;
+}
+
+bool isFlag(const std::string& path, unsigned long flag) {
+ struct ::stat s;
+ return getStat(path, s) && (s.st_mode & flag);
+}
+}
+
+bool Path::exists () const {
+ struct ::stat s;
+ return getStat(path, s);
+}
+
+bool Path::isFile() const { return isFlag(path, S_IFREG); }
+bool Path::isDirectory() const { return isFlag(path, S_IFDIR); }
+bool Path::isAbsolute() const { return (path.size() > 0 && path[0] == separator[0]); }
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/PidFile.h b/qpid/cpp/src/qpid/sys/posix/PidFile.h
new file mode 100644
index 0000000000..fb19d407f4
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/PidFile.h
@@ -0,0 +1,62 @@
+#ifndef _sys_PidFile_h
+#define _sys_PidFile_h
+
+/*
+ *
+ * Copyright (c) 2008 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/sys/LockFile.h"
+
+#include "qpid/CommonImportExport.h"
+#include "qpid/sys/IntegerTypes.h"
+
+#include <boost/noncopyable.hpp>
+#include <boost/shared_ptr.hpp>
+#include <string>
+
+namespace qpid {
+namespace sys {
+
+class PidFile : public LockFile
+{
+public:
+ QPID_COMMON_EXTERN PidFile(const std::string& path_, bool create);
+
+ /**
+ * Read the process ID from the lock file. This method assumes that
+ * if there is a process ID in the file, it was written there by
+ * writePid(); thus, it's at the start of the file.
+ *
+ * Throws an exception if there is an error reading the file.
+ *
+ * @returns The stored process ID. No validity check is done on it.
+ */
+ QPID_COMMON_EXTERN pid_t readPid(void) const;
+
+ /**
+ * Write the current process's ID to the lock file. It's written at
+ * the start of the file and will overwrite any other content that
+ * may be in the file.
+ *
+ * Throws an exception if the write fails.
+ */
+ QPID_COMMON_EXTERN void writePid(void);
+};
+
+}} /* namespace qpid::sys */
+
+#endif /*!_sys_PidFile_h*/
diff --git a/qpid/cpp/src/qpid/sys/posix/PipeHandle.cpp b/qpid/cpp/src/qpid/sys/posix/PipeHandle.cpp
new file mode 100755
index 0000000000..4b19783338
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/PipeHandle.cpp
@@ -0,0 +1,64 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/sys/PipeHandle.h"
+#include "qpid/sys/posix/check.h"
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+
+namespace qpid {
+namespace sys {
+
+PipeHandle::PipeHandle(bool nonBlocking) {
+
+ int pair[2];
+ pair[0] = pair[1] = -1;
+
+ if (socketpair(PF_UNIX, SOCK_STREAM, 0, pair) == -1)
+ throw qpid::Exception(QPID_MSG("Creation of pipe failed"));
+
+ writeFd = pair[0];
+ readFd = pair[1];
+
+ // Set the socket to non-blocking
+ if (nonBlocking) {
+ int flags = fcntl(readFd, F_GETFL);
+ fcntl(readFd, F_SETFL, flags | O_NONBLOCK);
+ }
+}
+
+PipeHandle::~PipeHandle() {
+ close(readFd);
+ close(writeFd);
+}
+
+int PipeHandle::read(void* buf, size_t bufSize) {
+ return ::read(readFd,buf,bufSize);
+}
+
+int PipeHandle::write(const void* buf, size_t bufSize) {
+ return ::write(writeFd,buf,bufSize);
+}
+
+int PipeHandle::getReadHandle() {
+ return readFd;
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
new file mode 100644
index 0000000000..aa129faf20
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/PollableCondition.h"
+#include "qpid/sys/DispatchHandle.h"
+#include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/Exception.h"
+
+#include <boost/bind.hpp>
+
+#include <unistd.h>
+#include <fcntl.h>
+
+namespace qpid {
+namespace sys {
+
+class PollableConditionPrivate : public sys::IOHandle {
+ friend class PollableCondition;
+
+private:
+ PollableConditionPrivate(const sys::PollableCondition::Callback& cb,
+ sys::PollableCondition& parent,
+ const boost::shared_ptr<sys::Poller>& poller);
+ ~PollableConditionPrivate();
+
+ void dispatch(sys::DispatchHandle& h);
+ void set();
+ void clear();
+
+private:
+ PollableCondition::Callback cb;
+ PollableCondition& parent;
+ boost::shared_ptr<sys::Poller> poller;
+ int writeFd;
+ std::auto_ptr<DispatchHandleRef> handle;
+};
+
+PollableConditionPrivate::PollableConditionPrivate(
+ const sys::PollableCondition::Callback& cb,
+ sys::PollableCondition& parent,
+ const boost::shared_ptr<sys::Poller>& poller
+) : cb(cb), parent(parent)
+{
+ int fds[2];
+ if (::pipe(fds) == -1)
+ throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
+ fd = fds[0];
+ writeFd = fds[1];
+ if (::fcntl(fd, F_SETFL, O_NONBLOCK) == -1)
+ throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
+ if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1)
+ throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
+ handle.reset (new DispatchHandleRef(
+ *this,
+ boost::bind(&sys::PollableConditionPrivate::dispatch, this, _1),
+ 0, 0));
+ handle->startWatch(poller);
+ handle->unwatch();
+
+ // Make the read FD readable
+ static const char dummy=0;
+ ssize_t n = ::write(writeFd, &dummy, 1);
+ if (n == -1 && errno != EAGAIN)
+ throw ErrnoException("Error setting PollableCondition");
+}
+
+PollableConditionPrivate::~PollableConditionPrivate() {
+ handle->stopWatch();
+ close(writeFd);
+}
+
+void PollableConditionPrivate::dispatch(sys::DispatchHandle&) {
+ cb(parent);
+}
+
+void PollableConditionPrivate::set() {
+ handle->rewatch();
+}
+
+void PollableConditionPrivate::clear() {
+ handle->unwatch();
+}
+
+
+PollableCondition::PollableCondition(const Callback& cb,
+ const boost::shared_ptr<sys::Poller>& poller
+) : impl(new PollableConditionPrivate(cb, *this, poller))
+{
+}
+
+PollableCondition::~PollableCondition()
+{
+ delete impl;
+}
+
+void PollableCondition::set() { impl->set(); }
+
+void PollableCondition::clear() { impl->clear(); }
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp b/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp
new file mode 100644
index 0000000000..ae839b2e20
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp
@@ -0,0 +1,793 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/AtomicCount.h"
+#include "qpid/sys/DeletionManager.h"
+#include "qpid/sys/posix/check.h"
+#include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Condition.h"
+
+#include <poll.h>
+#include <errno.h>
+#include <signal.h>
+
+#include <assert.h>
+#include <queue>
+#include <set>
+#include <exception>
+
+/*
+ *
+ * This is a qpid::sys::Poller implementation for Posix systems.
+ *
+ * This module follows the structure of the Linux EpollPoller as closely as possible
+ * to simplify maintainability. Noteworthy differences:
+ *
+ * The Linux epoll_xxx() calls present one event at a time to multiple callers whereas poll()
+ * returns one or more events to a single caller. The EventStream class layers a
+ * "one event per call" view of the poll() result to multiple threads.
+ *
+ * The HandleSet is the master set of in-use PollerHandles. The EventStream
+ * maintains a snapshot copy taken just before the call to poll() that remains static
+ * until all flagged events have been processed.
+ *
+ * There is an additional window where the PollerHandlePrivate class may survive the
+ * parent PollerHandle destructor, i.e. between snapshots.
+ *
+ * Safe interrupting of the Poller is implemented using the "self-pipe trick".
+ *
+ */
+
+namespace qpid {
+namespace sys {
+
+// Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used
+DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager;
+
+// Instantiate (and define) class static for DeletionManager
+template <>
+DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0);
+
+class PollerHandlePrivate {
+ friend class Poller;
+ friend class PollerPrivate;
+ friend class PollerHandle;
+ friend class HandleSet;
+
+ enum FDStat {
+ ABSENT,
+ MONITORED,
+ INACTIVE,
+ HUNGUP,
+ MONITORED_HUNGUP,
+ INTERRUPTED,
+ INTERRUPTED_HUNGUP,
+ DELETED
+ };
+
+ short events;
+ const IOHandle* ioHandle;
+ PollerHandle* pollerHandle;
+ FDStat stat;
+ Mutex lock;
+
+ PollerHandlePrivate(const IOHandle* h, PollerHandle* p) :
+ events(0),
+ ioHandle(h),
+ pollerHandle(p),
+ stat(ABSENT) {
+ }
+
+ int fd() const {
+ return ioHandle->fd;
+ }
+
+ bool isActive() const {
+ return stat == MONITORED || stat == MONITORED_HUNGUP;
+ }
+
+ void setActive() {
+ stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP)
+ ? MONITORED_HUNGUP
+ : MONITORED;
+ }
+
+ bool isInactive() const {
+ return stat == INACTIVE || stat == HUNGUP;
+ }
+
+ void setInactive() {
+ stat = INACTIVE;
+ }
+
+ bool isIdle() const {
+ return stat == ABSENT;
+ }
+
+ void setIdle() {
+ stat = ABSENT;
+ }
+
+ bool isHungup() const {
+ return
+ stat == MONITORED_HUNGUP ||
+ stat == HUNGUP ||
+ stat == INTERRUPTED_HUNGUP;
+ }
+
+ void setHungup() {
+ assert(stat == MONITORED);
+ stat = HUNGUP;
+ }
+
+ bool isInterrupted() const {
+ return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP;
+ }
+
+ void setInterrupted() {
+ stat = (stat == MONITORED_HUNGUP || stat == HUNGUP)
+ ? INTERRUPTED_HUNGUP
+ : INTERRUPTED;
+ }
+
+ bool isDeleted() const {
+ return stat == DELETED;
+ }
+
+ void setDeleted() {
+ stat = DELETED;
+ }
+};
+
+PollerHandle::PollerHandle(const IOHandle& h) :
+ impl(new PollerHandlePrivate(&h, this))
+{}
+
+PollerHandle::~PollerHandle() {
+ {
+ ScopedLock<Mutex> l(impl->lock);
+ if (impl->isDeleted()) {
+ return;
+ }
+ impl->pollerHandle = 0;
+ if (impl->isInterrupted()) {
+ impl->setDeleted();
+ return;
+ }
+ assert(impl->isIdle());
+ impl->setDeleted();
+ }
+ PollerHandleDeletionManager.markForDeletion(impl);
+}
+
+class HandleSet
+{
+ Mutex lock;
+ bool stale;
+ std::set<PollerHandlePrivate*> handles;
+ public:
+ HandleSet() : stale(true) {}
+ void add(PollerHandlePrivate*);
+ void remove(PollerHandlePrivate*);
+ void cleanup();
+ bool snapshot(std::vector<PollerHandlePrivate *>& , std::vector<struct ::pollfd>&);
+ void setStale();
+};
+
+void HandleSet::add(PollerHandlePrivate* h)
+{
+ ScopedLock<Mutex> l(lock);
+ handles.insert(h);
+}
+void HandleSet::remove(PollerHandlePrivate* h)
+{
+ ScopedLock<Mutex> l(lock);
+ handles.erase(h);
+}
+void HandleSet::cleanup()
+{
+ // Inform all registered handles of disconnection
+ std::set<PollerHandlePrivate*> copy;
+ handles.swap(copy);
+ for (std::set<PollerHandlePrivate*>::const_iterator i = copy.begin(); i != copy.end(); ++i) {
+ PollerHandlePrivate& eh = **i;
+ {
+ ScopedLock<Mutex> l(eh.lock);
+ if (!eh.isDeleted()) {
+ Poller::Event event((*i)->pollerHandle, Poller::DISCONNECTED);
+ event.process();
+ }
+ }
+ }
+}
+void HandleSet::setStale()
+{
+ // invalidate cached pollfds for next snapshot
+ ScopedLock<Mutex> l(lock);
+ stale = true;
+}
+
+/**
+ * Concrete implementation of Poller to use Posix poll()
+ * interface
+ */
+class PollerPrivate {
+ friend class Poller;
+ friend class EventStream;
+ friend class HandleSet;
+
+ class SignalPipe {
+ /**
+ * Used to wakeup a thread in ::poll()
+ */
+ int fds[2];
+ bool signaled;
+ bool permanent;
+ Mutex lock;
+ public:
+ SignalPipe() : signaled(false), permanent(false) {
+ QPID_POSIX_CHECK(::pipe(fds));
+ }
+
+ ~SignalPipe() {
+ ::close(fds[0]);
+ ::close(fds[1]);
+ }
+
+ int getFD() {
+ return fds[0];
+ }
+
+ bool isSet() {
+ return signaled;
+ }
+
+ void set() {
+ ScopedLock<Mutex> l(lock);
+ if (signaled)
+ return;
+ signaled = true;
+ QPID_POSIX_CHECK(::write(fds[1], " ", 1));
+ }
+
+ void reset() {
+ if (permanent)
+ return;
+ ScopedLock<Mutex> l(lock);
+ if (signaled) {
+ char ignore;
+ QPID_POSIX_CHECK(::read(fds[0], &ignore, 1));
+ signaled = false;
+ }
+ }
+
+ void setPermanently() {
+ // async signal safe calls only. No locking.
+ permanent = true;
+ signaled = true;
+ QPID_POSIX_CHECK(::write(fds[1], " ", 2));
+ // poll() should never block now
+ }
+ };
+
+ // Collect pending events and serialize access. Maintain array of pollfd structs.
+ class EventStream {
+ typedef Poller::Event Event;
+ PollerPrivate& pollerPrivate;
+ SignalPipe& signalPipe;
+ std::queue<PollerHandlePrivate*> interruptedHandles;
+ std::vector<struct ::pollfd> pollfds;
+ std::vector<PollerHandlePrivate*> pollHandles;
+ Mutex streamLock;
+ Mutex serializeLock;
+ Condition serializer;
+ bool busy;
+ int currentPollfd;
+ int pollCount;
+ int waiters;
+
+ public:
+
+ EventStream(PollerPrivate* p) : pollerPrivate(*p), signalPipe(p->signalPipe), busy(false),
+ currentPollfd(0), pollCount(0), waiters(0) {
+ // The signal pipe is the first element of pollfds and pollHandles
+ pollfds.reserve(8);
+ pollfds.resize(1);
+ pollfds[0].fd = pollerPrivate.signalPipe.getFD();
+ pollfds[0].events = POLLIN;
+ pollfds[0].revents = 0;
+
+ pollHandles.reserve(8);
+ pollHandles.resize(1);
+ pollHandles[0] = 0;
+ }
+
+ void addInterrupt(PollerHandle& handle) {
+ ScopedLock<Mutex> l(streamLock);
+ interruptedHandles.push(handle.impl);
+ }
+
+ // Serialize access to the stream.
+ Event next(Duration timeout) {
+ AbsTime targetTimeout =
+ (timeout == TIME_INFINITE) ?
+ FAR_FUTURE :
+ AbsTime(now(), timeout);
+
+
+ ScopedLock<Mutex> l(serializeLock);
+ Event event(0, Poller::INVALID);
+ while (busy) {
+ waiters++;
+ bool timedout = !serializer.wait(serializeLock, targetTimeout);
+ waiters--;
+
+ if (busy && timedout) {
+ return Event(0, Poller::TIMEOUT);
+ }
+ }
+ busy = true;
+ {
+ ScopedUnlock<Mutex> ul(serializeLock);
+ event = getEvent(targetTimeout);
+ }
+ busy = false;
+
+ if (waiters > 0)
+ serializer.notify();
+ return event;
+ }
+
+ Event getEvent(AbsTime targetTimeout) {
+ bool timeoutPending = false;
+
+ ScopedLock<Mutex> l(streamLock); // hold lock except for poll()
+
+ // loop until poll event, async interrupt, or timeout
+ while (true) {
+
+ // first check for any interrupts
+ while (interruptedHandles.size() > 0) {
+ PollerHandlePrivate& eh = *interruptedHandles.front();
+ interruptedHandles.pop();
+ {
+ ScopedLock<Mutex> lk(eh.lock);
+ if (!eh.isDeleted()) {
+ if (!eh.isIdle()) {
+ eh.setInactive();
+ }
+
+ // nullify the corresponding pollfd event, if any
+ int ehfd = eh.fd();
+ std::vector<struct ::pollfd>::iterator i = pollfds.begin() + 1; // skip self pipe at front
+ for (; i != pollfds.end(); i++) {
+ if (i->fd == ehfd) {
+ i->events = 0;
+ if (i->revents) {
+ i->revents = 0;
+ pollCount--;
+ }
+ break;
+ }
+ }
+ return Event(eh.pollerHandle, Poller::INTERRUPTED);
+ }
+ }
+ PollerHandleDeletionManager.markForDeletion(&eh);
+ }
+
+ // Check for shutdown
+ if (pollerPrivate.isShutdown) {
+ PollerHandleDeletionManager.markAllUnusedInThisThread();
+ return Event(0, Poller::SHUTDOWN);
+ }
+
+ // search for any remaining events from earlier poll()
+ int nfds = pollfds.size();
+ while ((pollCount > 0) && (currentPollfd < nfds)) {
+ int index = currentPollfd++;
+ short evt = pollfds[index].revents;
+ if (evt != 0) {
+ pollCount--;
+ PollerHandlePrivate& eh = *pollHandles[index];
+ ScopedLock<Mutex> l(eh.lock);
+ // stop polling this handle until resetMode()
+ pollfds[index].events = 0;
+
+ // the handle could have gone inactive since snapshot taken
+ if (eh.isActive()) {
+ PollerHandle* handle = eh.pollerHandle;
+ assert(handle);
+
+ // If the connection has been hungup we could still be readable
+ // (just not writable), allow us to readable until we get here again
+ if (evt & POLLHUP) {
+ if (eh.isHungup()) {
+ eh.setInactive();
+ // Don't set up last Handle so that we don't reset this handle
+ // on re-entering Poller::wait. This means that we will never
+ // be set active again once we've returned disconnected, and so
+ // can never be returned again.
+ return Event(handle, Poller::DISCONNECTED);
+ }
+ eh.setHungup();
+ } else {
+ eh.setInactive();
+ }
+ return Event(handle, PollerPrivate::epollToDirection(evt));
+ }
+ }
+ }
+
+ if (timeoutPending) {
+ return Event(0, Poller::TIMEOUT);
+ }
+
+ // no outstanding events, poll() for more
+ {
+ ScopedUnlock<Mutex> ul(streamLock);
+
+ bool refreshed = pollerPrivate.registeredHandles.snapshot(pollHandles, pollfds);
+ if (refreshed) {
+ // we just drained all interruptedHandles and got a fresh snapshot
+ PollerHandleDeletionManager.markAllUnusedInThisThread();
+ }
+
+ if (!signalPipe.isSet()) {
+ int timeoutMs = -1;
+ if (!(targetTimeout == FAR_FUTURE)) {
+ timeoutMs = Duration(now(), targetTimeout) / TIME_MSEC;
+ if (timeoutMs < 0)
+ timeoutMs = 0;
+ }
+
+ pollCount = ::poll(&pollfds[0], pollfds.size(), timeoutMs);
+
+ if (pollCount ==-1 && errno != EINTR) {
+ QPID_POSIX_CHECK(pollCount);
+ }
+ else if (pollCount == 0) {
+ // timeout, unless shutdown or interrupt arrives in another thread
+ timeoutPending = true;
+ }
+ else {
+ if (pollfds[0].revents) {
+ pollCount--; // signal pipe doesn't count
+ }
+ }
+ }
+ else
+ pollCount = 0;
+ signalPipe.reset();
+ }
+ currentPollfd = 1;
+ }
+ }
+ };
+
+ bool isShutdown;
+ HandleSet registeredHandles;
+ AtomicCount threadCount;
+ SignalPipe signalPipe;
+ EventStream eventStream;
+
+ static short directionToEpollEvent(Poller::Direction dir) {
+ switch (dir) {
+ case Poller::INPUT: return POLLIN;
+ case Poller::OUTPUT: return POLLOUT;
+ case Poller::INOUT: return POLLIN | POLLOUT;
+ default: return 0;
+ }
+ }
+
+ static Poller::EventType epollToDirection(short events) {
+ // POLLOUT & POLLHUP are mutually exclusive really, but at least socketpairs
+ // can give you both!
+ events = (events & POLLHUP) ? events & ~POLLOUT : events;
+ short e = events & (POLLIN | POLLOUT);
+ switch (e) {
+ case POLLIN: return Poller::READABLE;
+ case POLLOUT: return Poller::WRITABLE;
+ case POLLIN | POLLOUT: return Poller::READ_WRITABLE;
+ default:
+ return (events & (POLLHUP | POLLERR)) ?
+ Poller::DISCONNECTED : Poller::INVALID;
+ }
+ }
+
+ PollerPrivate() :
+ isShutdown(false), eventStream(this) {
+ }
+
+ ~PollerPrivate() {}
+
+ void resetMode(PollerHandlePrivate& handle);
+
+ void interrupt() {
+ signalPipe.set();
+ }
+
+ void interruptAll() {
+ // be async signal safe
+ signalPipe.setPermanently();
+ }
+};
+
+
+void Poller::registerHandle(PollerHandle& handle) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(eh.isIdle());
+
+ eh.setActive();
+ impl->registeredHandles.add(handle.impl);
+ // not stale until monitored
+}
+
+void Poller::unregisterHandle(PollerHandle& handle) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(!eh.isIdle());
+
+ eh.setIdle();
+ impl->registeredHandles.remove(handle.impl);
+ impl->registeredHandles.setStale();
+ impl->interrupt();
+}
+
+void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
+ PollerHandle* ph;
+ {
+ // Called after an event has been processed for a handle
+ ScopedLock<Mutex> l(eh.lock);
+ assert(!eh.isActive());
+
+ if (eh.isIdle() || eh.isDeleted()) {
+ return;
+ }
+
+ if (eh.events==0) {
+ eh.setActive();
+ return;
+ }
+
+ if (!eh.isInterrupted()) {
+ // Handle still in use, allow events to resume.
+ eh.setActive();
+ registeredHandles.setStale();
+ // Ouch. This scales poorly for large handle sets.
+ // TODO: avoid new snapshot, perhaps create an index to pollfds or a
+ // pending reset queue to be processed before each poll(). However, the real
+ // scalable solution is to implement the OS-specific epoll equivalent.
+ interrupt();
+ return;
+ }
+ ph = eh.pollerHandle;
+ }
+
+ eventStream.addInterrupt(*ph);
+ interrupt();
+}
+
+void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(!eh.isIdle());
+
+ short oldEvents = eh.events;
+ eh.events |= PollerPrivate::directionToEpollEvent(dir);
+
+ // If no change nothing more to do - avoid unnecessary system call
+ if (oldEvents==eh.events) {
+ return;
+ }
+
+ // If we're not actually listening wait till we are to perform change
+ if (!eh.isActive()) {
+ return;
+ }
+
+ // tell polling thread to update its pollfds
+ impl->registeredHandles.setStale();
+ impl->interrupt();
+}
+
+void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ assert(!eh.isIdle());
+
+ short oldEvents = eh.events;
+ eh.events &= ~PollerPrivate::directionToEpollEvent(dir);
+
+ // If no change nothing more to do - avoid unnecessary system call
+ if (oldEvents==eh.events) {
+ return;
+ }
+
+ // If we're not actually listening wait till we are to perform change
+ if (!eh.isActive()) {
+ return;
+ }
+
+ impl->registeredHandles.setStale();
+ impl->interrupt();
+}
+
+void Poller::shutdown() {
+ // NB: this function must be async-signal safe, it must not
+ // call any function that is not async-signal safe.
+
+ // Allow sloppy code to shut us down more than once
+ if (impl->isShutdown)
+ return;
+
+ // Don't use any locking here - isShutdown will be visible to all
+ // after the write() anyway (it's a memory barrier)
+ impl->isShutdown = true;
+
+ impl->interruptAll();
+}
+
+bool Poller::interrupt(PollerHandle& handle) {
+ {
+ PollerHandlePrivate& eh = *handle.impl;
+ ScopedLock<Mutex> l(eh.lock);
+ if (eh.isIdle() || eh.isDeleted()) {
+ return false;
+ }
+
+ if (eh.isInterrupted()) {
+ return true;
+ }
+
+ if (eh.isInactive()) {
+ eh.setInterrupted();
+ return true;
+ }
+ eh.setInterrupted();
+ eh.events = 0;
+ }
+
+ impl->registeredHandles.setStale();
+ impl->eventStream.addInterrupt(handle);
+ impl->interrupt();
+ return true;
+}
+
+void Poller::run() {
+ // Ensure that we exit thread responsibly under all circumstances
+ try {
+ // Make sure we can't be interrupted by signals at a bad time
+ ::sigset_t ss;
+ ::sigfillset(&ss);
+ ::pthread_sigmask(SIG_SETMASK, &ss, 0);
+
+ ++(impl->threadCount);
+ do {
+ Event event = wait();
+
+ // If can read/write then dispatch appropriate callbacks
+ if (event.handle) {
+ event.process();
+ } else {
+ // Handle shutdown
+ switch (event.type) {
+ case SHUTDOWN:
+ //last thread to respond to shutdown cleans up:
+ if (--(impl->threadCount) == 0) impl->registeredHandles.cleanup();
+ PollerHandleDeletionManager.destroyThreadState();
+ return;
+ default:
+ // This should be impossible
+ assert(false);
+ }
+ }
+ } while (true);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what());
+ }
+ PollerHandleDeletionManager.destroyThreadState();
+ --(impl->threadCount);
+}
+
+bool Poller::hasShutdown()
+{
+ return impl->isShutdown;
+}
+
+Poller::Event Poller::wait(Duration timeout) {
+ static __thread PollerHandlePrivate* lastReturnedHandle = 0;
+
+ if (lastReturnedHandle) {
+ impl->resetMode(*lastReturnedHandle);
+ lastReturnedHandle = 0;
+ }
+
+ Event event = impl->eventStream.next(timeout);
+
+ switch (event.type) {
+ case INTERRUPTED:
+ case READABLE:
+ case WRITABLE:
+ case READ_WRITABLE:
+ lastReturnedHandle = event.handle->impl;
+ break;
+ default:
+ ;
+ }
+
+ return event;
+}
+
+// Concrete constructors
+Poller::Poller() :
+ impl(new PollerPrivate())
+{}
+
+Poller::~Poller() {
+ delete impl;
+}
+
+
+bool HandleSet::snapshot(std::vector<PollerHandlePrivate *>& hs , std::vector<struct ::pollfd>& fds)
+{
+ // Element 0 of the vectors is always the signal pipe, leave undisturbed
+ {
+ ScopedLock<Mutex> l(lock);
+ if (!stale)
+ return false; // no refresh done
+
+ hs.resize(1);
+ for (std::set<PollerHandlePrivate*>::const_iterator i = handles.begin(); i != handles.end(); ++i) {
+ hs.push_back(*i);
+ }
+ stale = false;
+ // have copy of handle set (in vector form), drop the lock and build the pollfds
+ }
+
+ // sync pollfds to same sizing as the handles
+ int sz = hs.size();
+ fds.resize(sz);
+
+ for (int j = 1; j < sz; ++j) {
+ // create a pollfd entry for each handle
+ struct ::pollfd& pollfd = fds[j];
+ PollerHandlePrivate& eh = *hs[j];
+ ScopedLock<Mutex> lk(eh.lock);
+
+ if (!eh.isInactive() && !eh.isDeleted()) {
+ pollfd.fd = eh.fd();
+ pollfd.events = eh.events;
+ } else {
+ pollfd.fd = -1; // tell poll() to ignore this fd
+ pollfd.events = 0;
+ }
+ }
+ return true;
+}
+
+
+}}
diff --git a/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h b/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
new file mode 100644
index 0000000000..34a2022694
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
@@ -0,0 +1,65 @@
+#ifndef _sys_posix_PrivatePosix_h
+#define _sys_posix_PrivatePosix_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Time.h"
+
+struct timespec;
+struct timeval;
+struct addrinfo;
+
+namespace qpid {
+namespace sys {
+
+// Private Time related implementation details
+struct timespec& toTimespec(struct timespec& ts, const AbsTime& t);
+struct timeval& toTimeval(struct timeval& tv, const Duration& t);
+Duration toTime(const struct timespec& ts);
+
+// Private SocketAddress details
+class SocketAddress;
+const struct addrinfo& getAddrInfo(const SocketAddress&);
+
+// Posix fd as an IOHandle
+class IOHandle {
+public:
+ IOHandle(int fd0 = -1) :
+ fd(fd0)
+ {}
+
+ int fd;
+};
+
+// Dummy IOHandle for places it's required in the API
+// but we promise not to actually try to do any operations on the IOHandle
+class NullIOHandle : public IOHandle {
+public:
+ NullIOHandle()
+ {}
+};
+
+extern NullIOHandle DummyIOHandle;
+
+}}
+
+#endif /*!_sys_posix_PrivatePosix_h*/
diff --git a/qpid/cpp/src/qpid/sys/posix/Shlib.cpp b/qpid/cpp/src/qpid/sys/posix/Shlib.cpp
new file mode 100644
index 0000000000..3fb685d5b8
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/Shlib.cpp
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Shlib.h"
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
+#include <dlfcn.h>
+
+
+namespace qpid {
+namespace sys {
+
+void Shlib::load(const char* name) {
+ ::dlerror();
+ handle = ::dlopen(name, RTLD_NOW);
+ const char* error = ::dlerror();
+ if (error) {
+ throw Exception(QPID_MSG(error << ": " << name));
+ }
+}
+
+void Shlib::unload() {
+ if (handle) {
+ ::dlerror();
+ ::dlclose(handle);
+ const char* error = ::dlerror();
+ if (error) {
+ throw Exception(QPID_MSG(error));
+ }
+ handle = 0;
+ }
+}
+
+void* Shlib::getSymbol(const char* name) {
+ ::dlerror();
+ void* sym = ::dlsym(handle, name);
+ const char* error = ::dlerror();
+ if (error)
+ throw Exception(QPID_MSG(error << ": " << name));
+ return sym;
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp
new file mode 100644
index 0000000000..4c860a7ef7
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp
@@ -0,0 +1,353 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/SocketAddress.h"
+
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
+#include "qpid/log/Logger.h"
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <string.h>
+#include <arpa/inet.h>
+#include <iosfwd>
+
+namespace qpid {
+namespace sys {
+
+SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) :
+ host(host0),
+ port(port0),
+ addrInfo(0),
+ currentAddrInfo(0)
+{
+}
+
+SocketAddress::SocketAddress(const SocketAddress& sa) :
+ host(sa.host),
+ port(sa.port),
+ addrInfo(0),
+ currentAddrInfo(0)
+{
+}
+
+SocketAddress& SocketAddress::operator=(const SocketAddress& sa)
+{
+ SocketAddress temp(sa);
+
+ std::swap(temp, *this);
+ return *this;
+}
+
+SocketAddress::~SocketAddress()
+{
+ if (addrInfo) {
+ ::freeaddrinfo(addrInfo);
+ }
+}
+
+std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen, bool dispNameOnly, bool hideDecoration)
+{
+ char servName[NI_MAXSERV];
+ char dispName[NI_MAXHOST];
+ if (int rc=::getnameinfo(addr, addrlen,
+ dispName, sizeof(dispName),
+ servName, sizeof(servName),
+ NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+ std::string s;
+ switch (addr->sa_family) {
+ case AF_INET: s += dispName; break;
+ case AF_INET6:
+ if (!hideDecoration) {
+ s += "["; s += dispName; s+= "]";
+ } else {
+ s += dispName;
+ }
+ break;
+ case AF_UNIX: s += "UNIX:"; break;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+ if (!dispNameOnly) {
+ s += ":";
+ s += servName;
+ }
+ return s;
+}
+
+uint16_t SocketAddress::getPort(::sockaddr const * const addr)
+{
+ switch (addr->sa_family) {
+ case AF_INET: return ntohs(((const ::sockaddr_in*)(const void*)addr)->sin_port);
+ case AF_INET6: return ntohs(((const ::sockaddr_in6*)(const void*)addr)->sin6_port);
+ default:throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+}
+
+std::string SocketAddress::asString(bool numeric, bool dispNameOnly, bool hideDecoration) const
+{
+ if (!numeric)
+ return host + ":" + port;
+ // Canonicalise into numeric id
+ const ::addrinfo& ai = getAddrInfo(*this);
+
+ return asString(ai.ai_addr, ai.ai_addrlen, dispNameOnly, hideDecoration);
+}
+
+std::string SocketAddress::getHost() const
+{
+ return host;
+}
+
+/**
+ * Return true if this SocketAddress is IPv4 or IPv6
+ */
+bool SocketAddress::isIp() const
+{
+ const ::addrinfo& ai = getAddrInfo(*this);
+ return ai.ai_family == AF_INET || ai.ai_family == AF_INET6;
+}
+
+/**
+ * this represents the low address of an ACL address range.
+ * Given rangeHi that represents the high address,
+ * return a string showing the numeric comparisons that the
+ * inRange checks will do for address pair.
+ */
+std::string SocketAddress::comparisonDetails(const SocketAddress& rangeHi) const
+{
+ std::ostringstream os;
+ SocketAddress thisSa(*this);
+ SocketAddress rangeHiSa(rangeHi);
+ (void) getAddrInfo(thisSa);
+ (void) getAddrInfo(rangeHiSa);
+ os << "(" << thisSa.asString(true, true, false) <<
+ "," << rangeHiSa.asString(true, true, false) << ")";
+ while (thisSa.nextAddress()) {
+ if (!rangeHiSa.nextAddress()) {
+ throw(Exception(QPID_MSG("Comparison iteration fails: " + (*this).asString() +
+ rangeHi.asString())));
+ }
+ os << ",(" << thisSa.asString(true, true, false) <<
+ "," << rangeHiSa.asString(true, true, false) << ")";
+ }
+ if (rangeHiSa.nextAddress()) {
+ throw(Exception(QPID_MSG("Comparison iteration fails: " + (*this).asString() +
+ rangeHi.asString())));
+ }
+ std::string result = os.str();
+ return result;
+}
+
+/**
+ * For ACL address matching make sure that the two addresses, *this
+ * which is the low address and hiPeer which is the high address, are
+ * both numeric ip addresses of the same family and that hi > *this.
+ *
+ * Note that if the addresses resolve to more than one struct addrinfo
+ * then this and the hiPeer must be equal. This avoids having to do
+ * difficult range checks where the this and hiPeer both resolve to
+ * multiple IPv4 or IPv6 addresses.
+ *
+ * This check is run at acl file load time and not at run tme.
+ */
+bool SocketAddress::isComparable(const SocketAddress& hiPeer) const {
+ try {
+ // May only compare if this socket is IPv4 or IPv6
+ SocketAddress lo(*this);
+ const ::addrinfo& peerLoInfo = getAddrInfo(lo);
+ if (!(peerLoInfo.ai_family == AF_INET || peerLoInfo.ai_family == AF_INET6)) {
+ return false;
+ }
+ try {
+ // May only compare if peer socket is same family
+ SocketAddress hi(hiPeer);
+ const ::addrinfo& peerHiInfo = getAddrInfo(hi);
+ if (peerLoInfo.ai_family != peerHiInfo.ai_family) {
+ return false;
+ }
+ // Host names that resolve to lists are allowed if they are equal.
+ // For example: localhost, or fjord.lab.example.com
+ if ((*this).asString() == hiPeer.asString()) {
+ return true;
+ }
+ // May only compare if this and peer resolve to single address.
+ if (lo.nextAddress() || hi.nextAddress()) {
+ return false;
+ }
+ // Make sure that the lo/hi relationship is ok
+ int res;
+ if (!compareAddresses(peerLoInfo, peerHiInfo, res) || res < 0) {
+ return false;
+ }
+ return true;
+ } catch (Exception) {
+ // failed to resolve hi
+ return false;
+ }
+ } catch (Exception) {
+ // failed to resolve lo
+ return false;
+ }
+}
+
+/**
+ * *this SocketAddress was created from the numeric IP address of a
+ * connecting host.
+ * The lo and hi addresses are the limit checks from the ACL file.
+ * Return true if this address is in range of any of the address pairs
+ * in the limit check range.
+ *
+ * This check is executed on every incoming connection.
+ */
+bool SocketAddress::inRange(const SocketAddress& lo,
+ const SocketAddress& hi) const
+{
+ (*this).firstAddress();
+ lo.firstAddress();
+ hi.firstAddress();
+ const ::addrinfo& thisInfo = getAddrInfo(*this);
+ const ::addrinfo& loInfo = getAddrInfo(lo);
+ const ::addrinfo& hiInfo = getAddrInfo(hi);
+ if (inRange(thisInfo, loInfo, hiInfo)) {
+ return true;
+ }
+ while (lo.nextAddress()) {
+ if (!hi.nextAddress()) {
+ assert (false);
+ throw(Exception(QPID_MSG("Comparison iteration fails: " +
+ lo.asString() + hi.asString())));
+ }
+ const ::addrinfo& loInfo = getAddrInfo(lo);
+ const ::addrinfo& hiInfo = getAddrInfo(hi);
+ if (inRange(thisInfo, loInfo, hiInfo)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+/**
+ * *this SocketAddress was created from the numeric IP address of a
+ * connecting host.
+ * The lo and hi addresses are one binary address pair from a range
+ * given in an ACL file.
+ * Return true if this binary address is '>= lo' and '<= hi'.
+ */
+bool SocketAddress::inRange(const ::addrinfo& thisInfo,
+ const ::addrinfo& lo,
+ const ::addrinfo& hi) const
+{
+ int resLo;
+ int resHi;
+ if (!compareAddresses(lo, thisInfo, resLo)) {
+ return false;
+ }
+ if (!compareAddresses(hi, thisInfo, resHi)) {
+ return false;
+ }
+ if (resLo < 0) {
+ return false;
+ }
+ if (resHi > 0) {
+ return false;
+ }
+ return true;
+}
+
+/**
+ * Compare this address against two binary low/high addresses.
+ * return true with result holding the comparison.
+ */
+bool SocketAddress::compareAddresses(const struct addrinfo& lo,
+ const struct addrinfo& hi,
+ int& result) const
+{
+ if (lo.ai_family != hi.ai_family) {
+ return false;
+ }
+ if (lo.ai_family == AF_INET) {
+ void* taddr;
+
+ taddr = (void*)lo.ai_addr;
+ struct sockaddr_in* sin4lo = (struct sockaddr_in*)taddr;
+ taddr = (void*)hi.ai_addr;
+ struct sockaddr_in* sin4hi = (struct sockaddr_in*)taddr;
+ result = memcmp(&sin4hi->sin_addr, &sin4lo->sin_addr, sizeof(in_addr));
+ } else if (lo.ai_family == AF_INET6) {
+ void* taddr;
+
+ taddr = (void*)lo.ai_addr;
+ struct sockaddr_in6* sin6lo = (struct sockaddr_in6*)taddr;
+ taddr = (void*)hi.ai_addr;
+ struct sockaddr_in6* sin6hi = (struct sockaddr_in6*)taddr;
+ result = memcmp(&sin6hi->sin6_addr, &sin6lo->sin6_addr, sizeof(in6_addr));
+ } else {
+ assert (false);
+ return false;
+ }
+ return true;
+}
+
+void SocketAddress::firstAddress() const {
+ if (addrInfo) {
+ currentAddrInfo = addrInfo;
+ } else {
+ (void) getAddrInfo(*this);
+ }
+}
+
+bool SocketAddress::nextAddress() const {
+ bool r = currentAddrInfo->ai_next != 0;
+ if (r)
+ currentAddrInfo = currentAddrInfo->ai_next;
+ return r;
+}
+
+const ::addrinfo& getAddrInfo(const SocketAddress& sa)
+{
+ if (!sa.addrInfo) {
+ ::addrinfo hints;
+ ::memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
+ hints.ai_socktype = SOCK_STREAM;
+
+ const char* node = 0;
+ if (sa.host.empty()) {
+ hints.ai_flags = AI_PASSIVE;
+ } else {
+ hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+ node = sa.host.c_str();
+ }
+ const char* service = sa.port.empty() ? "0" : sa.port.c_str();
+
+ int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
+ if (n != 0)
+ throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+ sa.currentAddrInfo = sa.addrInfo;
+ }
+
+ return *sa.currentAddrInfo;
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/sys/posix/StrError.cpp b/qpid/cpp/src/qpid/sys/posix/StrError.cpp
new file mode 100644
index 0000000000..633e20213c
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/StrError.cpp
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/StrError.h"
+
+#include <string.h>
+
+namespace qpid {
+namespace sys {
+
+std::string strError(int err) {
+ char buf[512] = "Unknown error";
+#ifdef _GNU_SOURCE
+ // GNU strerror_r returns the message
+ return ::strerror_r(err, buf, sizeof(buf));
+#else
+ // POSIX strerror_r doesn't return the buffer
+ ::strerror_r(err, buf, sizeof(buf));
+ return std::string(buf);
+#endif
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp b/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp
new file mode 100755
index 0000000000..2a42a5b2a7
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/sys/posix/check.h"
+#include <arpa/inet.h>
+#include <sys/ioctl.h>
+#include <sys/utsname.h>
+#include <sys/types.h> // For FreeBSD
+#include <sys/socket.h> // For FreeBSD
+#include <netinet/in.h> // For FreeBSD
+#include <ifaddrs.h>
+#include <unistd.h>
+#include <iostream>
+#include <fstream>
+#include <sstream>
+#include <map>
+#include <netdb.h>
+#include <string.h>
+
+#ifndef HOST_NAME_MAX
+# define HOST_NAME_MAX 256
+#endif
+
+using namespace std;
+
+namespace qpid {
+namespace sys {
+
+long SystemInfo::concurrency() {
+#ifdef _SC_NPROCESSORS_ONLN // Linux specific.
+ return sysconf(_SC_NPROCESSORS_ONLN);
+#else
+ return -1;
+#endif
+}
+
+bool SystemInfo::getLocalHostname (Address &address) {
+ char name[HOST_NAME_MAX];
+ if (::gethostname(name, sizeof(name)) != 0)
+ return false;
+ address.host = name;
+ return true;
+}
+
+static const string LOOPBACK("127.0.0.1");
+static const string TCP("tcp");
+
+// Test IPv4 address for loopback
+inline bool IN_IS_ADDR_LOOPBACK(const ::in_addr* a) {
+ return ((ntohl(a->s_addr) & 0xff000000) == 0x7f000000);
+}
+
+inline bool isLoopback(const ::sockaddr* addr) {
+ switch (addr->sa_family) {
+ case AF_INET: return IN_IS_ADDR_LOOPBACK(&((const ::sockaddr_in*)(const void*)addr)->sin_addr);
+ case AF_INET6: return IN6_IS_ADDR_LOOPBACK(&((const ::sockaddr_in6*)(const void*)addr)->sin6_addr);
+ default: return false;
+ }
+}
+
+namespace {
+ inline socklen_t sa_len(::sockaddr* sa)
+ {
+ switch (sa->sa_family) {
+ case AF_INET:
+ return sizeof(struct sockaddr_in);
+ case AF_INET6:
+ return sizeof(struct sockaddr_in6);
+ default:
+ return sizeof(struct sockaddr_storage);
+ }
+ }
+
+ inline bool isInetOrInet6(::sockaddr* sa) {
+ switch (sa->sa_family) {
+ case AF_INET:
+ case AF_INET6:
+ return true;
+ default:
+ return false;
+ }
+ }
+ typedef std::map<std::string, std::vector<std::string> > InterfaceInfo;
+ std::map<std::string, std::vector<std::string> > cachedInterfaces;
+
+ void cacheInterfaceInfo() {
+ // Get interface info
+ ::ifaddrs* interfaceInfo;
+ QPID_POSIX_CHECK( ::getifaddrs(&interfaceInfo) );
+
+ char name[NI_MAXHOST];
+ for (::ifaddrs* info = interfaceInfo; info != 0; info = info->ifa_next) {
+
+ // Only use IPv4/IPv6 interfaces
+ if (!info->ifa_addr || !isInetOrInet6(info->ifa_addr)) continue;
+
+ int rc=::getnameinfo(info->ifa_addr, sa_len(info->ifa_addr),
+ name, sizeof(name), 0, 0,
+ NI_NUMERICHOST);
+ if (rc >= 0) {
+ std::string address(name);
+ cachedInterfaces[info->ifa_name].push_back(address);
+ } else {
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+ }
+ }
+ ::freeifaddrs(interfaceInfo);
+ }
+}
+
+bool SystemInfo::getInterfaceAddresses(const std::string& interface, std::vector<std::string>& addresses) {
+ if ( cachedInterfaces.empty() ) cacheInterfaceInfo();
+ InterfaceInfo::iterator i = cachedInterfaces.find(interface);
+ if ( i==cachedInterfaces.end() ) return false;
+ std::copy(i->second.begin(), i->second.end(), std::back_inserter(addresses));
+ return true;
+}
+
+void SystemInfo::getInterfaceNames(std::vector<std::string>& names ) {
+ if ( cachedInterfaces.empty() ) cacheInterfaceInfo();
+
+ for (InterfaceInfo::const_iterator i = cachedInterfaces.begin(); i!=cachedInterfaces.end(); ++i) {
+ names.push_back(i->first);
+ }
+}
+
+void SystemInfo::getSystemId (std::string &osName,
+ std::string &nodeName,
+ std::string &release,
+ std::string &version,
+ std::string &machine)
+{
+ struct utsname _uname;
+ if (uname (&_uname) == 0)
+ {
+ osName = _uname.sysname;
+ nodeName = _uname.nodename;
+ release = _uname.release;
+ version = _uname.version;
+ machine = _uname.machine;
+ }
+}
+
+uint32_t SystemInfo::getProcessId()
+{
+ return (uint32_t) ::getpid();
+}
+
+uint32_t SystemInfo::getParentProcessId()
+{
+ return (uint32_t) ::getppid();
+}
+
+// Linux specific (Solaris has quite different stuff in /proc)
+string SystemInfo::getProcessName()
+{
+ string value;
+
+ ifstream input("/proc/self/status");
+ if (input.good()) {
+ while (!input.eof()) {
+ string key;
+ input >> key;
+ if (key == "Name:") {
+ input >> value;
+ break;
+ }
+ }
+ input.close();
+ }
+
+ return value;
+}
+
+// Always true. Only Windows has exception cases.
+bool SystemInfo::threadSafeShutdown()
+{
+ return true;
+}
+
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/Thread.cpp b/qpid/cpp/src/qpid/sys/posix/Thread.cpp
new file mode 100644
index 0000000000..349e35d643
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/Thread.cpp
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Thread.h"
+
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/posix/check.h"
+
+#include <pthread.h>
+
+namespace qpid {
+namespace sys {
+
+namespace {
+void* runRunnable(void* p)
+{
+ static_cast<Runnable*>(p)->run();
+ return 0;
+}
+}
+
+class ThreadPrivate {
+public:
+ pthread_t thread;
+
+ ThreadPrivate(Runnable* runnable) {
+ QPID_POSIX_ASSERT_THROW_IF(::pthread_create(&thread, NULL, runRunnable, runnable));
+ }
+
+ ThreadPrivate() : thread(::pthread_self()) {}
+};
+
+Thread::Thread() {}
+
+Thread::Thread(Runnable* runnable) : impl(new ThreadPrivate(runnable)) {}
+
+Thread::Thread(Runnable& runnable) : impl(new ThreadPrivate(&runnable)) {}
+
+Thread::operator bool() {
+ return !!impl;
+}
+
+bool Thread::operator==(const Thread& t) const {
+ return pthread_equal(impl->thread, t.impl->thread) != 0;
+}
+
+bool Thread::operator!=(const Thread& t) const {
+ return !(*this==t);
+}
+
+void Thread::join(){
+ if (impl) {
+ QPID_POSIX_ASSERT_THROW_IF(::pthread_join(impl->thread, 0));
+ }
+}
+
+unsigned long Thread::logId() {
+ // This does need to be the C cast operator as
+ // pthread_t could be either a pointer or an integer
+ // and so we can't know static_cast<> or reinterpret_cast<>
+ return (unsigned long) ::pthread_self();
+}
+
+Thread Thread::current() {
+ Thread t;
+ t.impl.reset(new ThreadPrivate());
+ return t;
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/sys/posix/Time.cpp b/qpid/cpp/src/qpid/sys/posix/Time.cpp
new file mode 100644
index 0000000000..10a5d944b1
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/Time.cpp
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/posix/PrivatePosix.h"
+
+#include "qpid/sys/Time.h"
+#include <ostream>
+#include <istream>
+#include <sstream>
+#include <time.h>
+#include <stdio.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <iomanip>
+#include <cctype>
+
+namespace {
+int64_t max_abstime() { return std::numeric_limits<int64_t>::max(); }
+}
+
+namespace qpid {
+namespace sys {
+
+AbsTime::AbsTime(const AbsTime& t, const Duration& d) :
+ timepoint(d == Duration::max() ? max_abstime() : t.timepoint+d.nanosecs)
+{}
+
+AbsTime AbsTime::Zero() {
+ AbsTime epoch; epoch.timepoint = 0;
+ return epoch;
+}
+
+AbsTime AbsTime::FarFuture() {
+ AbsTime ff; ff.timepoint = max_abstime(); return ff;
+}
+
+AbsTime AbsTime::now() {
+ struct timespec ts;
+ ::clock_gettime(CLOCK_MONOTONIC, &ts);
+ AbsTime time_now;
+ time_now.timepoint = toTime(ts).nanosecs;
+ return time_now;
+}
+
+AbsTime AbsTime::epoch() {
+ return AbsTime(now(), -Duration::FromEpoch());
+}
+
+Duration Duration::FromEpoch() {
+ struct timespec ts;
+ ::clock_gettime(CLOCK_REALTIME, &ts);
+ return toTime(ts).nanosecs;
+}
+
+Duration::Duration(const AbsTime& start, const AbsTime& finish) :
+ nanosecs(finish.timepoint - start.timepoint)
+{}
+
+namespace {
+/** type conversion helper: an infinite timeout for time_t sized types **/
+const time_t TIME_T_MAX = std::numeric_limits<time_t>::max();
+}
+
+struct timespec& toTimespec(struct timespec& ts, const AbsTime& a) {
+ Duration t(ZERO, a);
+ Duration secs = t / TIME_SEC;
+ ts.tv_sec = (secs > TIME_T_MAX) ? TIME_T_MAX : static_cast<time_t>(secs);
+ ts.tv_nsec = static_cast<long>(t % TIME_SEC);
+ return ts;
+}
+
+Duration toTime(const struct timespec& ts) {
+ return ts.tv_sec*TIME_SEC + ts.tv_nsec;
+}
+
+std::ostream& operator<<(std::ostream& o, const Duration& d) {
+ if (d >= TIME_SEC) return o << (double(d)/TIME_SEC) << "s";
+ if (d >= TIME_MSEC) return o << (double(d)/TIME_MSEC) << "ms";
+ if (d >= TIME_USEC) return o << (double(d)/TIME_USEC) << "us";
+ return o << int64_t(d) << "ns";
+}
+
+std::istream& operator>>(std::istream& i, Duration& d) {
+ // Don't throw, let the istream throw if it's configured to do so.
+ double number;
+ i >> number;
+ if (i.fail()) return i;
+
+ if (i.eof() || std::isspace(i.peek())) // No suffix
+ d = int64_t(number*TIME_SEC);
+ else {
+ std::stringbuf suffix;
+ i >> &suffix;
+ if (i.fail()) return i;
+ std::string suffix_str = suffix.str();
+ if (suffix_str.compare("s") == 0) d = int64_t(number*TIME_SEC);
+ else if (suffix_str.compare("ms") == 0) d = int64_t(number*TIME_MSEC);
+ else if (suffix_str.compare("us") == 0) d = int64_t(number*TIME_USEC);
+ else if (suffix_str.compare("ns") == 0) d = int64_t(number*TIME_NSEC);
+ else i.setstate(std::ios::failbit);
+ }
+ return i;
+}
+
+namespace {
+inline std::ostream& outputFormattedTime(std::ostream& o, const ::time_t* time) {
+ ::tm timeinfo;
+ char time_string[100];
+ ::strftime(time_string, 100,
+ "%Y-%m-%d %H:%M:%S",
+ localtime_r(time, &timeinfo));
+ return o << time_string;
+}
+}
+
+std::ostream& operator<<(std::ostream& o, const AbsTime& t) {
+ ::time_t rawtime(t.timepoint/TIME_SEC);
+ return outputFormattedTime(o, &rawtime);
+}
+
+void outputFormattedNow(std::ostream& o) {
+ ::time_t rawtime;
+ ::time(&rawtime);
+ outputFormattedTime(o, &rawtime);
+ o << " ";
+}
+
+void outputHiresNow(std::ostream& o) {
+ ::timespec time;
+ ::clock_gettime(CLOCK_REALTIME, &time);
+ ::time_t seconds = time.tv_sec;
+ outputFormattedTime(o, &seconds);
+ o << "." << std::setw(9) << std::setfill('0') << time.tv_nsec << " ";
+}
+
+void sleep(int secs) {
+ ::sleep(secs);
+}
+
+void usleep(uint64_t usecs) {
+ ::usleep(usecs);
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/sys/posix/Time.h b/qpid/cpp/src/qpid/sys/posix/Time.h
new file mode 100755
index 0000000000..62d734c816
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/Time.h
@@ -0,0 +1,34 @@
+#ifndef QPID_SYS_POSIX_TIME_H
+#define QPID_SYS_POSIX_TIME_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/sys/IntegerTypes.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Class to represent an instant in time.
+ */
+typedef int64_t TimePrivate;
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_POSIX_TIME_H*/
diff --git a/qpid/cpp/src/qpid/sys/posix/check.h b/qpid/cpp/src/qpid/sys/posix/check.h
new file mode 100644
index 0000000000..1bfe5d6d78
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/posix/check.h
@@ -0,0 +1,53 @@
+#ifndef _posix_check_h
+#define _posix_check_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
+
+#include <cerrno>
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#define QPID_POSIX_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::sys::strError(ERRNO)))
+
+/** THROW QPID_POSIX_ERROR(errno) if RESULT is less than zero */
+#define QPID_POSIX_CHECK(RESULT) \
+ if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno))
+
+/** Throw a posix error if ERRNO is non-zero */
+#define QPID_POSIX_THROW_IF(ERRNO) \
+ do { int e=(ERRNO); if (e) throw QPID_POSIX_ERROR(e); } while(0)
+
+/** Same as _THROW_IF in a release build, but abort a debug build */
+#ifdef NDEBUG
+#define QPID_POSIX_ASSERT_THROW_IF(ERRNO) QPID_POSIX_THROW_IF(ERRNO)
+#else
+#define QPID_POSIX_ASSERT_THROW_IF(ERRNO) \
+ do { int e=(ERRNO); if (e) { errno=e; ::perror(0); assert(0); } } while(0)
+#endif
+
+#define QPID_POSIX_ABORT_IF(ERRNO) if ((int) ERRNO) { errno=ERRNO; ::perror(0); abort(); }
+
+#endif /*!_posix_check_h*/