diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/posix')
28 files changed, 4107 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp new file mode 100644 index 0000000000..7d04d2214d --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -0,0 +1,655 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/SecuritySettings.h" +#include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Probes.h" +#include "qpid/sys/DispatchHandle.h" +#include "qpid/sys/Time.h" +#include "qpid/log/Statement.h" + +// TODO The basic algorithm here is not really POSIX specific and with a +// bit more abstraction could (should) be promoted to be platform portable +// - The POSIX specific code here is ignoring SIGPIPE which should really +// be part of the socket code. +// - And checking errno to detect specific read/write conditions. +// +#include <errno.h> +#include <signal.h> + +#include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> +#include <boost/shared_array.hpp> + +namespace qpid { +namespace sys { +namespace posix { + +namespace { + +struct StaticInit { + StaticInit() { + /** + * Make *process* not generate SIGPIPE when writing to closed + * pipe/socket (necessary as default action is to terminate process) + */ + ::signal(SIGPIPE, SIG_IGN); + }; +} init; + +/* + * We keep per thread state to avoid locking overhead. The assumption is that + * on average all the connections are serviced by all the threads so the state + * recorded in each thread is about the same. If this turns out not to be the + * case we could rebalance the info occasionally. + */ +__thread int threadReadTotal = 0; +__thread int threadReadCount = 0; +__thread int threadWriteTotal = 0; +__thread int threadWriteCount = 0; +__thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms +} + +/* + * Asynch Acceptor + */ +class AsynchAcceptor : public qpid::sys::AsynchAcceptor { +public: + AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback); + ~AsynchAcceptor(); + void start(Poller::shared_ptr poller); + +private: + void readable(DispatchHandle& handle); + +private: + AsynchAcceptor::Callback acceptedCallback; + DispatchHandle handle; + const Socket& socket; + +}; + +AsynchAcceptor::AsynchAcceptor(const Socket& s, + AsynchAcceptor::Callback callback) : + acceptedCallback(callback), + handle((const IOHandle&)s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0), + socket(s) { + + s.setNonblocking(); +} + +AsynchAcceptor::~AsynchAcceptor() { + handle.stopWatch(); +} + +void AsynchAcceptor::start(Poller::shared_ptr poller) { + handle.startWatch(poller); +} + +/* + * We keep on accepting as long as there is something to accept + */ +void AsynchAcceptor::readable(DispatchHandle& h) { + Socket* s; + do { + errno = 0; + // TODO: Currently we ignore the peers address, perhaps we should + // log it or use it for connection acceptance. + try { + s = socket.accept(); + if (s) { + acceptedCallback(*s); + } else { + break; + } + } catch (const std::exception& e) { + QPID_LOG(error, "Could not accept socket: " << e.what()); + break; + } + } while (true); + + h.rewatch(); +} + +/* + * POSIX version of AsynchIO TCP socket connector. + * + * The class is implemented in terms of DispatchHandle to allow it to be + * deleted by deleting the contained DispatchHandle. + */ +class AsynchConnector : public qpid::sys::AsynchConnector, + private DispatchHandle { + +private: + void connComplete(DispatchHandle& handle); + void requestedCall(RequestCallback rCb); + +private: + ConnectedCallback connCallback; + FailedCallback failCallback; + const Socket& socket; + SocketAddress sa; + +public: + AsynchConnector(const Socket& socket, + const std::string& hostname, + const std::string& port, + ConnectedCallback connCb, + FailedCallback failCb); + void start(Poller::shared_ptr poller); + void stop(); + void requestCallback(RequestCallback rCb); +}; + +AsynchConnector::AsynchConnector(const Socket& s, + const std::string& hostname, + const std::string& port, + ConnectedCallback connCb, + FailedCallback failCb) : + DispatchHandle((const IOHandle&)s, + 0, + boost::bind(&AsynchConnector::connComplete, this, _1), + boost::bind(&AsynchConnector::connComplete, this, _1)), + connCallback(connCb), + failCallback(failCb), + socket(s), + sa(hostname, port) +{ + socket.setNonblocking(); + + // Note, not catching any exceptions here, also has effect of destructing + QPID_LOG(info, "Connecting: " << sa.asString()); + socket.connect(sa); +} + +void AsynchConnector::start(Poller::shared_ptr poller) +{ + startWatch(poller); +} + +void AsynchConnector::stop() +{ + stopWatch(); +} + +void AsynchConnector::requestCallback(RequestCallback callback) { + // TODO creating a function object every time isn't all that + // efficient - if this becomes heavily used do something better (what?) + assert(callback); + DispatchHandle::call(boost::bind(&AsynchConnector::requestedCall, this, callback)); +} + +void AsynchConnector::requestedCall(RequestCallback callback) { + assert(callback); + callback(*this); +} + +void AsynchConnector::connComplete(DispatchHandle& h) +{ + int errCode = socket.getError(); + if (errCode == 0) { + h.stopWatch(); + try { + socket.finishConnect(sa); + } catch (const std::exception& e) { + failCallback(socket, 0, e.what()); + DispatchHandle::doDelete(); + return; + } + connCallback(socket); + } else { + // Retry while we cause an immediate exception + // (asynch failure will be handled by re-entering here at the top) + while (sa.nextAddress()) { + try { + // Try next address without deleting ourselves + QPID_LOG(debug, "Ignored socket connect error: " << strError(errCode)); + QPID_LOG(info, "Retrying connect: " << sa.asString()); + socket.connect(sa); + return; + } catch (const std::exception& e) { + QPID_LOG(debug, "Ignored socket connect exception: " << e.what()); + } + errCode = socket.getError(); + } + h.stopWatch(); + failCallback(socket, errCode, strError(errCode)); + } + DispatchHandle::doDelete(); +} + +/* + * POSIX version of AsynchIO reader/writer + * + * The class is implemented in terms of DispatchHandle to allow it to be + * deleted by deleting the contained DispatchHandle. + */ +class AsynchIO : public qpid::sys::AsynchIO, private DispatchHandle { + +public: + AsynchIO(const Socket& s, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0); + + // Methods inherited from qpid::sys::AsynchIO + + virtual void queueForDeletion(); + + virtual void start(Poller::shared_ptr poller); + virtual void createBuffers(uint32_t size); + virtual void queueReadBuffer(BufferBase* buff); + virtual void unread(BufferBase* buff); + virtual void queueWrite(BufferBase* buff); + virtual void notifyPendingWrite(); + virtual void queueWriteClose(); + virtual bool writeQueueEmpty(); + virtual void requestCallback(RequestCallback); + virtual BufferBase* getQueuedBuffer(); + virtual SecuritySettings getSecuritySettings(); + +private: + ~AsynchIO(); + + // Methods that are callback targets from Dispatcher. + void readable(DispatchHandle& handle); + void writeable(DispatchHandle& handle); + void disconnected(DispatchHandle& handle); + void requestedCall(RequestCallback); + void close(DispatchHandle& handle); + +private: + ReadCallback readCallback; + EofCallback eofCallback; + DisconnectCallback disCallback; + ClosedCallback closedCallback; + BuffersEmptyCallback emptyCallback; + IdleCallback idleCallback; + const Socket& socket; + std::deque<BufferBase*> bufferQueue; + std::deque<BufferBase*> writeQueue; + std::vector<BufferBase> buffers; + boost::shared_array<char> bufferMemory; + bool queuedClose; + /** + * This flag is used to detect and handle concurrency between + * calls to notifyPendingWrite() (which can be made from any thread) and + * the execution of the writeable() method (which is always on the + * thread processing this handle. + */ + volatile bool writePending; +}; + +AsynchIO::AsynchIO(const Socket& s, + ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, + ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : + + DispatchHandle((const IOHandle&)s, + boost::bind(&AsynchIO::readable, this, _1), + boost::bind(&AsynchIO::writeable, this, _1), + boost::bind(&AsynchIO::disconnected, this, _1)), + readCallback(rCb), + eofCallback(eofCb), + disCallback(disCb), + closedCallback(cCb), + emptyCallback(eCb), + idleCallback(iCb), + socket(s), + queuedClose(false), + writePending(false) { + + s.setNonblocking(); +} + +AsynchIO::~AsynchIO() { +} + +void AsynchIO::queueForDeletion() { + DispatchHandle::doDelete(); +} + +void AsynchIO::start(Poller::shared_ptr poller) { + DispatchHandle::startWatch(poller); +} + +void AsynchIO::createBuffers(uint32_t size) { + // Allocate all the buffer memory at once + bufferMemory.reset(new char[size*BufferCount]); + + // Create the Buffer structs in a vector + // And push into the buffer queue + buffers.reserve(BufferCount); + for (uint32_t i = 0; i < BufferCount; i++) { + buffers.push_back(BufferBase(&bufferMemory[i*size], size)); + queueReadBuffer(&buffers[i]); + } +} + +void AsynchIO::queueReadBuffer(BufferBase* buff) { + assert(buff); + buff->dataStart = 0; + buff->dataCount = 0; + + bool queueWasEmpty = bufferQueue.empty(); + bufferQueue.push_back(buff); + if (queueWasEmpty) + DispatchHandle::rewatchRead(); +} + +void AsynchIO::unread(BufferBase* buff) { + assert(buff); + buff->squish(); + + bool queueWasEmpty = bufferQueue.empty(); + bufferQueue.push_front(buff); + if (queueWasEmpty) + DispatchHandle::rewatchRead(); +} + +void AsynchIO::queueWrite(BufferBase* buff) { + assert(buff); + // If we've already closed the socket then throw the write away + if (queuedClose) { + queueReadBuffer(buff); + return; + } else { + writeQueue.push_front(buff); + } + writePending = false; + DispatchHandle::rewatchWrite(); +} + +// This can happen outside the callback context +void AsynchIO::notifyPendingWrite() { + writePending = true; + DispatchHandle::rewatchWrite(); +} + +void AsynchIO::queueWriteClose() { + queuedClose = true; + DispatchHandle::rewatchWrite(); +} + +bool AsynchIO::writeQueueEmpty() { + return writeQueue.empty(); +} + +void AsynchIO::requestCallback(RequestCallback callback) { + // TODO creating a function object every time isn't all that + // efficient - if this becomes heavily used do something better (what?) + assert(callback); + DispatchHandle::call(boost::bind(&AsynchIO::requestedCall, this, callback)); +} + +void AsynchIO::requestedCall(RequestCallback callback) { + assert(callback); + callback(*this); +} + +/** Return a queued buffer if there are enough + * to spare + */ +AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { + BufferBase* buff = bufferQueue.empty() ? 0 : bufferQueue.back(); + // An "unread" buffer is reserved for future read operations (which + // take from the front of the queue). + if (!buff || (buff->dataCount && bufferQueue.size() == 1)) { + QPID_LOG(error, "No IO buffers available"); + return 0; + } + assert(buff->dataCount == 0); + bufferQueue.pop_back(); + return buff; +} + +/* + * We keep on reading as long as we have something to read, a buffer + * to put it in and reading is not stopped by flow control. + */ +void AsynchIO::readable(DispatchHandle& h) { + AbsTime readStartTime = AbsTime::now(); + size_t total = 0; + int readCalls = 0; + do { + // (Try to) get a buffer + if (!bufferQueue.empty()) { + // Read into buffer + BufferBase* buff = bufferQueue.front(); + assert(buff); + bufferQueue.pop_front(); + errno = 0; + int readCount = buff->byteCount-buff->dataCount; + int rc = socket.read(buff->bytes + buff->dataCount, readCount); + ++readCalls; + if (rc > 0) { + buff->dataCount += rc; + threadReadTotal += rc; + total += rc; + + readCallback(*this, buff); + int64_t duration = Duration(readStartTime, AbsTime::now()); + if (rc != readCount) { + // If we didn't fill the read buffer then time to stop reading + QPID_PROBE4(asynchio_read_finished_done, &h, duration, total, readCalls); + break; + } + + // Stop reading if we've overrun our timeslot + if ( duration > threadMaxIoTimeNs) { + QPID_PROBE4(asynchio_read_finished_maxtime, &h, duration, total, readCalls); + break; + } + + } else { + // Put buffer back (at front so it doesn't interfere with unread buffers) + bufferQueue.push_front(buff); + assert(buff); + + QPID_PROBE5(asynchio_read_finished_error, &h, Duration(readStartTime, AbsTime::now()), total, readCalls, errno); + // Eof or other side has gone away + if (rc == 0 || errno == ECONNRESET) { + eofCallback(*this); + h.unwatchRead(); + break; + } else if (errno == EAGAIN) { + // We have just put a buffer back so we know + // we can carry on watching for reads + break; + } else { + // Report error then just treat as a socket disconnect + QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(errno) << "(" << errno << ")" ); + eofCallback(*this); + h.unwatchRead(); + break; + } + } + } else { + // Something to read but no buffer + if (emptyCallback) { + emptyCallback(*this); + } + // If we still have no buffers we can't do anything more + if (bufferQueue.empty()) { + h.unwatchRead(); + QPID_PROBE4(asynchio_read_finished_nobuffers, &h, Duration(readStartTime, AbsTime::now()), total, readCalls); + break; + } + + } + } while (true); + + ++threadReadCount; + return; +} + +/* + * We carry on writing whilst we have data to write and we can write + */ +void AsynchIO::writeable(DispatchHandle& h) { + AbsTime writeStartTime = AbsTime::now(); + size_t total = 0; + int writeCalls = 0; + do { + // See if we've got something to write + if (!writeQueue.empty()) { + // Write buffer + BufferBase* buff = writeQueue.back(); + writeQueue.pop_back(); + errno = 0; + assert(buff->dataStart+buff->dataCount <= buff->byteCount); + int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount); + int64_t duration = Duration(writeStartTime, AbsTime::now()); + ++writeCalls; + if (rc >= 0) { + threadWriteTotal += rc; + total += rc; + + // If we didn't write full buffer put rest back + if (rc != buff->dataCount) { + buff->dataStart += rc; + buff->dataCount -= rc; + writeQueue.push_back(buff); + QPID_PROBE4(asynchio_write_finished_done, &h, duration, total, writeCalls); + break; + } + + // Recycle the buffer + queueReadBuffer(buff); + + // Stop writing if we've overrun our timeslot + if (duration > threadMaxIoTimeNs) { + QPID_PROBE4(asynchio_write_finished_maxtime, &h, duration, total, writeCalls); + break; + } + } else { + // Put buffer back + writeQueue.push_back(buff); + QPID_PROBE5(asynchio_write_finished_error, &h, duration, total, writeCalls, errno); + + if (errno == ECONNRESET || errno == EPIPE) { + // Just stop watching for write here - we'll get a + // disconnect callback soon enough + h.unwatchWrite(); + break; + } else if (errno == EAGAIN) { + // We have just put a buffer back so we know + // we can carry on watching for writes + break; + } else { + // Report error then just treat as a socket disconnect + QPID_LOG(error, "Error writing socket: " << qpid::sys::strError(errno) << "(" << errno << ")" ); + h.unwatchWrite(); + break; + } + } + } else { + int64_t duration = Duration(writeStartTime, AbsTime::now()); + (void) duration; // force duration to be used if no probes are compiled + + // If we're waiting to close the socket then can do it now as there is nothing to write + if (queuedClose) { + close(h); + QPID_PROBE4(asynchio_write_finished_closed, &h, duration, total, writeCalls); + break; + } + // Fd is writable, but nothing to write + if (idleCallback) { + writePending = false; + idleCallback(*this); + } + // If we still have no buffers to write we can't do anything more + if (writeQueue.empty() && !writePending && !queuedClose) { + h.unwatchWrite(); + // The following handles the case where writePending is + // set to true after the test above; in this case its + // possible that the unwatchWrite overwrites the + // desired rewatchWrite so we correct that here + if (writePending) + h.rewatchWrite(); + QPID_PROBE4(asynchio_write_finished_nodata, &h, duration, total, writeCalls); + break; + } + } + } while (true); + + ++threadWriteCount; + return; +} + +void AsynchIO::disconnected(DispatchHandle& h) { + // If we have not already queued close then call disconnected callback before closing + if (!queuedClose && disCallback) disCallback(*this); + close(h); +} + +/* + * Close the socket and callback to say we've done it + */ +void AsynchIO::close(DispatchHandle& h) { + h.stopWatch(); + socket.close(); + if (closedCallback) { + closedCallback(*this, socket); + } +} + +SecuritySettings AsynchIO::getSecuritySettings() { + SecuritySettings settings; + settings.ssf = socket.getKeyLen(); + settings.authid = socket.getClientAuthId(); + return settings; +} + +} // namespace posix + +AsynchAcceptor* AsynchAcceptor::create(const Socket& s, + Callback callback) +{ + return new posix::AsynchAcceptor(s, callback); +} + +AsynchConnector* AsynchConnector::create(const Socket& s, + const std::string& hostname, + const std::string& port, + ConnectedCallback connCb, + FailedCallback failCb) +{ + return new posix::AsynchConnector(s, hostname, port, connCb, failCb); +} + +AsynchIO* AsynchIO::create(const Socket& s, + AsynchIO::ReadCallback rCb, + AsynchIO::EofCallback eofCb, + AsynchIO::DisconnectCallback disCb, + AsynchIO::ClosedCallback cCb, + AsynchIO::BuffersEmptyCallback eCb, + AsynchIO::IdleCallback iCb) +{ + return new posix::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb); +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp b/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp new file mode 100644 index 0000000000..7c31b13ae9 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/BSDSocket.cpp @@ -0,0 +1,264 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/posix/BSDSocket.h" + +#include "qpid/sys/SocketAddress.h" +#include "qpid/sys/posix/check.h" +#include "qpid/sys/posix/PrivatePosix.h" + +#include <fcntl.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/errno.h> +#include <unistd.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <netdb.h> +#include <cstdlib> +#include <string.h> + +namespace qpid { +namespace sys { + +namespace { +std::string getName(int fd, bool local) +{ + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); + + if (local) { + QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); + } else { + QPID_POSIX_CHECK( ::getpeername(fd, name, &namelen) ); + } + + return SocketAddress::asString(name, namelen); +} + +uint16_t getLocalPort(int fd) +{ + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); + + QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) ); + + return SocketAddress::getPort(name); +} +} + +BSDSocket::BSDSocket() : + fd(-1), + handle(new IOHandle), + nonblocking(false), + nodelay(false) +{} + +Socket* createSocket() +{ + return new BSDSocket; +} + +BSDSocket::BSDSocket(int fd0) : + fd(fd0), + handle(new IOHandle(fd)), + nonblocking(false), + nodelay(false) +{} + +BSDSocket::~BSDSocket() +{} + +BSDSocket::operator const IOHandle&() const +{ + return *handle; +} + +void BSDSocket::createSocket(const SocketAddress& sa) const +{ + int& socket = fd; + if (socket != -1) BSDSocket::close(); + int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0); + if (s < 0) throw QPID_POSIX_ERROR(errno); + socket = s; + *handle = IOHandle(s); + + try { + if (nonblocking) setNonblocking(); + if (nodelay) setTcpNoDelay(); + if (getAddrInfo(sa).ai_family == AF_INET6) { + int flag = 1; + int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag)); + QPID_POSIX_CHECK(result); + } + } catch (std::exception&) { + ::close(s); + socket = -1; + *handle = IOHandle(); + throw; + } +} + +void BSDSocket::setNonblocking() const { + int& socket = fd; + nonblocking = true; + if (socket != -1) { + QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK)); + } +} + +void BSDSocket::setTcpNoDelay() const +{ + int& socket = fd; + nodelay = true; + if (socket != -1) { + int flag = 1; + int result = ::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); + QPID_POSIX_CHECK(result); + } +} + +void BSDSocket::connect(const SocketAddress& addr) const +{ + // The display name for an outbound connection needs to be the name that was specified + // for the address rather than a resolved IP address as we don't know which of + // the IP addresses is actually the one that will be connected to. + peername = addr.asString(false); + + // However the string we compare with the local port must be numeric or it might not + // match when it should as getLocalAddress() will always be numeric + std::string connectname = addr.asString(); + + createSocket(addr); + + const int& socket = fd; + // TODO the correct thing to do here is loop on failure until you've used all the returned addresses + if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && + (errno != EINPROGRESS)) { + throw Exception(QPID_MSG(strError(errno) << ": " << peername)); + } + // When connecting to a port on the same host which no longer has + // a process associated with it, the OS occasionally chooses the + // remote port (which is unoccupied) as the port to bind the local + // end of the socket, resulting in a "circular" connection. + // + // Raise an error if we see such a connection, since we know there is + // no listener on the peer address. + // + if (getLocalAddress() == connectname) { + close(); + throw Exception(QPID_MSG("Connection refused: " << peername)); + } +} + +void BSDSocket::finishConnect(const SocketAddress&) const +{ +} + +void +BSDSocket::close() const +{ + int& socket = fd; + if (socket == -1) return; + if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno); + socket = -1; + *handle = IOHandle(); +} + +int BSDSocket::listen(const SocketAddress& sa, int backlog) const +{ + createSocket(sa); + + const int& socket = fd; + int yes=1; + QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); + + if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) + throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno))); + if (::listen(socket, backlog) < 0) + throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno))); + + return getLocalPort(socket); +} + +Socket* BSDSocket::accept() const +{ + int afd = ::accept(fd, 0, 0); + if ( afd >= 0) { + BSDSocket* s = new BSDSocket(afd); + s->localname = localname; + return s; + } + else if (errno == EAGAIN) + return 0; + else throw QPID_POSIX_ERROR(errno); +} + +int BSDSocket::read(void *buf, size_t count) const +{ + return ::read(fd, buf, count); +} + +int BSDSocket::write(const void *buf, size_t count) const +{ + return ::write(fd, buf, count); +} + +std::string BSDSocket::getPeerAddress() const +{ + if (peername.empty()) { + peername = getName(fd, false); + } + return peername; +} + +std::string BSDSocket::getLocalAddress() const +{ + if (localname.empty()) { + localname = getName(fd, true); + } + return localname; +} + +int BSDSocket::getError() const +{ + int result; + socklen_t rSize = sizeof (result); + + if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0) + throw QPID_POSIX_ERROR(errno); + + return result; +} + +int BSDSocket::getKeyLen() const +{ + return 0; +} + +std::string BSDSocket::getClientAuthId() const +{ + return std::string(); +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/BSDSocket.h b/qpid/cpp/src/qpid/sys/posix/BSDSocket.h new file mode 100644 index 0000000000..ae73718d55 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/BSDSocket.h @@ -0,0 +1,113 @@ +#ifndef QPID_SYS_BSDSOCKET_H +#define QPID_SYS_BSDSOCKET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Socket.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/CommonImportExport.h" +#include <string> + +#include <boost/scoped_ptr.hpp> + +namespace qpid { +namespace sys { + +class Duration; +class IOHandle; +class SocketAddress; + +namespace ssl { +class SslMuxSocket; +} + +class QPID_COMMON_CLASS_EXTERN BSDSocket : public Socket +{ +public: + /** Create a socket wrapper for descriptor. */ + QPID_COMMON_EXTERN BSDSocket(); + + /** Construct socket with existing fd (posix specific and not in Socket interface) */ + QPID_COMMON_EXTERN BSDSocket(int fd); + + QPID_COMMON_EXTERN ~BSDSocket(); + + QPID_COMMON_EXTERN operator const IOHandle&() const; + + /** Set socket non blocking */ + QPID_COMMON_EXTERN virtual void setNonblocking() const; + + QPID_COMMON_EXTERN virtual void setTcpNoDelay() const; + + QPID_COMMON_EXTERN virtual void connect(const SocketAddress&) const; + QPID_COMMON_EXTERN virtual void finishConnect(const SocketAddress&) const; + + QPID_COMMON_EXTERN virtual void close() const; + + /** Bind to a port and start listening. + *@return The bound port number + */ + QPID_COMMON_EXTERN virtual int listen(const SocketAddress&, int backlog = 10) const; + + /** + * Returns an address (host and port) for the remote end of the + * socket + */ + QPID_COMMON_EXTERN std::string getPeerAddress() const; + /** + * Returns an address (host and port) for the local end of the + * socket + */ + QPID_COMMON_EXTERN std::string getLocalAddress() const; + + /** + * Returns the error code stored in the socket. This may be used + * to determine the result of a non-blocking connect. + */ + QPID_COMMON_EXTERN int getError() const; + + /** Accept a connection from a socket that is already listening + * and has an incoming connection + */ + QPID_COMMON_EXTERN virtual Socket* accept() const; + + // TODO The following are raw operations, maybe they need better wrapping? + QPID_COMMON_EXTERN virtual int read(void *buf, size_t count) const; + QPID_COMMON_EXTERN virtual int write(const void *buf, size_t count) const; + + QPID_COMMON_EXTERN int getKeyLen() const; + QPID_COMMON_EXTERN std::string getClientAuthId() const; + +protected: + /** Create socket */ + void createSocket(const SocketAddress&) const; + + mutable int fd; + mutable boost::scoped_ptr<IOHandle> handle; + mutable std::string localname; + mutable std::string peername; + mutable bool nonblocking; + mutable bool nodelay; +}; + +}} +#endif /*!QPID_SYS_BSDSOCKET_H*/ diff --git a/qpid/cpp/src/qpid/sys/posix/Condition.cpp b/qpid/cpp/src/qpid/sys/posix/Condition.cpp new file mode 100644 index 0000000000..f629e50cd7 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/Condition.cpp @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Condition.h" + +namespace qpid { +namespace sys { + +namespace { + +struct ClockMonotonicAttr { + ::pthread_condattr_t attr; + + ClockMonotonicAttr() { + QPID_POSIX_ASSERT_THROW_IF(pthread_condattr_init(&attr)); + QPID_POSIX_ASSERT_THROW_IF(pthread_condattr_setclock(&attr, CLOCK_MONOTONIC)); + } +}; + +} + +Condition::Condition() { + static ClockMonotonicAttr attr; + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_init(&condition, &attr.attr)); +} + +}} diff --git a/qpid/cpp/src/qpid/sys/posix/Condition.h b/qpid/cpp/src/qpid/sys/posix/Condition.h new file mode 100644 index 0000000000..66f95d5fc8 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/Condition.h @@ -0,0 +1,82 @@ +#ifndef _sys_posix_Condition_h +#define _sys_posix_Condition_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/posix/PrivatePosix.h" + +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" + +#include <time.h> +#include <sys/errno.h> +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +/** + * A condition variable for thread synchronization. + */ +class Condition +{ + public: + Condition(); + ~Condition(); + void wait(Mutex&); + bool wait(Mutex&, const AbsTime& absoluteTime); + void notify(); + void notifyAll(); + + private: + pthread_cond_t condition; +}; + +inline Condition::~Condition() { + QPID_POSIX_ABORT_IF(pthread_cond_destroy(&condition)); +} + +inline void Condition::wait(Mutex& mutex) { + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex)); +} + +inline bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){ + struct timespec ts; + toTimespec(ts, absoluteTime); + int status = pthread_cond_timedwait(&condition, &mutex.mutex, &ts); + if (status != 0) { + if (status == ETIMEDOUT) return false; + throw QPID_POSIX_ERROR(status); + } + return true; +} + +inline void Condition::notify(){ + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_signal(&condition)); +} + +inline void Condition::notifyAll(){ + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_broadcast(&condition)); +} + +}} +#endif /*!_sys_posix_Condition_h*/ diff --git a/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp b/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp new file mode 100755 index 0000000000..cec580164d --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/FileSysDir.cpp @@ -0,0 +1,80 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/sys/FileSysDir.h" +#include "qpid/sys/StrError.h" +#include "qpid/log/Statement.h" +#include "qpid/Exception.h" + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <cerrno> +#include <unistd.h> +#include <dirent.h> +#include <stdlib.h> + +namespace qpid { +namespace sys { + +bool FileSysDir::exists (void) const +{ + const char *cpath = dirPath.c_str (); + struct stat s; + if (::stat(cpath, &s)) { + if (errno == ENOENT) { + return false; + } + throw qpid::Exception (strError(errno) + + ": Can't check directory: " + dirPath); + } + if (S_ISDIR(s.st_mode)) + return true; + throw qpid::Exception(dirPath + " is not a directory"); +} + +void FileSysDir::mkdir(void) +{ + if (::mkdir(dirPath.c_str(), 0755)) + throw Exception ("Can't create directory: " + dirPath); +} + +void FileSysDir::forEachFile(Callback cb) const { + + ::dirent** namelist; + + int n = scandir(dirPath.c_str(), &namelist, 0, alphasort); + if (n == -1) throw Exception (strError(errno) + ": Can't scan directory: " + dirPath); + + for (int i = 0; i<n; ++i) { + std::string fullpath = dirPath + "/" + namelist[i]->d_name; + // Filter out non files/stat problems etc. + struct ::stat s; + // Can't throw here without leaking memory, so just do nothing with + // entries for which stat() fails. + if (!::stat(fullpath.c_str(), &s)) { + if (S_ISREG(s.st_mode)) { + cb(fullpath); + } + } + ::free(namelist[i]); + } + ::free(namelist); +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/Fork.cpp b/qpid/cpp/src/qpid/sys/posix/Fork.cpp new file mode 100644 index 0000000000..a0d404a16e --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/Fork.cpp @@ -0,0 +1,129 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/Fork.h" +#include "qpid/log/Statement.h" +#include "qpid/Exception.h" + +#include <errno.h> +#include <fcntl.h> +#include <signal.h> +#include <string.h> +#include <sys/stat.h> +#include <sys/select.h> +#include <sys/types.h> +#include <unistd.h> + +namespace qpid { +namespace sys { + +using namespace std; + +namespace { + +void writeStr(int fd, const std::string& str) { + const char* WRITE_ERR = "Error writing to parent process"; + int size = str.size(); + if (int(sizeof(size)) > ::write(fd, &size, sizeof(size))) throw ErrnoException(WRITE_ERR); + if (size > ::write(fd, str.data(), size)) throw ErrnoException(WRITE_ERR); +} + +string readStr(int fd) { + string value; + const char* READ_ERR = "Error reading from forked process"; + int size; + if (int(sizeof(size)) > ::read(fd, &size, sizeof(size))) throw ErrnoException(READ_ERR); + if (size > 0) { // Read string message + value.resize(size); + if (size > ::read(fd, const_cast<char*>(value.data()), size)) throw ErrnoException(READ_ERR); + } + return value; +} + +} // namespace + +Fork::Fork() {} +Fork::~Fork() {} + +void Fork::fork() { + pid_t pid = ::fork(); + if (pid < 0) throw ErrnoException("Failed to fork the process"); + if (pid == 0) child(); + else parent(pid); +} + +ForkWithMessage::ForkWithMessage() { + pipeFds[0] = pipeFds[1] = -1; +} + +struct AutoCloseFd { + int fd; + AutoCloseFd(int d) : fd(d) {} + ~AutoCloseFd() { ::close(fd); } +}; + +void ForkWithMessage::fork() { + if(::pipe(pipeFds) < 0) throw ErrnoException("Can't create pipe"); + pid_t pid = ::fork(); + if(pid < 0) throw ErrnoException("Fork fork failed"); + if (pid == 0) { // Child + AutoCloseFd ac(pipeFds[1]); // Write side. + ::close(pipeFds[0]); // Read side + try { + child(); + } + catch (const std::exception& e) { + QPID_LOG(error, "Error in forked child: " << e.what()); + std::string msg = e.what(); + if (msg.empty()) msg = " "; // Make sure we send a non-empty error string. + writeStr(pipeFds[1], msg); + } + } + else { // Parent + close(pipeFds[1]); // Write side. + AutoCloseFd ac(pipeFds[0]); // Read side + parent(pid); + } +} + +string ForkWithMessage::wait(int timeout) { // parent waits for child. + errno = 0; + struct timeval tv; + tv.tv_sec = timeout; + tv.tv_usec = 0; + + fd_set fds; + FD_ZERO(&fds); + FD_SET(pipeFds[0], &fds); + int n=select(FD_SETSIZE, &fds, 0, 0, &tv); + if(n<0) throw ErrnoException("Error waiting for fork"); + if (n==0) throw Exception("Timed out waiting for fork"); + + string error = readStr(pipeFds[0]); + if (error.empty()) return readStr(pipeFds[0]); + else throw Exception("Error in forked process: " + error); +} + +// Write empty error string followed by value string to pipe. +void ForkWithMessage::ready(const string& value) { // child + // Write empty string for error followed by value. + writeStr(pipeFds[1], string()); // No error + writeStr(pipeFds[1], value); +} + + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/Fork.h b/qpid/cpp/src/qpid/sys/posix/Fork.h new file mode 100644 index 0000000000..698c61ed30 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/Fork.h @@ -0,0 +1,82 @@ +#ifndef QPID_SYS_POSIX_FORK_H +#define QPID_SYS_POSIX_FORK_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <string> +#include <sys/types.h> + +namespace qpid { +namespace sys { + +/** + * Fork the process. Call parent() in parent and child() in child. + */ +class Fork { + public: + Fork(); + virtual ~Fork(); + + /** + * Fork the process. + * Calls parent() in the parent process, child() in the child. + */ + virtual void fork(); + + protected: + + /** Called in parent process. + *@child pid of child process + */ + virtual void parent(pid_t child) = 0; + + /** Called in child process */ + virtual void child() = 0; +}; + +/** + * Like Fork but also allows the child to send a string message + * or throw an exception to the parent. + */ +class ForkWithMessage : public Fork { + public: + ForkWithMessage(); + void fork(); + + protected: + /** Call from parent(): wait for child to send a value or throw exception. + * @timeout in seconds to wait for response. + * @return value passed by child to ready(). + */ + std::string wait(int timeout); + + /** Call from child(): Send a value to the parent. + *@param value returned by parent call to wait(). + */ + void ready(const std::string& value); + + private: + int pipeFds[2]; +}; + +}} // namespace qpid::sys + + + +#endif /*!QPID_SYS_POSIX_FORK_H*/ diff --git a/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp b/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp new file mode 100644 index 0000000000..d3f502a63c --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/IOHandle.cpp @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/posix/PrivatePosix.h" + +namespace qpid { +namespace sys { + +NullIOHandle DummyIOHandle; + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/LockFile.cpp b/qpid/cpp/src/qpid/sys/posix/LockFile.cpp new file mode 100755 index 0000000000..9fdf83f1bd --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/LockFile.cpp @@ -0,0 +1,107 @@ +/* + * + * Copyright (c) 2008 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/sys/LockFile.h" +#include "qpid/sys/posix/PidFile.h" + +#include <string> +#include <unistd.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "qpid/sys/posix/check.h" + +namespace qpid { +namespace sys { + +class LockFilePrivate { + friend class LockFile; + friend class PidFile; + + int fd; + +public: + LockFilePrivate(int f) : fd(f) {} +}; + +LockFile::LockFile(const std::string& path_, bool create) + : path(path_), created(create) { + + errno = 0; + int flags=create ? O_WRONLY|O_CREAT|O_NOFOLLOW : O_RDWR; + int fd = ::open(path.c_str(), flags, 0644); + if (fd < 0) throw ErrnoException("Cannot open lock file " + path, errno); + if (::lockf(fd, F_TLOCK, 0) < 0) { + ::close(fd); + throw ErrnoException("Cannot lock " + path, errno); + } + impl.reset(new LockFilePrivate(fd)); +} + +LockFile::~LockFile() { + if (impl) { + int f = impl->fd; + if (f >= 0) { + if(::lockf(f, F_ULOCK, 0)) {} // Suppress warnings about ignoring return value. + ::close(f); + impl->fd = -1; + } + } +} + +int LockFile::read(void* bytes, size_t len) const { + if (!impl) + throw Exception("Lock file not open: " + path); + + ssize_t rc = ::read(impl->fd, bytes, len); + if ((ssize_t)len > rc) { + throw Exception("Cannot read lock file: " + path); + } + return rc; +} + +int LockFile::write(void* bytes, size_t len) const { + if (!impl) + throw Exception("Lock file not open: " + path); + + ssize_t rc = ::write(impl->fd, bytes, len); + if ((ssize_t)len > rc) { + throw Exception("Cannot write lock file: " + path); + } + return rc; +} + +PidFile::PidFile(const std::string& path_, bool create): + LockFile(path_, create) +{} + +pid_t PidFile::readPid(void) const { + pid_t pid; + int desired_read = sizeof(pid_t); + read(&pid, desired_read); + return pid; +} + +void PidFile::writePid(void) { + pid_t pid = getpid(); + int desired_write = sizeof(pid_t); + write(&pid, desired_write); +} + +}} /* namespace qpid::sys */ diff --git a/qpid/cpp/src/qpid/sys/posix/MemStat.cpp b/qpid/cpp/src/qpid/sys/posix/MemStat.cpp new file mode 100644 index 0000000000..2fbf119cab --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/MemStat.cpp @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/MemStat.h" + +#include <malloc.h> + +void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory* object) +{ + struct mallinfo info(mallinfo()); + + object->set_malloc_arena(info.arena); + object->set_malloc_ordblks(info.ordblks); + object->set_malloc_hblks(info.hblks); + object->set_malloc_hblkhd(info.hblkhd); + object->set_malloc_uordblks(info.uordblks); + object->set_malloc_fordblks(info.fordblks); + object->set_malloc_keepcost(info.keepcost); +} + diff --git a/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp b/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp new file mode 100644 index 0000000000..b4292aa4bc --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/MemoryMappedFile.h" +#include "qpid/Exception.h" +#include "qpid/Msg.h" +#include <sys/mman.h> +#include <sys/stat.h> +#include <unistd.h> +#include <fcntl.h> + +namespace qpid { +namespace sys { +namespace { +const std::string PAGEFILE_PREFIX("pf_"); +const std::string PATH_SEPARATOR("/"); +const std::string ESCAPE("%"); +const std::string VALID("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-."); +std::string getFileName(const std::string& name, const std::string& dir) +{ + std::stringstream filename; + if (dir.size()) filename << dir << PATH_SEPARATOR << PAGEFILE_PREFIX; + size_t start = 0; + while (true) { + size_t i = name.find_first_not_of(VALID, start); + if (i == std::string::npos) { + filename << name.substr(start); + return filename.str(); + } else { + if (i > start) filename << name.substr(start, i-start); + filename << ESCAPE << (int) name.at(i); + start = i+1; + } + } + +} +} + +class MemoryMappedFilePrivate +{ + friend class MemoryMappedFile; + std::string path; + int fd; + MemoryMappedFilePrivate() : fd(0) {} +}; +MemoryMappedFile::MemoryMappedFile() : state(new MemoryMappedFilePrivate) {} +MemoryMappedFile::~MemoryMappedFile() { delete state; } + +void MemoryMappedFile::open(const std::string& name, const std::string& directory) +{ + // Ensure directory exists + if ( ::mkdir(directory.c_str(), S_IRWXU | S_IRGRP | S_IXGRP )!=0 && errno!=EEXIST ) { + throw qpid::Exception(QPID_MSG("Failed to create memory mapped file directory " << directory << ": " << qpid::sys::strError(errno))); + } + + state->path = getFileName(name, directory); + + int flags = O_CREAT | O_TRUNC | O_RDWR; + int fd = ::open(state->path.c_str(), flags, S_IRUSR | S_IWUSR); + if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << state->path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]")); + state->fd = fd; +} + +void MemoryMappedFile::close() +{ + ::close(state->fd); + ::unlink(state->path.c_str()); +} + +size_t MemoryMappedFile::getPageSize() +{ + return ::sysconf(_SC_PAGE_SIZE); +} + +char* MemoryMappedFile::map(size_t offset, size_t size) +{ + int protection = PROT_READ | PROT_WRITE; + char* region = (char*) ::mmap(0, size, protection, MAP_SHARED, state->fd, offset); + if (region == MAP_FAILED) { + throw qpid::Exception(QPID_MSG("Failed to map page into memory: " << qpid::sys::strError(errno))); + } + return region; + +} + +void MemoryMappedFile::unmap(char* region, size_t size) +{ + ::munmap(region, size); +} + +void MemoryMappedFile::flush(char* region, size_t size) +{ + ::msync(region, size, MS_ASYNC); +} + +void MemoryMappedFile::expand(size_t offset) +{ + if ((::lseek(state->fd, offset - 1, SEEK_SET) == -1) || (::write(state->fd, "", 1) == -1)) { + throw qpid::Exception(QPID_MSG("Failed to expand paged queue file: " << qpid::sys::strError(errno))); + } +} + +bool MemoryMappedFile::isSupported() +{ + return true; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/Mutex.cpp b/qpid/cpp/src/qpid/sys/posix/Mutex.cpp new file mode 100644 index 0000000000..0e1f0d30c2 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/Mutex.cpp @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/Mutex.h" + +namespace qpid { +namespace sys { + +/** + * Initialise a recursive mutex attr for use in creating mutexes later + * (we use pthread_once to make sure it is initialised exactly once) + */ + +namespace { +pthread_once_t onceControl = PTHREAD_ONCE_INIT; +pthread_mutexattr_t mutexattr; + +void initMutexattr() { + pthread_mutexattr_init(&mutexattr); + pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE); +} +} + +const pthread_mutexattr_t* Mutex::getAttribute() { + pthread_once(&onceControl, initMutexattr); + return &mutexattr; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/Mutex.h b/qpid/cpp/src/qpid/sys/posix/Mutex.h new file mode 100644 index 0000000000..e2b21b5a56 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/Mutex.h @@ -0,0 +1,158 @@ +#ifndef _sys_posix_Mutex_h +#define _sys_posix_Mutex_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/sys/posix/check.h" + +#include <pthread.h> +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +class Condition; + +/** + * Mutex lock. + */ +class Mutex : private boost::noncopyable { + friend class Condition; + static const pthread_mutexattr_t* getAttribute(); + +public: + typedef ::qpid::sys::ScopedLock<Mutex> ScopedLock; + typedef ::qpid::sys::ScopedUnlock<Mutex> ScopedUnlock; + + inline Mutex(); + inline ~Mutex(); + inline void lock(); + inline void unlock(); + inline bool trylock(); + + +protected: + pthread_mutex_t mutex; +}; + +/** + * RW lock. + */ +class RWlock : private boost::noncopyable { + friend class Condition; + +public: + typedef ::qpid::sys::ScopedRlock<RWlock> ScopedRlock; + typedef ::qpid::sys::ScopedWlock<RWlock> ScopedWlock; + + inline RWlock(); + inline ~RWlock(); + inline void wlock(); // will write-lock + inline void rlock(); // will read-lock + inline void unlock(); + inline void trywlock(); // will write-try + inline void tryrlock(); // will read-try + +protected: + pthread_rwlock_t rwlock; +}; + + +/** + * PODMutex is a POD, can be static-initialized with + * PODMutex m = QPID_PODMUTEX_INITIALIZER + */ +struct PODMutex +{ + typedef ::qpid::sys::ScopedLock<PODMutex> ScopedLock; + + inline void lock(); + inline void unlock(); + inline bool trylock(); + + // Must be public to be a POD: + pthread_mutex_t mutex; +}; + +#define QPID_MUTEX_INITIALIZER { PTHREAD_MUTEX_INITIALIZER } + +void PODMutex::lock() { + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_lock(&mutex)); +} + +void PODMutex::unlock() { + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +bool PODMutex::trylock() { + return pthread_mutex_trylock(&mutex) == 0; +} + +Mutex::Mutex() { + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_init(&mutex, getAttribute())); +} + +Mutex::~Mutex(){ + QPID_POSIX_ABORT_IF(pthread_mutex_destroy(&mutex)); +} + +void Mutex::lock() { + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_lock(&mutex)); +} + +void Mutex::unlock() { + QPID_POSIX_ASSERT_THROW_IF(pthread_mutex_unlock(&mutex)); +} + +bool Mutex::trylock() { + return pthread_mutex_trylock(&mutex) == 0; +} + + +RWlock::RWlock() { + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_init(&rwlock, NULL)); +} + +RWlock::~RWlock(){ + QPID_POSIX_ABORT_IF(pthread_rwlock_destroy(&rwlock)); +} + +void RWlock::wlock() { + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_wrlock(&rwlock)); +} + +void RWlock::rlock() { + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_rdlock(&rwlock)); +} + +void RWlock::unlock() { + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_unlock(&rwlock)); +} + +void RWlock::trywlock() { + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_trywrlock(&rwlock)); +} + +void RWlock::tryrlock() { + QPID_POSIX_ASSERT_THROW_IF(pthread_rwlock_tryrdlock(&rwlock)); +} + + +}} +#endif /*!_sys_posix_Mutex_h*/ diff --git a/qpid/cpp/src/qpid/sys/posix/Path.cpp b/qpid/cpp/src/qpid/sys/posix/Path.cpp new file mode 100644 index 0000000000..063e3cfc51 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/Path.cpp @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/sys/Path.h" +#include "qpid/sys/StrError.h" +#include "qpid/Exception.h" + +#include <sys/stat.h> +#include <errno.h> + +#include <sys/types.h> + + + +namespace qpid { +namespace sys { + +const std::string Path::separator("/"); + +namespace { +// Return true for success, false for ENOENT, throw otherwise. +bool getStat(const std::string& path, struct ::stat& s) { + if (::stat(path.c_str(), &s)) { + if (errno == ENOENT) return false; + throw Exception(strError(errno) + ": Invalid path: " + path); + } + return true; +} + +bool isFlag(const std::string& path, unsigned long flag) { + struct ::stat s; + return getStat(path, s) && (s.st_mode & flag); +} +} + +bool Path::exists () const { + struct ::stat s; + return getStat(path, s); +} + +bool Path::isFile() const { return isFlag(path, S_IFREG); } +bool Path::isDirectory() const { return isFlag(path, S_IFDIR); } +bool Path::isAbsolute() const { return (path.size() > 0 && path[0] == separator[0]); } + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/PidFile.h b/qpid/cpp/src/qpid/sys/posix/PidFile.h new file mode 100644 index 0000000000..fb19d407f4 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/PidFile.h @@ -0,0 +1,62 @@ +#ifndef _sys_PidFile_h +#define _sys_PidFile_h + +/* + * + * Copyright (c) 2008 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/sys/LockFile.h" + +#include "qpid/CommonImportExport.h" +#include "qpid/sys/IntegerTypes.h" + +#include <boost/noncopyable.hpp> +#include <boost/shared_ptr.hpp> +#include <string> + +namespace qpid { +namespace sys { + +class PidFile : public LockFile +{ +public: + QPID_COMMON_EXTERN PidFile(const std::string& path_, bool create); + + /** + * Read the process ID from the lock file. This method assumes that + * if there is a process ID in the file, it was written there by + * writePid(); thus, it's at the start of the file. + * + * Throws an exception if there is an error reading the file. + * + * @returns The stored process ID. No validity check is done on it. + */ + QPID_COMMON_EXTERN pid_t readPid(void) const; + + /** + * Write the current process's ID to the lock file. It's written at + * the start of the file and will overwrite any other content that + * may be in the file. + * + * Throws an exception if the write fails. + */ + QPID_COMMON_EXTERN void writePid(void); +}; + +}} /* namespace qpid::sys */ + +#endif /*!_sys_PidFile_h*/ diff --git a/qpid/cpp/src/qpid/sys/posix/PipeHandle.cpp b/qpid/cpp/src/qpid/sys/posix/PipeHandle.cpp new file mode 100755 index 0000000000..4b19783338 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/PipeHandle.cpp @@ -0,0 +1,64 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/sys/PipeHandle.h" +#include "qpid/sys/posix/check.h" +#include <unistd.h> +#include <fcntl.h> +#include <sys/socket.h> + +namespace qpid { +namespace sys { + +PipeHandle::PipeHandle(bool nonBlocking) { + + int pair[2]; + pair[0] = pair[1] = -1; + + if (socketpair(PF_UNIX, SOCK_STREAM, 0, pair) == -1) + throw qpid::Exception(QPID_MSG("Creation of pipe failed")); + + writeFd = pair[0]; + readFd = pair[1]; + + // Set the socket to non-blocking + if (nonBlocking) { + int flags = fcntl(readFd, F_GETFL); + fcntl(readFd, F_SETFL, flags | O_NONBLOCK); + } +} + +PipeHandle::~PipeHandle() { + close(readFd); + close(writeFd); +} + +int PipeHandle::read(void* buf, size_t bufSize) { + return ::read(readFd,buf,bufSize); +} + +int PipeHandle::write(const void* buf, size_t bufSize) { + return ::write(writeFd,buf,bufSize); +} + +int PipeHandle::getReadHandle() { + return readFd; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp new file mode 100644 index 0000000000..aa129faf20 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/PollableCondition.h" +#include "qpid/sys/DispatchHandle.h" +#include "qpid/sys/posix/PrivatePosix.h" +#include "qpid/Exception.h" + +#include <boost/bind.hpp> + +#include <unistd.h> +#include <fcntl.h> + +namespace qpid { +namespace sys { + +class PollableConditionPrivate : public sys::IOHandle { + friend class PollableCondition; + +private: + PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller); + ~PollableConditionPrivate(); + + void dispatch(sys::DispatchHandle& h); + void set(); + void clear(); + +private: + PollableCondition::Callback cb; + PollableCondition& parent; + boost::shared_ptr<sys::Poller> poller; + int writeFd; + std::auto_ptr<DispatchHandleRef> handle; +}; + +PollableConditionPrivate::PollableConditionPrivate( + const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller +) : cb(cb), parent(parent) +{ + int fds[2]; + if (::pipe(fds) == -1) + throw ErrnoException(QPID_MSG("Can't create PollableCondition")); + fd = fds[0]; + writeFd = fds[1]; + if (::fcntl(fd, F_SETFL, O_NONBLOCK) == -1) + throw ErrnoException(QPID_MSG("Can't create PollableCondition")); + if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1) + throw ErrnoException(QPID_MSG("Can't create PollableCondition")); + handle.reset (new DispatchHandleRef( + *this, + boost::bind(&sys::PollableConditionPrivate::dispatch, this, _1), + 0, 0)); + handle->startWatch(poller); + handle->unwatch(); + + // Make the read FD readable + static const char dummy=0; + ssize_t n = ::write(writeFd, &dummy, 1); + if (n == -1 && errno != EAGAIN) + throw ErrnoException("Error setting PollableCondition"); +} + +PollableConditionPrivate::~PollableConditionPrivate() { + handle->stopWatch(); + close(writeFd); +} + +void PollableConditionPrivate::dispatch(sys::DispatchHandle&) { + cb(parent); +} + +void PollableConditionPrivate::set() { + handle->rewatch(); +} + +void PollableConditionPrivate::clear() { + handle->unwatch(); +} + + +PollableCondition::PollableCondition(const Callback& cb, + const boost::shared_ptr<sys::Poller>& poller +) : impl(new PollableConditionPrivate(cb, *this, poller)) +{ +} + +PollableCondition::~PollableCondition() +{ + delete impl; +} + +void PollableCondition::set() { impl->set(); } + +void PollableCondition::clear() { impl->clear(); } + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp b/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp new file mode 100644 index 0000000000..ae839b2e20 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/PosixPoller.cpp @@ -0,0 +1,793 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Poller.h" +#include "qpid/sys/IOHandle.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/AtomicCount.h" +#include "qpid/sys/DeletionManager.h" +#include "qpid/sys/posix/check.h" +#include "qpid/sys/posix/PrivatePosix.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Condition.h" + +#include <poll.h> +#include <errno.h> +#include <signal.h> + +#include <assert.h> +#include <queue> +#include <set> +#include <exception> + +/* + * + * This is a qpid::sys::Poller implementation for Posix systems. + * + * This module follows the structure of the Linux EpollPoller as closely as possible + * to simplify maintainability. Noteworthy differences: + * + * The Linux epoll_xxx() calls present one event at a time to multiple callers whereas poll() + * returns one or more events to a single caller. The EventStream class layers a + * "one event per call" view of the poll() result to multiple threads. + * + * The HandleSet is the master set of in-use PollerHandles. The EventStream + * maintains a snapshot copy taken just before the call to poll() that remains static + * until all flagged events have been processed. + * + * There is an additional window where the PollerHandlePrivate class may survive the + * parent PollerHandle destructor, i.e. between snapshots. + * + * Safe interrupting of the Poller is implemented using the "self-pipe trick". + * + */ + +namespace qpid { +namespace sys { + +// Deletion manager to handle deferring deletion of PollerHandles to when they definitely aren't being used +DeletionManager<PollerHandlePrivate> PollerHandleDeletionManager; + +// Instantiate (and define) class static for DeletionManager +template <> +DeletionManager<PollerHandlePrivate>::AllThreadsStatuses DeletionManager<PollerHandlePrivate>::allThreadsStatuses(0); + +class PollerHandlePrivate { + friend class Poller; + friend class PollerPrivate; + friend class PollerHandle; + friend class HandleSet; + + enum FDStat { + ABSENT, + MONITORED, + INACTIVE, + HUNGUP, + MONITORED_HUNGUP, + INTERRUPTED, + INTERRUPTED_HUNGUP, + DELETED + }; + + short events; + const IOHandle* ioHandle; + PollerHandle* pollerHandle; + FDStat stat; + Mutex lock; + + PollerHandlePrivate(const IOHandle* h, PollerHandle* p) : + events(0), + ioHandle(h), + pollerHandle(p), + stat(ABSENT) { + } + + int fd() const { + return ioHandle->fd; + } + + bool isActive() const { + return stat == MONITORED || stat == MONITORED_HUNGUP; + } + + void setActive() { + stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP) + ? MONITORED_HUNGUP + : MONITORED; + } + + bool isInactive() const { + return stat == INACTIVE || stat == HUNGUP; + } + + void setInactive() { + stat = INACTIVE; + } + + bool isIdle() const { + return stat == ABSENT; + } + + void setIdle() { + stat = ABSENT; + } + + bool isHungup() const { + return + stat == MONITORED_HUNGUP || + stat == HUNGUP || + stat == INTERRUPTED_HUNGUP; + } + + void setHungup() { + assert(stat == MONITORED); + stat = HUNGUP; + } + + bool isInterrupted() const { + return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP; + } + + void setInterrupted() { + stat = (stat == MONITORED_HUNGUP || stat == HUNGUP) + ? INTERRUPTED_HUNGUP + : INTERRUPTED; + } + + bool isDeleted() const { + return stat == DELETED; + } + + void setDeleted() { + stat = DELETED; + } +}; + +PollerHandle::PollerHandle(const IOHandle& h) : + impl(new PollerHandlePrivate(&h, this)) +{} + +PollerHandle::~PollerHandle() { + { + ScopedLock<Mutex> l(impl->lock); + if (impl->isDeleted()) { + return; + } + impl->pollerHandle = 0; + if (impl->isInterrupted()) { + impl->setDeleted(); + return; + } + assert(impl->isIdle()); + impl->setDeleted(); + } + PollerHandleDeletionManager.markForDeletion(impl); +} + +class HandleSet +{ + Mutex lock; + bool stale; + std::set<PollerHandlePrivate*> handles; + public: + HandleSet() : stale(true) {} + void add(PollerHandlePrivate*); + void remove(PollerHandlePrivate*); + void cleanup(); + bool snapshot(std::vector<PollerHandlePrivate *>& , std::vector<struct ::pollfd>&); + void setStale(); +}; + +void HandleSet::add(PollerHandlePrivate* h) +{ + ScopedLock<Mutex> l(lock); + handles.insert(h); +} +void HandleSet::remove(PollerHandlePrivate* h) +{ + ScopedLock<Mutex> l(lock); + handles.erase(h); +} +void HandleSet::cleanup() +{ + // Inform all registered handles of disconnection + std::set<PollerHandlePrivate*> copy; + handles.swap(copy); + for (std::set<PollerHandlePrivate*>::const_iterator i = copy.begin(); i != copy.end(); ++i) { + PollerHandlePrivate& eh = **i; + { + ScopedLock<Mutex> l(eh.lock); + if (!eh.isDeleted()) { + Poller::Event event((*i)->pollerHandle, Poller::DISCONNECTED); + event.process(); + } + } + } +} +void HandleSet::setStale() +{ + // invalidate cached pollfds for next snapshot + ScopedLock<Mutex> l(lock); + stale = true; +} + +/** + * Concrete implementation of Poller to use Posix poll() + * interface + */ +class PollerPrivate { + friend class Poller; + friend class EventStream; + friend class HandleSet; + + class SignalPipe { + /** + * Used to wakeup a thread in ::poll() + */ + int fds[2]; + bool signaled; + bool permanent; + Mutex lock; + public: + SignalPipe() : signaled(false), permanent(false) { + QPID_POSIX_CHECK(::pipe(fds)); + } + + ~SignalPipe() { + ::close(fds[0]); + ::close(fds[1]); + } + + int getFD() { + return fds[0]; + } + + bool isSet() { + return signaled; + } + + void set() { + ScopedLock<Mutex> l(lock); + if (signaled) + return; + signaled = true; + QPID_POSIX_CHECK(::write(fds[1], " ", 1)); + } + + void reset() { + if (permanent) + return; + ScopedLock<Mutex> l(lock); + if (signaled) { + char ignore; + QPID_POSIX_CHECK(::read(fds[0], &ignore, 1)); + signaled = false; + } + } + + void setPermanently() { + // async signal safe calls only. No locking. + permanent = true; + signaled = true; + QPID_POSIX_CHECK(::write(fds[1], " ", 2)); + // poll() should never block now + } + }; + + // Collect pending events and serialize access. Maintain array of pollfd structs. + class EventStream { + typedef Poller::Event Event; + PollerPrivate& pollerPrivate; + SignalPipe& signalPipe; + std::queue<PollerHandlePrivate*> interruptedHandles; + std::vector<struct ::pollfd> pollfds; + std::vector<PollerHandlePrivate*> pollHandles; + Mutex streamLock; + Mutex serializeLock; + Condition serializer; + bool busy; + int currentPollfd; + int pollCount; + int waiters; + + public: + + EventStream(PollerPrivate* p) : pollerPrivate(*p), signalPipe(p->signalPipe), busy(false), + currentPollfd(0), pollCount(0), waiters(0) { + // The signal pipe is the first element of pollfds and pollHandles + pollfds.reserve(8); + pollfds.resize(1); + pollfds[0].fd = pollerPrivate.signalPipe.getFD(); + pollfds[0].events = POLLIN; + pollfds[0].revents = 0; + + pollHandles.reserve(8); + pollHandles.resize(1); + pollHandles[0] = 0; + } + + void addInterrupt(PollerHandle& handle) { + ScopedLock<Mutex> l(streamLock); + interruptedHandles.push(handle.impl); + } + + // Serialize access to the stream. + Event next(Duration timeout) { + AbsTime targetTimeout = + (timeout == TIME_INFINITE) ? + FAR_FUTURE : + AbsTime(now(), timeout); + + + ScopedLock<Mutex> l(serializeLock); + Event event(0, Poller::INVALID); + while (busy) { + waiters++; + bool timedout = !serializer.wait(serializeLock, targetTimeout); + waiters--; + + if (busy && timedout) { + return Event(0, Poller::TIMEOUT); + } + } + busy = true; + { + ScopedUnlock<Mutex> ul(serializeLock); + event = getEvent(targetTimeout); + } + busy = false; + + if (waiters > 0) + serializer.notify(); + return event; + } + + Event getEvent(AbsTime targetTimeout) { + bool timeoutPending = false; + + ScopedLock<Mutex> l(streamLock); // hold lock except for poll() + + // loop until poll event, async interrupt, or timeout + while (true) { + + // first check for any interrupts + while (interruptedHandles.size() > 0) { + PollerHandlePrivate& eh = *interruptedHandles.front(); + interruptedHandles.pop(); + { + ScopedLock<Mutex> lk(eh.lock); + if (!eh.isDeleted()) { + if (!eh.isIdle()) { + eh.setInactive(); + } + + // nullify the corresponding pollfd event, if any + int ehfd = eh.fd(); + std::vector<struct ::pollfd>::iterator i = pollfds.begin() + 1; // skip self pipe at front + for (; i != pollfds.end(); i++) { + if (i->fd == ehfd) { + i->events = 0; + if (i->revents) { + i->revents = 0; + pollCount--; + } + break; + } + } + return Event(eh.pollerHandle, Poller::INTERRUPTED); + } + } + PollerHandleDeletionManager.markForDeletion(&eh); + } + + // Check for shutdown + if (pollerPrivate.isShutdown) { + PollerHandleDeletionManager.markAllUnusedInThisThread(); + return Event(0, Poller::SHUTDOWN); + } + + // search for any remaining events from earlier poll() + int nfds = pollfds.size(); + while ((pollCount > 0) && (currentPollfd < nfds)) { + int index = currentPollfd++; + short evt = pollfds[index].revents; + if (evt != 0) { + pollCount--; + PollerHandlePrivate& eh = *pollHandles[index]; + ScopedLock<Mutex> l(eh.lock); + // stop polling this handle until resetMode() + pollfds[index].events = 0; + + // the handle could have gone inactive since snapshot taken + if (eh.isActive()) { + PollerHandle* handle = eh.pollerHandle; + assert(handle); + + // If the connection has been hungup we could still be readable + // (just not writable), allow us to readable until we get here again + if (evt & POLLHUP) { + if (eh.isHungup()) { + eh.setInactive(); + // Don't set up last Handle so that we don't reset this handle + // on re-entering Poller::wait. This means that we will never + // be set active again once we've returned disconnected, and so + // can never be returned again. + return Event(handle, Poller::DISCONNECTED); + } + eh.setHungup(); + } else { + eh.setInactive(); + } + return Event(handle, PollerPrivate::epollToDirection(evt)); + } + } + } + + if (timeoutPending) { + return Event(0, Poller::TIMEOUT); + } + + // no outstanding events, poll() for more + { + ScopedUnlock<Mutex> ul(streamLock); + + bool refreshed = pollerPrivate.registeredHandles.snapshot(pollHandles, pollfds); + if (refreshed) { + // we just drained all interruptedHandles and got a fresh snapshot + PollerHandleDeletionManager.markAllUnusedInThisThread(); + } + + if (!signalPipe.isSet()) { + int timeoutMs = -1; + if (!(targetTimeout == FAR_FUTURE)) { + timeoutMs = Duration(now(), targetTimeout) / TIME_MSEC; + if (timeoutMs < 0) + timeoutMs = 0; + } + + pollCount = ::poll(&pollfds[0], pollfds.size(), timeoutMs); + + if (pollCount ==-1 && errno != EINTR) { + QPID_POSIX_CHECK(pollCount); + } + else if (pollCount == 0) { + // timeout, unless shutdown or interrupt arrives in another thread + timeoutPending = true; + } + else { + if (pollfds[0].revents) { + pollCount--; // signal pipe doesn't count + } + } + } + else + pollCount = 0; + signalPipe.reset(); + } + currentPollfd = 1; + } + } + }; + + bool isShutdown; + HandleSet registeredHandles; + AtomicCount threadCount; + SignalPipe signalPipe; + EventStream eventStream; + + static short directionToEpollEvent(Poller::Direction dir) { + switch (dir) { + case Poller::INPUT: return POLLIN; + case Poller::OUTPUT: return POLLOUT; + case Poller::INOUT: return POLLIN | POLLOUT; + default: return 0; + } + } + + static Poller::EventType epollToDirection(short events) { + // POLLOUT & POLLHUP are mutually exclusive really, but at least socketpairs + // can give you both! + events = (events & POLLHUP) ? events & ~POLLOUT : events; + short e = events & (POLLIN | POLLOUT); + switch (e) { + case POLLIN: return Poller::READABLE; + case POLLOUT: return Poller::WRITABLE; + case POLLIN | POLLOUT: return Poller::READ_WRITABLE; + default: + return (events & (POLLHUP | POLLERR)) ? + Poller::DISCONNECTED : Poller::INVALID; + } + } + + PollerPrivate() : + isShutdown(false), eventStream(this) { + } + + ~PollerPrivate() {} + + void resetMode(PollerHandlePrivate& handle); + + void interrupt() { + signalPipe.set(); + } + + void interruptAll() { + // be async signal safe + signalPipe.setPermanently(); + } +}; + + +void Poller::registerHandle(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(eh.isIdle()); + + eh.setActive(); + impl->registeredHandles.add(handle.impl); + // not stale until monitored +} + +void Poller::unregisterHandle(PollerHandle& handle) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isIdle()); + + eh.setIdle(); + impl->registeredHandles.remove(handle.impl); + impl->registeredHandles.setStale(); + impl->interrupt(); +} + +void PollerPrivate::resetMode(PollerHandlePrivate& eh) { + PollerHandle* ph; + { + // Called after an event has been processed for a handle + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isActive()); + + if (eh.isIdle() || eh.isDeleted()) { + return; + } + + if (eh.events==0) { + eh.setActive(); + return; + } + + if (!eh.isInterrupted()) { + // Handle still in use, allow events to resume. + eh.setActive(); + registeredHandles.setStale(); + // Ouch. This scales poorly for large handle sets. + // TODO: avoid new snapshot, perhaps create an index to pollfds or a + // pending reset queue to be processed before each poll(). However, the real + // scalable solution is to implement the OS-specific epoll equivalent. + interrupt(); + return; + } + ph = eh.pollerHandle; + } + + eventStream.addInterrupt(*ph); + interrupt(); +} + +void Poller::monitorHandle(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isIdle()); + + short oldEvents = eh.events; + eh.events |= PollerPrivate::directionToEpollEvent(dir); + + // If no change nothing more to do - avoid unnecessary system call + if (oldEvents==eh.events) { + return; + } + + // If we're not actually listening wait till we are to perform change + if (!eh.isActive()) { + return; + } + + // tell polling thread to update its pollfds + impl->registeredHandles.setStale(); + impl->interrupt(); +} + +void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + assert(!eh.isIdle()); + + short oldEvents = eh.events; + eh.events &= ~PollerPrivate::directionToEpollEvent(dir); + + // If no change nothing more to do - avoid unnecessary system call + if (oldEvents==eh.events) { + return; + } + + // If we're not actually listening wait till we are to perform change + if (!eh.isActive()) { + return; + } + + impl->registeredHandles.setStale(); + impl->interrupt(); +} + +void Poller::shutdown() { + // NB: this function must be async-signal safe, it must not + // call any function that is not async-signal safe. + + // Allow sloppy code to shut us down more than once + if (impl->isShutdown) + return; + + // Don't use any locking here - isShutdown will be visible to all + // after the write() anyway (it's a memory barrier) + impl->isShutdown = true; + + impl->interruptAll(); +} + +bool Poller::interrupt(PollerHandle& handle) { + { + PollerHandlePrivate& eh = *handle.impl; + ScopedLock<Mutex> l(eh.lock); + if (eh.isIdle() || eh.isDeleted()) { + return false; + } + + if (eh.isInterrupted()) { + return true; + } + + if (eh.isInactive()) { + eh.setInterrupted(); + return true; + } + eh.setInterrupted(); + eh.events = 0; + } + + impl->registeredHandles.setStale(); + impl->eventStream.addInterrupt(handle); + impl->interrupt(); + return true; +} + +void Poller::run() { + // Ensure that we exit thread responsibly under all circumstances + try { + // Make sure we can't be interrupted by signals at a bad time + ::sigset_t ss; + ::sigfillset(&ss); + ::pthread_sigmask(SIG_SETMASK, &ss, 0); + + ++(impl->threadCount); + do { + Event event = wait(); + + // If can read/write then dispatch appropriate callbacks + if (event.handle) { + event.process(); + } else { + // Handle shutdown + switch (event.type) { + case SHUTDOWN: + //last thread to respond to shutdown cleans up: + if (--(impl->threadCount) == 0) impl->registeredHandles.cleanup(); + PollerHandleDeletionManager.destroyThreadState(); + return; + default: + // This should be impossible + assert(false); + } + } + } while (true); + } catch (const std::exception& e) { + QPID_LOG(error, "IO worker thread exiting with unhandled exception: " << e.what()); + } + PollerHandleDeletionManager.destroyThreadState(); + --(impl->threadCount); +} + +bool Poller::hasShutdown() +{ + return impl->isShutdown; +} + +Poller::Event Poller::wait(Duration timeout) { + static __thread PollerHandlePrivate* lastReturnedHandle = 0; + + if (lastReturnedHandle) { + impl->resetMode(*lastReturnedHandle); + lastReturnedHandle = 0; + } + + Event event = impl->eventStream.next(timeout); + + switch (event.type) { + case INTERRUPTED: + case READABLE: + case WRITABLE: + case READ_WRITABLE: + lastReturnedHandle = event.handle->impl; + break; + default: + ; + } + + return event; +} + +// Concrete constructors +Poller::Poller() : + impl(new PollerPrivate()) +{} + +Poller::~Poller() { + delete impl; +} + + +bool HandleSet::snapshot(std::vector<PollerHandlePrivate *>& hs , std::vector<struct ::pollfd>& fds) +{ + // Element 0 of the vectors is always the signal pipe, leave undisturbed + { + ScopedLock<Mutex> l(lock); + if (!stale) + return false; // no refresh done + + hs.resize(1); + for (std::set<PollerHandlePrivate*>::const_iterator i = handles.begin(); i != handles.end(); ++i) { + hs.push_back(*i); + } + stale = false; + // have copy of handle set (in vector form), drop the lock and build the pollfds + } + + // sync pollfds to same sizing as the handles + int sz = hs.size(); + fds.resize(sz); + + for (int j = 1; j < sz; ++j) { + // create a pollfd entry for each handle + struct ::pollfd& pollfd = fds[j]; + PollerHandlePrivate& eh = *hs[j]; + ScopedLock<Mutex> lk(eh.lock); + + if (!eh.isInactive() && !eh.isDeleted()) { + pollfd.fd = eh.fd(); + pollfd.events = eh.events; + } else { + pollfd.fd = -1; // tell poll() to ignore this fd + pollfd.events = 0; + } + } + return true; +} + + +}} diff --git a/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h b/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h new file mode 100644 index 0000000000..34a2022694 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h @@ -0,0 +1,65 @@ +#ifndef _sys_posix_PrivatePosix_h +#define _sys_posix_PrivatePosix_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Time.h" + +struct timespec; +struct timeval; +struct addrinfo; + +namespace qpid { +namespace sys { + +// Private Time related implementation details +struct timespec& toTimespec(struct timespec& ts, const AbsTime& t); +struct timeval& toTimeval(struct timeval& tv, const Duration& t); +Duration toTime(const struct timespec& ts); + +// Private SocketAddress details +class SocketAddress; +const struct addrinfo& getAddrInfo(const SocketAddress&); + +// Posix fd as an IOHandle +class IOHandle { +public: + IOHandle(int fd0 = -1) : + fd(fd0) + {} + + int fd; +}; + +// Dummy IOHandle for places it's required in the API +// but we promise not to actually try to do any operations on the IOHandle +class NullIOHandle : public IOHandle { +public: + NullIOHandle() + {} +}; + +extern NullIOHandle DummyIOHandle; + +}} + +#endif /*!_sys_posix_PrivatePosix_h*/ diff --git a/qpid/cpp/src/qpid/sys/posix/Shlib.cpp b/qpid/cpp/src/qpid/sys/posix/Shlib.cpp new file mode 100644 index 0000000000..3fb685d5b8 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/Shlib.cpp @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Shlib.h" +#include "qpid/Exception.h" +#include "qpid/Msg.h" +#include <dlfcn.h> + + +namespace qpid { +namespace sys { + +void Shlib::load(const char* name) { + ::dlerror(); + handle = ::dlopen(name, RTLD_NOW); + const char* error = ::dlerror(); + if (error) { + throw Exception(QPID_MSG(error << ": " << name)); + } +} + +void Shlib::unload() { + if (handle) { + ::dlerror(); + ::dlclose(handle); + const char* error = ::dlerror(); + if (error) { + throw Exception(QPID_MSG(error)); + } + handle = 0; + } +} + +void* Shlib::getSymbol(const char* name) { + ::dlerror(); + void* sym = ::dlsym(handle, name); + const char* error = ::dlerror(); + if (error) + throw Exception(QPID_MSG(error << ": " << name)); + return sym; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp new file mode 100644 index 0000000000..4c860a7ef7 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp @@ -0,0 +1,353 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/SocketAddress.h" + +#include "qpid/Exception.h" +#include "qpid/Msg.h" +#include "qpid/log/Logger.h" + +#include <sys/socket.h> +#include <netinet/in.h> +#include <netdb.h> +#include <string.h> +#include <arpa/inet.h> +#include <iosfwd> + +namespace qpid { +namespace sys { + +SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) : + host(host0), + port(port0), + addrInfo(0), + currentAddrInfo(0) +{ +} + +SocketAddress::SocketAddress(const SocketAddress& sa) : + host(sa.host), + port(sa.port), + addrInfo(0), + currentAddrInfo(0) +{ +} + +SocketAddress& SocketAddress::operator=(const SocketAddress& sa) +{ + SocketAddress temp(sa); + + std::swap(temp, *this); + return *this; +} + +SocketAddress::~SocketAddress() +{ + if (addrInfo) { + ::freeaddrinfo(addrInfo); + } +} + +std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen, bool dispNameOnly, bool hideDecoration) +{ + char servName[NI_MAXSERV]; + char dispName[NI_MAXHOST]; + if (int rc=::getnameinfo(addr, addrlen, + dispName, sizeof(dispName), + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + std::string s; + switch (addr->sa_family) { + case AF_INET: s += dispName; break; + case AF_INET6: + if (!hideDecoration) { + s += "["; s += dispName; s+= "]"; + } else { + s += dispName; + } + break; + case AF_UNIX: s += "UNIX:"; break; + default: throw Exception(QPID_MSG("Unexpected socket type")); + } + if (!dispNameOnly) { + s += ":"; + s += servName; + } + return s; +} + +uint16_t SocketAddress::getPort(::sockaddr const * const addr) +{ + switch (addr->sa_family) { + case AF_INET: return ntohs(((const ::sockaddr_in*)(const void*)addr)->sin_port); + case AF_INET6: return ntohs(((const ::sockaddr_in6*)(const void*)addr)->sin6_port); + default:throw Exception(QPID_MSG("Unexpected socket type")); + } +} + +std::string SocketAddress::asString(bool numeric, bool dispNameOnly, bool hideDecoration) const +{ + if (!numeric) + return host + ":" + port; + // Canonicalise into numeric id + const ::addrinfo& ai = getAddrInfo(*this); + + return asString(ai.ai_addr, ai.ai_addrlen, dispNameOnly, hideDecoration); +} + +std::string SocketAddress::getHost() const +{ + return host; +} + +/** + * Return true if this SocketAddress is IPv4 or IPv6 + */ +bool SocketAddress::isIp() const +{ + const ::addrinfo& ai = getAddrInfo(*this); + return ai.ai_family == AF_INET || ai.ai_family == AF_INET6; +} + +/** + * this represents the low address of an ACL address range. + * Given rangeHi that represents the high address, + * return a string showing the numeric comparisons that the + * inRange checks will do for address pair. + */ +std::string SocketAddress::comparisonDetails(const SocketAddress& rangeHi) const +{ + std::ostringstream os; + SocketAddress thisSa(*this); + SocketAddress rangeHiSa(rangeHi); + (void) getAddrInfo(thisSa); + (void) getAddrInfo(rangeHiSa); + os << "(" << thisSa.asString(true, true, false) << + "," << rangeHiSa.asString(true, true, false) << ")"; + while (thisSa.nextAddress()) { + if (!rangeHiSa.nextAddress()) { + throw(Exception(QPID_MSG("Comparison iteration fails: " + (*this).asString() + + rangeHi.asString()))); + } + os << ",(" << thisSa.asString(true, true, false) << + "," << rangeHiSa.asString(true, true, false) << ")"; + } + if (rangeHiSa.nextAddress()) { + throw(Exception(QPID_MSG("Comparison iteration fails: " + (*this).asString() + + rangeHi.asString()))); + } + std::string result = os.str(); + return result; +} + +/** + * For ACL address matching make sure that the two addresses, *this + * which is the low address and hiPeer which is the high address, are + * both numeric ip addresses of the same family and that hi > *this. + * + * Note that if the addresses resolve to more than one struct addrinfo + * then this and the hiPeer must be equal. This avoids having to do + * difficult range checks where the this and hiPeer both resolve to + * multiple IPv4 or IPv6 addresses. + * + * This check is run at acl file load time and not at run tme. + */ +bool SocketAddress::isComparable(const SocketAddress& hiPeer) const { + try { + // May only compare if this socket is IPv4 or IPv6 + SocketAddress lo(*this); + const ::addrinfo& peerLoInfo = getAddrInfo(lo); + if (!(peerLoInfo.ai_family == AF_INET || peerLoInfo.ai_family == AF_INET6)) { + return false; + } + try { + // May only compare if peer socket is same family + SocketAddress hi(hiPeer); + const ::addrinfo& peerHiInfo = getAddrInfo(hi); + if (peerLoInfo.ai_family != peerHiInfo.ai_family) { + return false; + } + // Host names that resolve to lists are allowed if they are equal. + // For example: localhost, or fjord.lab.example.com + if ((*this).asString() == hiPeer.asString()) { + return true; + } + // May only compare if this and peer resolve to single address. + if (lo.nextAddress() || hi.nextAddress()) { + return false; + } + // Make sure that the lo/hi relationship is ok + int res; + if (!compareAddresses(peerLoInfo, peerHiInfo, res) || res < 0) { + return false; + } + return true; + } catch (Exception) { + // failed to resolve hi + return false; + } + } catch (Exception) { + // failed to resolve lo + return false; + } +} + +/** + * *this SocketAddress was created from the numeric IP address of a + * connecting host. + * The lo and hi addresses are the limit checks from the ACL file. + * Return true if this address is in range of any of the address pairs + * in the limit check range. + * + * This check is executed on every incoming connection. + */ +bool SocketAddress::inRange(const SocketAddress& lo, + const SocketAddress& hi) const +{ + (*this).firstAddress(); + lo.firstAddress(); + hi.firstAddress(); + const ::addrinfo& thisInfo = getAddrInfo(*this); + const ::addrinfo& loInfo = getAddrInfo(lo); + const ::addrinfo& hiInfo = getAddrInfo(hi); + if (inRange(thisInfo, loInfo, hiInfo)) { + return true; + } + while (lo.nextAddress()) { + if (!hi.nextAddress()) { + assert (false); + throw(Exception(QPID_MSG("Comparison iteration fails: " + + lo.asString() + hi.asString()))); + } + const ::addrinfo& loInfo = getAddrInfo(lo); + const ::addrinfo& hiInfo = getAddrInfo(hi); + if (inRange(thisInfo, loInfo, hiInfo)) { + return true; + } + } + return false; +} + +/** + * *this SocketAddress was created from the numeric IP address of a + * connecting host. + * The lo and hi addresses are one binary address pair from a range + * given in an ACL file. + * Return true if this binary address is '>= lo' and '<= hi'. + */ +bool SocketAddress::inRange(const ::addrinfo& thisInfo, + const ::addrinfo& lo, + const ::addrinfo& hi) const +{ + int resLo; + int resHi; + if (!compareAddresses(lo, thisInfo, resLo)) { + return false; + } + if (!compareAddresses(hi, thisInfo, resHi)) { + return false; + } + if (resLo < 0) { + return false; + } + if (resHi > 0) { + return false; + } + return true; +} + +/** + * Compare this address against two binary low/high addresses. + * return true with result holding the comparison. + */ +bool SocketAddress::compareAddresses(const struct addrinfo& lo, + const struct addrinfo& hi, + int& result) const +{ + if (lo.ai_family != hi.ai_family) { + return false; + } + if (lo.ai_family == AF_INET) { + void* taddr; + + taddr = (void*)lo.ai_addr; + struct sockaddr_in* sin4lo = (struct sockaddr_in*)taddr; + taddr = (void*)hi.ai_addr; + struct sockaddr_in* sin4hi = (struct sockaddr_in*)taddr; + result = memcmp(&sin4hi->sin_addr, &sin4lo->sin_addr, sizeof(in_addr)); + } else if (lo.ai_family == AF_INET6) { + void* taddr; + + taddr = (void*)lo.ai_addr; + struct sockaddr_in6* sin6lo = (struct sockaddr_in6*)taddr; + taddr = (void*)hi.ai_addr; + struct sockaddr_in6* sin6hi = (struct sockaddr_in6*)taddr; + result = memcmp(&sin6hi->sin6_addr, &sin6lo->sin6_addr, sizeof(in6_addr)); + } else { + assert (false); + return false; + } + return true; +} + +void SocketAddress::firstAddress() const { + if (addrInfo) { + currentAddrInfo = addrInfo; + } else { + (void) getAddrInfo(*this); + } +} + +bool SocketAddress::nextAddress() const { + bool r = currentAddrInfo->ai_next != 0; + if (r) + currentAddrInfo = currentAddrInfo->ai_next; + return r; +} + +const ::addrinfo& getAddrInfo(const SocketAddress& sa) +{ + if (!sa.addrInfo) { + ::addrinfo hints; + ::memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6 + hints.ai_socktype = SOCK_STREAM; + + const char* node = 0; + if (sa.host.empty()) { + hints.ai_flags = AI_PASSIVE; + } else { + hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for + node = sa.host.c_str(); + } + const char* service = sa.port.empty() ? "0" : sa.port.c_str(); + + int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo); + if (n != 0) + throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n))); + sa.currentAddrInfo = sa.addrInfo; + } + + return *sa.currentAddrInfo; +} + +}} diff --git a/qpid/cpp/src/qpid/sys/posix/StrError.cpp b/qpid/cpp/src/qpid/sys/posix/StrError.cpp new file mode 100644 index 0000000000..633e20213c --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/StrError.cpp @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/StrError.h" + +#include <string.h> + +namespace qpid { +namespace sys { + +std::string strError(int err) { + char buf[512] = "Unknown error"; +#ifdef _GNU_SOURCE + // GNU strerror_r returns the message + return ::strerror_r(err, buf, sizeof(buf)); +#else + // POSIX strerror_r doesn't return the buffer + ::strerror_r(err, buf, sizeof(buf)); + return std::string(buf); +#endif +} + +}} diff --git a/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp b/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp new file mode 100755 index 0000000000..2a42a5b2a7 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/SystemInfo.cpp @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/log/Statement.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/sys/posix/check.h" +#include <arpa/inet.h> +#include <sys/ioctl.h> +#include <sys/utsname.h> +#include <sys/types.h> // For FreeBSD +#include <sys/socket.h> // For FreeBSD +#include <netinet/in.h> // For FreeBSD +#include <ifaddrs.h> +#include <unistd.h> +#include <iostream> +#include <fstream> +#include <sstream> +#include <map> +#include <netdb.h> +#include <string.h> + +#ifndef HOST_NAME_MAX +# define HOST_NAME_MAX 256 +#endif + +using namespace std; + +namespace qpid { +namespace sys { + +long SystemInfo::concurrency() { +#ifdef _SC_NPROCESSORS_ONLN // Linux specific. + return sysconf(_SC_NPROCESSORS_ONLN); +#else + return -1; +#endif +} + +bool SystemInfo::getLocalHostname (Address &address) { + char name[HOST_NAME_MAX]; + if (::gethostname(name, sizeof(name)) != 0) + return false; + address.host = name; + return true; +} + +static const string LOOPBACK("127.0.0.1"); +static const string TCP("tcp"); + +// Test IPv4 address for loopback +inline bool IN_IS_ADDR_LOOPBACK(const ::in_addr* a) { + return ((ntohl(a->s_addr) & 0xff000000) == 0x7f000000); +} + +inline bool isLoopback(const ::sockaddr* addr) { + switch (addr->sa_family) { + case AF_INET: return IN_IS_ADDR_LOOPBACK(&((const ::sockaddr_in*)(const void*)addr)->sin_addr); + case AF_INET6: return IN6_IS_ADDR_LOOPBACK(&((const ::sockaddr_in6*)(const void*)addr)->sin6_addr); + default: return false; + } +} + +namespace { + inline socklen_t sa_len(::sockaddr* sa) + { + switch (sa->sa_family) { + case AF_INET: + return sizeof(struct sockaddr_in); + case AF_INET6: + return sizeof(struct sockaddr_in6); + default: + return sizeof(struct sockaddr_storage); + } + } + + inline bool isInetOrInet6(::sockaddr* sa) { + switch (sa->sa_family) { + case AF_INET: + case AF_INET6: + return true; + default: + return false; + } + } + typedef std::map<std::string, std::vector<std::string> > InterfaceInfo; + std::map<std::string, std::vector<std::string> > cachedInterfaces; + + void cacheInterfaceInfo() { + // Get interface info + ::ifaddrs* interfaceInfo; + QPID_POSIX_CHECK( ::getifaddrs(&interfaceInfo) ); + + char name[NI_MAXHOST]; + for (::ifaddrs* info = interfaceInfo; info != 0; info = info->ifa_next) { + + // Only use IPv4/IPv6 interfaces + if (!info->ifa_addr || !isInetOrInet6(info->ifa_addr)) continue; + + int rc=::getnameinfo(info->ifa_addr, sa_len(info->ifa_addr), + name, sizeof(name), 0, 0, + NI_NUMERICHOST); + if (rc >= 0) { + std::string address(name); + cachedInterfaces[info->ifa_name].push_back(address); + } else { + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + } + } + ::freeifaddrs(interfaceInfo); + } +} + +bool SystemInfo::getInterfaceAddresses(const std::string& interface, std::vector<std::string>& addresses) { + if ( cachedInterfaces.empty() ) cacheInterfaceInfo(); + InterfaceInfo::iterator i = cachedInterfaces.find(interface); + if ( i==cachedInterfaces.end() ) return false; + std::copy(i->second.begin(), i->second.end(), std::back_inserter(addresses)); + return true; +} + +void SystemInfo::getInterfaceNames(std::vector<std::string>& names ) { + if ( cachedInterfaces.empty() ) cacheInterfaceInfo(); + + for (InterfaceInfo::const_iterator i = cachedInterfaces.begin(); i!=cachedInterfaces.end(); ++i) { + names.push_back(i->first); + } +} + +void SystemInfo::getSystemId (std::string &osName, + std::string &nodeName, + std::string &release, + std::string &version, + std::string &machine) +{ + struct utsname _uname; + if (uname (&_uname) == 0) + { + osName = _uname.sysname; + nodeName = _uname.nodename; + release = _uname.release; + version = _uname.version; + machine = _uname.machine; + } +} + +uint32_t SystemInfo::getProcessId() +{ + return (uint32_t) ::getpid(); +} + +uint32_t SystemInfo::getParentProcessId() +{ + return (uint32_t) ::getppid(); +} + +// Linux specific (Solaris has quite different stuff in /proc) +string SystemInfo::getProcessName() +{ + string value; + + ifstream input("/proc/self/status"); + if (input.good()) { + while (!input.eof()) { + string key; + input >> key; + if (key == "Name:") { + input >> value; + break; + } + } + input.close(); + } + + return value; +} + +// Always true. Only Windows has exception cases. +bool SystemInfo::threadSafeShutdown() +{ + return true; +} + + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/posix/Thread.cpp b/qpid/cpp/src/qpid/sys/posix/Thread.cpp new file mode 100644 index 0000000000..349e35d643 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/Thread.cpp @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Thread.h" + +#include "qpid/sys/Runnable.h" +#include "qpid/sys/posix/check.h" + +#include <pthread.h> + +namespace qpid { +namespace sys { + +namespace { +void* runRunnable(void* p) +{ + static_cast<Runnable*>(p)->run(); + return 0; +} +} + +class ThreadPrivate { +public: + pthread_t thread; + + ThreadPrivate(Runnable* runnable) { + QPID_POSIX_ASSERT_THROW_IF(::pthread_create(&thread, NULL, runRunnable, runnable)); + } + + ThreadPrivate() : thread(::pthread_self()) {} +}; + +Thread::Thread() {} + +Thread::Thread(Runnable* runnable) : impl(new ThreadPrivate(runnable)) {} + +Thread::Thread(Runnable& runnable) : impl(new ThreadPrivate(&runnable)) {} + +Thread::operator bool() { + return !!impl; +} + +bool Thread::operator==(const Thread& t) const { + return pthread_equal(impl->thread, t.impl->thread) != 0; +} + +bool Thread::operator!=(const Thread& t) const { + return !(*this==t); +} + +void Thread::join(){ + if (impl) { + QPID_POSIX_ASSERT_THROW_IF(::pthread_join(impl->thread, 0)); + } +} + +unsigned long Thread::logId() { + // This does need to be the C cast operator as + // pthread_t could be either a pointer or an integer + // and so we can't know static_cast<> or reinterpret_cast<> + return (unsigned long) ::pthread_self(); +} + +Thread Thread::current() { + Thread t; + t.impl.reset(new ThreadPrivate()); + return t; +} + +}} diff --git a/qpid/cpp/src/qpid/sys/posix/Time.cpp b/qpid/cpp/src/qpid/sys/posix/Time.cpp new file mode 100644 index 0000000000..10a5d944b1 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/Time.cpp @@ -0,0 +1,162 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/posix/PrivatePosix.h" + +#include "qpid/sys/Time.h" +#include <ostream> +#include <istream> +#include <sstream> +#include <time.h> +#include <stdio.h> +#include <sys/time.h> +#include <unistd.h> +#include <iomanip> +#include <cctype> + +namespace { +int64_t max_abstime() { return std::numeric_limits<int64_t>::max(); } +} + +namespace qpid { +namespace sys { + +AbsTime::AbsTime(const AbsTime& t, const Duration& d) : + timepoint(d == Duration::max() ? max_abstime() : t.timepoint+d.nanosecs) +{} + +AbsTime AbsTime::Zero() { + AbsTime epoch; epoch.timepoint = 0; + return epoch; +} + +AbsTime AbsTime::FarFuture() { + AbsTime ff; ff.timepoint = max_abstime(); return ff; +} + +AbsTime AbsTime::now() { + struct timespec ts; + ::clock_gettime(CLOCK_MONOTONIC, &ts); + AbsTime time_now; + time_now.timepoint = toTime(ts).nanosecs; + return time_now; +} + +AbsTime AbsTime::epoch() { + return AbsTime(now(), -Duration::FromEpoch()); +} + +Duration Duration::FromEpoch() { + struct timespec ts; + ::clock_gettime(CLOCK_REALTIME, &ts); + return toTime(ts).nanosecs; +} + +Duration::Duration(const AbsTime& start, const AbsTime& finish) : + nanosecs(finish.timepoint - start.timepoint) +{} + +namespace { +/** type conversion helper: an infinite timeout for time_t sized types **/ +const time_t TIME_T_MAX = std::numeric_limits<time_t>::max(); +} + +struct timespec& toTimespec(struct timespec& ts, const AbsTime& a) { + Duration t(ZERO, a); + Duration secs = t / TIME_SEC; + ts.tv_sec = (secs > TIME_T_MAX) ? TIME_T_MAX : static_cast<time_t>(secs); + ts.tv_nsec = static_cast<long>(t % TIME_SEC); + return ts; +} + +Duration toTime(const struct timespec& ts) { + return ts.tv_sec*TIME_SEC + ts.tv_nsec; +} + +std::ostream& operator<<(std::ostream& o, const Duration& d) { + if (d >= TIME_SEC) return o << (double(d)/TIME_SEC) << "s"; + if (d >= TIME_MSEC) return o << (double(d)/TIME_MSEC) << "ms"; + if (d >= TIME_USEC) return o << (double(d)/TIME_USEC) << "us"; + return o << int64_t(d) << "ns"; +} + +std::istream& operator>>(std::istream& i, Duration& d) { + // Don't throw, let the istream throw if it's configured to do so. + double number; + i >> number; + if (i.fail()) return i; + + if (i.eof() || std::isspace(i.peek())) // No suffix + d = int64_t(number*TIME_SEC); + else { + std::stringbuf suffix; + i >> &suffix; + if (i.fail()) return i; + std::string suffix_str = suffix.str(); + if (suffix_str.compare("s") == 0) d = int64_t(number*TIME_SEC); + else if (suffix_str.compare("ms") == 0) d = int64_t(number*TIME_MSEC); + else if (suffix_str.compare("us") == 0) d = int64_t(number*TIME_USEC); + else if (suffix_str.compare("ns") == 0) d = int64_t(number*TIME_NSEC); + else i.setstate(std::ios::failbit); + } + return i; +} + +namespace { +inline std::ostream& outputFormattedTime(std::ostream& o, const ::time_t* time) { + ::tm timeinfo; + char time_string[100]; + ::strftime(time_string, 100, + "%Y-%m-%d %H:%M:%S", + localtime_r(time, &timeinfo)); + return o << time_string; +} +} + +std::ostream& operator<<(std::ostream& o, const AbsTime& t) { + ::time_t rawtime(t.timepoint/TIME_SEC); + return outputFormattedTime(o, &rawtime); +} + +void outputFormattedNow(std::ostream& o) { + ::time_t rawtime; + ::time(&rawtime); + outputFormattedTime(o, &rawtime); + o << " "; +} + +void outputHiresNow(std::ostream& o) { + ::timespec time; + ::clock_gettime(CLOCK_REALTIME, &time); + ::time_t seconds = time.tv_sec; + outputFormattedTime(o, &seconds); + o << "." << std::setw(9) << std::setfill('0') << time.tv_nsec << " "; +} + +void sleep(int secs) { + ::sleep(secs); +} + +void usleep(uint64_t usecs) { + ::usleep(usecs); +} + +}} diff --git a/qpid/cpp/src/qpid/sys/posix/Time.h b/qpid/cpp/src/qpid/sys/posix/Time.h new file mode 100755 index 0000000000..62d734c816 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/Time.h @@ -0,0 +1,34 @@ +#ifndef QPID_SYS_POSIX_TIME_H +#define QPID_SYS_POSIX_TIME_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/sys/IntegerTypes.h" + +namespace qpid { +namespace sys { + +/** + * Class to represent an instant in time. + */ +typedef int64_t TimePrivate; + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_POSIX_TIME_H*/ diff --git a/qpid/cpp/src/qpid/sys/posix/check.h b/qpid/cpp/src/qpid/sys/posix/check.h new file mode 100644 index 0000000000..1bfe5d6d78 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/check.h @@ -0,0 +1,53 @@ +#ifndef _posix_check_h +#define _posix_check_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Exception.h" +#include "qpid/Msg.h" + +#include <cerrno> +#include <assert.h> +#include <stdio.h> +#include <stdlib.h> + +#define QPID_POSIX_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::sys::strError(ERRNO))) + +/** THROW QPID_POSIX_ERROR(errno) if RESULT is less than zero */ +#define QPID_POSIX_CHECK(RESULT) \ + if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno)) + +/** Throw a posix error if ERRNO is non-zero */ +#define QPID_POSIX_THROW_IF(ERRNO) \ + do { int e=(ERRNO); if (e) throw QPID_POSIX_ERROR(e); } while(0) + +/** Same as _THROW_IF in a release build, but abort a debug build */ +#ifdef NDEBUG +#define QPID_POSIX_ASSERT_THROW_IF(ERRNO) QPID_POSIX_THROW_IF(ERRNO) +#else +#define QPID_POSIX_ASSERT_THROW_IF(ERRNO) \ + do { int e=(ERRNO); if (e) { errno=e; ::perror(0); assert(0); } } while(0) +#endif + +#define QPID_POSIX_ABORT_IF(ERRNO) if ((int) ERRNO) { errno=ERRNO; ::perror(0); abort(); } + +#endif /*!_posix_check_h*/ |