diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/windows')
21 files changed, 3520 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp new file mode 100644 index 0000000000..8d84fdb7b2 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -0,0 +1,755 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/windows/AsynchIoResult.h" +#include "qpid/sys/windows/IoHandlePrivate.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Socket.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Time.h" +#include "qpid/log/Statement.h" + +#include "qpid/sys/windows/check.h" +#include "qpid/sys/windows/mingw32_compat.h" + +#include <boost/thread/once.hpp> + +#include <queue> +#include <winsock2.h> +#include <mswsock.h> +#include <windows.h> + +#include <boost/bind.hpp> + +namespace { + + typedef qpid::sys::ScopedLock<qpid::sys::Mutex> QLock; + +/* + * The function pointers for AcceptEx and ConnectEx need to be looked up + * at run time. Make sure this is done only once. + */ +boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT; +LPFN_ACCEPTEX fnAcceptEx = 0; +typedef void (*lookUpFunc)(const qpid::sys::Socket &); + +void lookUpAcceptEx() { + SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + GUID guidAcceptEx = WSAID_ACCEPTEX; + DWORD dwBytes = 0; + WSAIoctl(h, + SIO_GET_EXTENSION_FUNCTION_POINTER, + &guidAcceptEx, + sizeof(guidAcceptEx), + &fnAcceptEx, + sizeof(fnAcceptEx), + &dwBytes, + NULL, + NULL); + closesocket(h); + if (fnAcceptEx == 0) + throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx")); +} + +} + +namespace qpid { +namespace sys { +namespace windows { + +/* + * Asynch Acceptor + * + */ +class AsynchAcceptor : public qpid::sys::AsynchAcceptor { + + friend class AsynchAcceptResult; + +public: + AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback); + ~AsynchAcceptor(); + void start(Poller::shared_ptr poller); + +private: + void restart(void); + + AsynchAcceptor::Callback acceptedCallback; + const Socket& socket; +}; + +AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) + : acceptedCallback(callback), + socket(s) { + + s.setNonblocking(); +#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */ + boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx); +#else + boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce); +#endif +} + +AsynchAcceptor::~AsynchAcceptor() +{ + socket.close(); +} + +void AsynchAcceptor::start(Poller::shared_ptr poller) { + PollerHandle ph = PollerHandle(socket); + poller->monitorHandle(ph, Poller::INPUT); + restart (); +} + +void AsynchAcceptor::restart(void) { + DWORD bytesReceived = 0; // Not used, needed for AcceptEx API + AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback, + this, + toSocketHandle(socket)); + BOOL status; + status = ::fnAcceptEx(toSocketHandle(socket), + toSocketHandle(*result->newSocket), + result->addressBuffer, + 0, + AsynchAcceptResult::SOCKADDRMAXLEN, + AsynchAcceptResult::SOCKADDRMAXLEN, + &bytesReceived, + result->overlapped()); + QPID_WINDOWS_CHECK_ASYNC_START(status); +} + + +AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, + AsynchAcceptor *acceptor, + SOCKET listener) + : callback(cb), acceptor(acceptor), listener(listener) { + newSocket.reset (new Socket()); +} + +void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { + ::setsockopt (toSocketHandle(*newSocket), + SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + (char*)&listener, + sizeof (listener)); + callback(*(newSocket.release())); + acceptor->restart (); + delete this; +} + +void AsynchAcceptResult::failure(int /*status*/) { + //if (status != WSA_OPERATION_ABORTED) + // Can there be anything else? ; + delete this; +} + +/* + * AsynchConnector does synchronous connects for now... to do asynch the + * IocpPoller will need some extension to register an event handle as a + * CONNECT-type "direction", the connect completion/result will need an + * event handle to associate with the connecting handle. But there's no + * time for that right now... + */ +class AsynchConnector : public qpid::sys::AsynchConnector { +private: + ConnectedCallback connCallback; + FailedCallback failCallback; + const Socket& socket; + const std::string hostname; + const std::string port; + +public: + AsynchConnector(const Socket& socket, + const std::string& hostname, + const std::string& port, + ConnectedCallback connCb, + FailedCallback failCb = 0); + void start(Poller::shared_ptr poller); +}; + +AsynchConnector::AsynchConnector(const Socket& sock, + const std::string& hname, + const std::string& p, + ConnectedCallback connCb, + FailedCallback failCb) : + connCallback(connCb), failCallback(failCb), socket(sock), + hostname(hname), port(p) +{ +} + +void AsynchConnector::start(Poller::shared_ptr) +{ + try { + socket.connect(hostname, port); + socket.setNonblocking(); + connCallback(socket); + } catch(std::exception& e) { + if (failCallback) + failCallback(socket, -1, std::string(e.what())); + socket.close(); + } +} + +} // namespace windows + +AsynchAcceptor* AsynchAcceptor::create(const Socket& s, + Callback callback) +{ + return new windows::AsynchAcceptor(s, callback); +} + +AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s, + const std::string& hostname, + const std::string& port, + ConnectedCallback connCb, + FailedCallback failCb) +{ + return new windows::AsynchConnector(s, + hostname, + port, + connCb, + failCb); +} + + +/* + * Asynch reader/writer + */ + +namespace windows { + +class AsynchIO : public qpid::sys::AsynchIO { +public: + AsynchIO(const Socket& s, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0); + ~AsynchIO(); + + // Methods inherited from qpid::sys::AsynchIO + + /** + * Notify the object is should delete itself as soon as possible. + */ + virtual void queueForDeletion(); + + /// Take any actions needed to prepare for working with the poller. + virtual void start(Poller::shared_ptr poller); + virtual void queueReadBuffer(BufferBase* buff); + virtual void unread(BufferBase* buff); + virtual void queueWrite(BufferBase* buff); + virtual void notifyPendingWrite(); + virtual void queueWriteClose(); + virtual bool writeQueueEmpty(); + virtual void startReading(); + virtual void stopReading(); + virtual void requestCallback(RequestCallback); + + /** + * getQueuedBuffer returns a buffer from the buffer queue, if one is + * available. + * + * @retval Pointer to BufferBase buffer; 0 if none is available. + */ + virtual BufferBase* getQueuedBuffer(); + +private: + ReadCallback readCallback; + EofCallback eofCallback; + DisconnectCallback disCallback; + ClosedCallback closedCallback; + BuffersEmptyCallback emptyCallback; + IdleCallback idleCallback; + const Socket& socket; + Poller::shared_ptr poller; + + std::deque<BufferBase*> bufferQueue; + std::deque<BufferBase*> writeQueue; + /* The MSVC-supplied deque is not thread-safe; keep locks to serialize + * access to the buffer queue and write queue. + */ + Mutex bufferQueueLock; + + // Number of outstanding I/O operations. + volatile LONG opsInProgress; + // Is there a write in progress? + volatile bool writeInProgress; + // Deletion requested, but there are callbacks in progress. + volatile bool queuedDelete; + // Socket close requested, but there are operations in progress. + volatile bool queuedClose; + +private: + // Dispatch events that have completed. + void notifyEof(void); + void notifyDisconnect(void); + void notifyClosed(void); + void notifyBuffersEmpty(void); + void notifyIdle(void); + + /** + * Initiate a write of the specified buffer. There's no callback for + * write completion to the AsynchIO object. + */ + void startWrite(AsynchIO::BufferBase* buff); + + void close(void); + + /** + * readComplete is called when a read request is complete. + * + * @param result Results of the operation. + */ + void readComplete(AsynchReadResult *result); + + /** + * writeComplete is called when a write request is complete. + * + * @param result Results of the operation. + */ + void writeComplete(AsynchWriteResult *result); + + /** + * Queue of completions to run. This queue enforces the requirement + * from upper layers that only one thread at a time is allowed to act + * on any given connection. Once a thread is busy processing a completion + * on this object, other threads that dispatch completions queue the + * completions here for the in-progress thread to handle when done. + * Thus, any threads can dispatch a completion from the IocpPoller, but + * this class ensures that actual processing at the connection level is + * only on one thread at a time. + */ + std::queue<AsynchIoResult *> completionQueue; + volatile bool working; + Mutex completionLock; + + /** + * Called when there's a completion to process. + */ + void completion(AsynchIoResult *result); +}; + +// This is used to encapsulate pure callbacks into a handle +class CallbackHandle : public IOHandle { +public: + CallbackHandle(AsynchIoResult::Completer completeCb, + AsynchIO::RequestCallback reqCb = 0) : + IOHandle(new IOHandlePrivate (INVALID_SOCKET, completeCb, reqCb)) + {} +}; + +AsynchIO::AsynchIO(const Socket& s, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb, + BuffersEmptyCallback eCb, + IdleCallback iCb) : + + readCallback(rCb), + eofCallback(eofCb), + disCallback(disCb), + closedCallback(cCb), + emptyCallback(eCb), + idleCallback(iCb), + socket(s), + opsInProgress(0), + writeInProgress(false), + queuedDelete(false), + queuedClose(false), + working(false) { +} + +struct deleter +{ + template <typename T> + void operator()(T *ptr){ delete ptr;} +}; + +AsynchIO::~AsynchIO() { + std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); + std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); +} + +void AsynchIO::queueForDeletion() { + queuedDelete = true; + if (opsInProgress > 0) { + QPID_LOG(info, "Delete AsynchIO queued; ops in progress"); + // AsynchIOHandler calls this then deletes itself; don't do any more + // callbacks. + readCallback = 0; + eofCallback = 0; + disCallback = 0; + closedCallback = 0; + emptyCallback = 0; + idleCallback = 0; + } + else { + delete this; + } +} + +void AsynchIO::start(Poller::shared_ptr poller0) { + PollerHandle ph = PollerHandle(socket); + poller = poller0; + poller->monitorHandle(ph, Poller::INPUT); + if (writeQueue.size() > 0) // Already have data queued for write + notifyPendingWrite(); + startReading(); +} + +void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) { + assert(buff); + buff->dataStart = 0; + buff->dataCount = 0; + QLock l(bufferQueueLock); + bufferQueue.push_back(buff); +} + +void AsynchIO::unread(AsynchIO::BufferBase* buff) { + assert(buff); + buff->squish(); + QLock l(bufferQueueLock); + bufferQueue.push_front(buff); +} + +void AsynchIO::queueWrite(AsynchIO::BufferBase* buff) { + assert(buff); + QLock l(bufferQueueLock); + writeQueue.push_back(buff); + if (!writeInProgress) + notifyPendingWrite(); +} + +void AsynchIO::notifyPendingWrite() { + // This method is generally called from a processing thread; transfer + // work on this to an I/O thread. Much of the upper layer code assumes + // that all I/O-related things happen in an I/O thread. + if (poller == 0) // Not really going yet... + return; + + InterlockedIncrement(&opsInProgress); + PollerHandle ph(CallbackHandle(boost::bind(&AsynchIO::completion, this, _1))); + poller->monitorHandle(ph, Poller::OUTPUT); +} + +void AsynchIO::queueWriteClose() { + queuedClose = true; + if (!writeInProgress) + notifyPendingWrite(); +} + +bool AsynchIO::writeQueueEmpty() { + QLock l(bufferQueueLock); + return writeQueue.size() == 0; +} + +/* + * Initiate a read operation. AsynchIO::readComplete() will be + * called when the read is complete and data is available. + */ +void AsynchIO::startReading() { + if (queuedDelete) + return; + + // (Try to) get a buffer; look on the front since there may be an + // "unread" one there with data remaining from last time. + AsynchIO::BufferBase *buff = 0; + { + QLock l(bufferQueueLock); + + if (!bufferQueue.empty()) { + buff = bufferQueue.front(); + assert(buff); + bufferQueue.pop_front(); + } + } + if (buff != 0) { + int readCount = buff->byteCount - buff->dataCount; + AsynchReadResult *result = + new AsynchReadResult(boost::bind(&AsynchIO::completion, this, _1), + buff, + readCount); + DWORD bytesReceived = 0, flags = 0; + InterlockedIncrement(&opsInProgress); + int status = WSARecv(toSocketHandle(socket), + const_cast<LPWSABUF>(result->getWSABUF()), 1, + &bytesReceived, + &flags, + result->overlapped(), + 0); + if (status != 0) { + int error = WSAGetLastError(); + if (error != WSA_IO_PENDING) { + result->failure(error); + result = 0; // result is invalid here + return; + } + } + // On status 0 or WSA_IO_PENDING, completion will handle the rest. + } + else { + notifyBuffersEmpty(); + } + return; +} + +// stopReading was added to prevent a race condition with read-credit on Linux. +// It may or may not be required on windows. +// +// AsynchIOHandler::readbuff() calls stopReading() inside the same +// critical section that protects startReading() in +// AsynchIOHandler::giveReadCredit(). +// +void AsynchIO::stopReading() {} + +// Queue the specified callback for invocation from an I/O thread. +void AsynchIO::requestCallback(RequestCallback callback) { + // This method is generally called from a processing thread; transfer + // work on this to an I/O thread. Much of the upper layer code assumes + // that all I/O-related things happen in an I/O thread. + if (poller == 0) // Not really going yet... + return; + + InterlockedIncrement(&opsInProgress); + PollerHandle ph(CallbackHandle( + boost::bind(&AsynchIO::completion, this, _1), + callback)); + poller->monitorHandle(ph, Poller::INPUT); +} + +/** + * Return a queued buffer if there are enough to spare. + */ +AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { + QLock l(bufferQueueLock); + // Always keep at least one buffer (it might have data that was + // "unread" in it). + if (bufferQueue.size() <= 1) + return 0; + BufferBase* buff = bufferQueue.back(); + assert(buff); + bufferQueue.pop_back(); + return buff; +} + +void AsynchIO::notifyEof(void) { + if (eofCallback) + eofCallback(*this); +} + +void AsynchIO::notifyDisconnect(void) { + if (disCallback) + disCallback(*this); +} + +void AsynchIO::notifyClosed(void) { + if (closedCallback) + closedCallback(*this, socket); +} + +void AsynchIO::notifyBuffersEmpty(void) { + if (emptyCallback) + emptyCallback(*this); +} + +void AsynchIO::notifyIdle(void) { + if (idleCallback) + idleCallback(*this); +} + +/* + * Asynch reader/writer using overlapped I/O + */ + +void AsynchIO::startWrite(AsynchIO::BufferBase* buff) { + writeInProgress = true; + InterlockedIncrement(&opsInProgress); + AsynchWriteResult *result = + new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1), + buff, + buff->dataCount); + DWORD bytesSent = 0; + int status = WSASend(toSocketHandle(socket), + const_cast<LPWSABUF>(result->getWSABUF()), 1, + &bytesSent, + 0, + result->overlapped(), + 0); + if (status != 0) { + int error = WSAGetLastError(); + if (error != WSA_IO_PENDING) { + result->failure(error); // Also decrements in-progress count + result = 0; // result is invalid here + return; + } + } + // On status 0 or WSA_IO_PENDING, completion will handle the rest. + return; +} + +/* + * Close the socket and callback to say we've done it + */ +void AsynchIO::close(void) { + socket.close(); + notifyClosed(); +} + +void AsynchIO::readComplete(AsynchReadResult *result) { + int status = result->getStatus(); + size_t bytes = result->getTransferred(); + if (status == 0 && bytes > 0) { + bool restartRead = true; // May not if receiver doesn't want more + if (readCallback) + readCallback(*this, result->getBuff()); + if (restartRead) + startReading(); + } + else { + // No data read, so put the buffer back. It may be partially filled, + // so "unread" it back to the front of the queue. + unread(result->getBuff()); + notifyEof(); + if (status != 0) + { + notifyDisconnect(); + } + } +} + +/* + * NOTE - this completion is called for completed writes and also when + * a write is desired. The difference is in the buff - if a write is desired + * the buff is 0. + */ +void AsynchIO::writeComplete(AsynchWriteResult *result) { + int status = result->getStatus(); + size_t bytes = result->getTransferred(); + AsynchIO::BufferBase *buff = result->getBuff(); + if (buff != 0) { + writeInProgress = false; + if (status == 0 && bytes > 0) { + if (bytes < result->getRequested()) // Still more to go; resubmit + startWrite(buff); + else + queueReadBuffer(buff); // All done; back to the pool + } + else { + // An error... if it's a connection close, ignore it - it will be + // noticed and handled on a read completion any moment now. + // What to do with real error??? Save the Buffer? + } + } + + // If there are no writes outstanding, check for more writes to initiate + // (either queued or via idle). The opsInProgress count is handled in + // completion() + if (!writeInProgress) { + bool writing = false; + { + QLock l(bufferQueueLock); + if (writeQueue.size() > 0) { + buff = writeQueue.front(); + assert(buff); + writeQueue.pop_front(); + startWrite(buff); + writing = true; + } + } + if (!writing && !queuedClose) { + notifyIdle(); + } + } + return; +} + +void AsynchIO::completion(AsynchIoResult *result) { + { + ScopedLock<Mutex> l(completionLock); + if (working) { + completionQueue.push(result); + return; + } + + // First thread in with something to do; note we're working then keep + // handling completions. + working = true; + while (result != 0) { + // New scope to unlock temporarily. + { + ScopedUnlock<Mutex> ul(completionLock); + AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result); + if (r != 0) + readComplete(r); + else { + AsynchWriteResult *w = + dynamic_cast<AsynchWriteResult*>(result); + if (w != 0) + writeComplete(w); + else { + AsynchCallbackRequest *req = + dynamic_cast<AsynchCallbackRequest*>(result); + req->reqCallback(*this); + } + } + delete result; + result = 0; + InterlockedDecrement(&opsInProgress); + } + // Lock is held again. + if (completionQueue.empty()) + continue; + result = completionQueue.front(); + completionQueue.pop(); + } + working = false; + } + // Lock released; ok to close if ops are done and close requested. + // Layer above will call back to queueForDeletion() if it hasn't + // already been done. If it already has, go ahead and delete. + if (opsInProgress == 0) { + if (queuedClose) + // close() may cause a delete; don't trust 'this' on return + close(); + else if (queuedDelete) + delete this; + } +} + +} // namespace windows + +AsynchIO* qpid::sys::AsynchIO::create(const Socket& s, + AsynchIO::ReadCallback rCb, + AsynchIO::EofCallback eofCb, + AsynchIO::DisconnectCallback disCb, + AsynchIO::ClosedCallback cCb, + AsynchIO::BuffersEmptyCallback eCb, + AsynchIO::IdleCallback iCb) +{ + return new qpid::sys::windows::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb); +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h b/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h new file mode 100755 index 0000000000..b11324918b --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h @@ -0,0 +1,204 @@ +#ifndef _windows_asynchIoResult_h +#define _windows_asynchIoResult_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Socket.h" +#include <memory.h> +#include <winsock2.h> +#include <ws2tcpip.h> + +namespace qpid { +namespace sys { +namespace windows { + +/* + * AsynchIoResult defines the class that receives the result of an + * asynchronous I/O operation, either send/recv or accept/connect. + * + * Operation factories should set one of these up before beginning the + * operation. Poller knows how to dispatch completion to this class. + * This class must be subclassed for needed operations; this class provides + * an interface only and cannot be instantiated. + * + * This class is tied to Windows; it inherits from OVERLAPPED so that the + * IocpPoller can cast OVERLAPPED pointers back to AsynchIoResult and call + * the completion handler. + */ +class AsynchResult : private OVERLAPPED { +public: + LPOVERLAPPED overlapped(void) { return this; } + static AsynchResult* from_overlapped(LPOVERLAPPED ol) { + return static_cast<AsynchResult*>(ol); + } + virtual void success (size_t bytesTransferred) { + bytes = bytesTransferred; + status = 0; + complete(); + } + virtual void failure (int error) { + bytes = 0; + status = error; + complete(); + } + size_t getTransferred(void) const { return bytes; } + int getStatus(void) const { return status; } + +protected: + AsynchResult() : bytes(0), status(0) + { memset(overlapped(), 0, sizeof(OVERLAPPED)); } + ~AsynchResult() {} + virtual void complete(void) = 0; + + size_t bytes; + int status; +}; + +class AsynchAcceptor; + +class AsynchAcceptResult : public AsynchResult { + + friend class AsynchAcceptor; + +public: + AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb, + AsynchAcceptor *acceptor, + SOCKET listener); + virtual void success (size_t bytesTransferred); + virtual void failure (int error); + +private: + virtual void complete(void) {} // No-op for this class. + + std::auto_ptr<qpid::sys::Socket> newSocket; + qpid::sys::AsynchAcceptor::Callback callback; + AsynchAcceptor *acceptor; + SOCKET listener; + + // AcceptEx needs a place to write the local and remote addresses + // when accepting the connection. Place those here; get enough for + // IPv6 addresses, even if the socket is IPv4. + enum { SOCKADDRMAXLEN = sizeof(sockaddr_in6) + 16, + SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN }; + char addressBuffer[SOCKADDRBUFLEN]; +}; + +class AsynchIoResult : public AsynchResult { +public: + typedef boost::function1<void, AsynchIoResult *> Completer; + + virtual ~AsynchIoResult() {} + qpid::sys::AsynchIO::BufferBase *getBuff(void) const { return iobuff; } + size_t getRequested(void) const { return requested; } + const WSABUF *getWSABUF(void) const { return &wsabuf; } + +protected: + void setBuff (qpid::sys::AsynchIO::BufferBase *buffer) { iobuff = buffer; } + +protected: + AsynchIoResult(Completer cb, + qpid::sys::AsynchIO::BufferBase *buff, size_t length) + : completionCallback(cb), iobuff(buff), requested(length) {} + + virtual void complete(void) = 0; + WSABUF wsabuf; + Completer completionCallback; + +private: + qpid::sys::AsynchIO::BufferBase *iobuff; + size_t requested; // Number of bytes in original I/O request +}; + +class AsynchReadResult : public AsynchIoResult { + + // complete() updates buffer then does completion callback. + virtual void complete(void) { + getBuff()->dataCount += bytes; + completionCallback(this); + } + +public: + AsynchReadResult(AsynchIoResult::Completer cb, + qpid::sys::AsynchIO::BufferBase *buff, + size_t length) + : AsynchIoResult(cb, buff, length) { + wsabuf.buf = buff->bytes + buff->dataCount; + wsabuf.len = length; + } +}; + +class AsynchWriteResult : public AsynchIoResult { + + // complete() updates buffer then does completion callback. + virtual void complete(void) { + qpid::sys::AsynchIO::BufferBase *b = getBuff(); + b->dataStart += bytes; + b->dataCount -= bytes; + completionCallback(this); + } + +public: + AsynchWriteResult(AsynchIoResult::Completer cb, + qpid::sys::AsynchIO::BufferBase *buff, + size_t length) + : AsynchIoResult(cb, buff, length) { + wsabuf.buf = buff ? buff->bytes : 0; + wsabuf.len = length; + } +}; + +class AsynchWriteWanted : public AsynchWriteResult { + + // complete() just does completion callback; no buffers used. + virtual void complete(void) { + completionCallback(this); + } + +public: + AsynchWriteWanted(AsynchIoResult::Completer cb) + : AsynchWriteResult(cb, 0, 0) { + wsabuf.buf = 0; + wsabuf.len = 0; + } +}; + +class AsynchCallbackRequest : public AsynchIoResult { + // complete() needs to simply call the completionCallback; no buffers. + virtual void complete(void) { + completionCallback(this); + } + +public: + AsynchCallbackRequest(AsynchIoResult::Completer cb, + qpid::sys::AsynchIO::RequestCallback reqCb) + : AsynchIoResult(cb, 0, 0), reqCallback(reqCb) { + wsabuf.buf = 0; + wsabuf.len = 0; + } + + qpid::sys::AsynchIO::RequestCallback reqCallback; +}; + +}}} // qpid::sys::windows + +#endif /*!_windows_asynchIoResult_h*/ diff --git a/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp b/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp new file mode 100644 index 0000000000..88f1637d48 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp @@ -0,0 +1,53 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/sys/FileSysDir.h" +#include "qpid/sys/StrError.h" +#include "qpid/Exception.h" + +#include <sys/types.h> +#include <sys/stat.h> +#include <direct.h> +#include <errno.h> + +namespace qpid { +namespace sys { + +bool FileSysDir::exists (void) const +{ + const char *cpath = dirPath.c_str (); + struct _stat s; + if (::_stat(cpath, &s)) { + if (errno == ENOENT) { + return false; + } + throw qpid::Exception (strError(errno) + + ": Can't check directory: " + dirPath); + } + if (s.st_mode & _S_IFDIR) + return true; + throw qpid::Exception(dirPath + " is not a directory"); +} + +void FileSysDir::mkdir(void) +{ + if (::_mkdir(dirPath.c_str()) == -1) + throw Exception ("Can't create directory: " + dirPath); +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp b/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp new file mode 100755 index 0000000000..250737cb99 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IOHandle.h" +#include "qpid/sys/windows/IoHandlePrivate.h" +#include <windows.h> + +namespace qpid { +namespace sys { + +SOCKET toFd(const IOHandlePrivate* h) +{ + return h->fd; +} + +IOHandle::IOHandle(IOHandlePrivate* h) : + impl(h) +{} + +IOHandle::~IOHandle() { + delete impl; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h new file mode 100755 index 0000000000..5943db5cc7 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h @@ -0,0 +1,61 @@ +#ifndef _sys_windows_IoHandlePrivate_h +#define _sys_windows_IoHandlePrivate_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/windows/AsynchIoResult.h" +#include "qpid/CommonImportExport.h" + +#include <winsock2.h> + +namespace qpid { +namespace sys { + +// Private fd related implementation details +// There should be either a valid socket handle or a completer callback. +// Handle is used to associate with poller's iocp; completer is used to +// inject a completion that will very quickly trigger a callback to the +// completer from an I/O thread. If the callback mechanism is used, there +// can be a RequestCallback set - this carries the callback object through +// from AsynchIO::requestCallback() through to the I/O completion processing. +class IOHandlePrivate { + friend QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s); + static IOHandlePrivate* getImpl(const IOHandle& h); + +public: + IOHandlePrivate(SOCKET f = INVALID_SOCKET, + windows::AsynchIoResult::Completer cb = 0, + AsynchIO::RequestCallback reqCallback = 0) : + fd(f), event(cb), cbRequest(reqCallback) + {} + + SOCKET fd; + windows::AsynchIoResult::Completer event; + AsynchIO::RequestCallback cbRequest; +}; + +QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s); + +}} + +#endif /* _sys_windows_IoHandlePrivate_h */ diff --git a/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp new file mode 100755 index 0000000000..1805dd2cd8 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp @@ -0,0 +1,219 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Poller.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Dispatcher.h" + +#include "qpid/sys/windows/AsynchIoResult.h" +#include "qpid/sys/windows/IoHandlePrivate.h" +#include "qpid/sys/windows/check.h" + +#include <winsock2.h> +#include <windows.h> + +#include <assert.h> +#include <vector> +#include <exception> + +namespace qpid { +namespace sys { + +class PollerHandlePrivate { + friend class Poller; + friend class PollerHandle; + + SOCKET fd; + windows::AsynchIoResult::Completer cb; + AsynchIO::RequestCallback cbRequest; + + PollerHandlePrivate(SOCKET f, + windows::AsynchIoResult::Completer cb0 = 0, + AsynchIO::RequestCallback rcb = 0) + : fd(f), cb(cb0), cbRequest(rcb) + { + } + +}; + +PollerHandle::PollerHandle(const IOHandle& h) : + impl(new PollerHandlePrivate(toSocketHandle(static_cast<const Socket&>(h)), h.impl->event, h.impl->cbRequest)) +{} + +PollerHandle::~PollerHandle() { + delete impl; +} + +/** + * Concrete implementation of Poller to use the Windows I/O Completion + * port (IOCP) facility. + */ +class PollerPrivate { + friend class Poller; + + const HANDLE iocp; + + // The number of threads running the event loop. + volatile LONG threadsRunning; + + // Shutdown request is handled by setting isShutdown and injecting a + // well-formed completion event into the iocp. + bool isShutdown; + + PollerPrivate() : + iocp(::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)), + threadsRunning(0), + isShutdown(false) { + QPID_WINDOWS_CHECK_NULL(iocp); + } + + ~PollerPrivate() { + // It's probably okay to ignore any errors here as there can't be + // data loss + ::CloseHandle(iocp); + } +}; + +void Poller::shutdown() { + // Allow sloppy code to shut us down more than once. + if (impl->isShutdown) + return; + ULONG_PTR key = 1; // Tell wait() it's a shutdown, not I/O + PostQueuedCompletionStatus(impl->iocp, 0, key, 0); +} + +bool Poller::hasShutdown() +{ + return impl->isShutdown; +} + +bool Poller::interrupt(PollerHandle&) { + return false; // There's no concept of a registered handle. +} + +void Poller::run() { + do { + Poller::Event event = this->wait(); + + // Handle shutdown + switch (event.type) { + case Poller::SHUTDOWN: + return; + break; + case Poller::INVALID: // On any type of success or fail completion + break; + default: + // This should be impossible + assert(false); + } + } while (true); +} + +void Poller::monitorHandle(PollerHandle& handle, Direction dir) { + HANDLE h = (HANDLE)(handle.impl->fd); + if (h != INVALID_HANDLE_VALUE) { + HANDLE iocpHandle = ::CreateIoCompletionPort (h, impl->iocp, 0, 0); + QPID_WINDOWS_CHECK_NULL(iocpHandle); + } + else { + // INPUT is used to request a callback; OUTPUT to request a write + assert(dir == Poller::INPUT || dir == Poller::OUTPUT); + + if (dir == Poller::OUTPUT) { + windows::AsynchWriteWanted *result = + new windows::AsynchWriteWanted(handle.impl->cb); + PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped()); + } + else { + windows::AsynchCallbackRequest *result = + new windows::AsynchCallbackRequest(handle.impl->cb, + handle.impl->cbRequest); + PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped()); + } + } +} + +// All no-ops... +void Poller::unmonitorHandle(PollerHandle& /*handle*/, Direction /*dir*/) {} +void Poller::registerHandle(PollerHandle& /*handle*/) {} +void Poller::unregisterHandle(PollerHandle& /*handle*/) {} + +Poller::Event Poller::wait(Duration timeout) { + DWORD timeoutMs = 0; + DWORD numTransferred = 0; + ULONG_PTR completionKey = 0; + OVERLAPPED *overlapped = 0; + windows::AsynchResult *result = 0; + + // Wait for either an I/O operation to finish (thus signaling the + // IOCP handle) or a shutdown request to be made (thus signaling the + // shutdown event). + if (timeout == TIME_INFINITE) + timeoutMs = INFINITE; + else + timeoutMs = static_cast<DWORD>(timeout / TIME_MSEC); + + InterlockedIncrement(&impl->threadsRunning); + bool goodOp = ::GetQueuedCompletionStatus (impl->iocp, + &numTransferred, + &completionKey, + &overlapped, + timeoutMs); + LONG remainingThreads = InterlockedDecrement(&impl->threadsRunning); + if (goodOp) { + // Dequeued a successful completion. If it's a posted packet from + // shutdown() the overlapped ptr is 0 and key is 1. Else downcast + // the OVERLAPPED pointer to an AsynchIoResult and call the + // completion handler. + if (overlapped == 0 && completionKey == 1) { + // If there are other threads still running this wait, re-post + // the completion. + if (remainingThreads > 0) + PostQueuedCompletionStatus(impl->iocp, 0, completionKey, 0); + return Event(0, SHUTDOWN); + } + + result = windows::AsynchResult::from_overlapped(overlapped); + result->success (static_cast<size_t>(numTransferred)); + } + else { + if (overlapped != 0) { + // Dequeued a completion for a failed operation. Downcast back + // to the result object and inform it that the operation failed. + DWORD status = ::GetLastError(); + result = windows::AsynchResult::from_overlapped(overlapped); + result->failure (static_cast<int>(status)); + } + } + return Event(0, INVALID); // TODO - this may need to be changed. + +} + +// Concrete constructors +Poller::Poller() : + impl(new PollerPrivate()) +{} + +Poller::~Poller() { + delete impl; +} + +}} diff --git a/qpid/cpp/src/qpid/sys/windows/LockFile.cpp b/qpid/cpp/src/qpid/sys/windows/LockFile.cpp new file mode 100755 index 0000000000..048c2d5b18 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/LockFile.cpp @@ -0,0 +1,64 @@ +/* + * + * Copyright (c) 2008 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/sys/LockFile.h" +#include "qpid/sys/windows/check.h" + +#include <windows.h> + +namespace qpid { +namespace sys { + +class LockFilePrivate { + friend class LockFile; + + HANDLE fd; + +public: + LockFilePrivate(HANDLE f) : fd(f) {} +}; + +LockFile::LockFile(const std::string& path_, bool create) + : path(path_), created(create) { + + HANDLE h = ::CreateFile(path.c_str(), + create ? (GENERIC_READ|GENERIC_WRITE) : GENERIC_READ, + FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE, + 0, /* Default security */ + create ? OPEN_ALWAYS : OPEN_EXISTING, + FILE_FLAG_DELETE_ON_CLOSE, /* Delete file when closed */ + NULL); + if (h == INVALID_HANDLE_VALUE) + throw qpid::Exception(path + ": " + qpid::sys::strError(GetLastError())); + + // Lock up to 4Gb + if (!::LockFile(h, 0, 0, 0xffffffff, 0)) + throw qpid::Exception(path + ": " + qpid::sys::strError(GetLastError())); + impl.reset(new LockFilePrivate(h)); +} + +LockFile::~LockFile() { + if (impl) { + if (impl->fd != INVALID_HANDLE_VALUE) { + ::UnlockFile(impl->fd, 0, 0, 0xffffffff, 0); + ::CloseHandle(impl->fd); + } + } +} + +}} /* namespace qpid::sys */ diff --git a/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp b/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp new file mode 100755 index 0000000000..062458ae5f --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp @@ -0,0 +1,101 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/sys/PipeHandle.h" +#include "qpid/sys/windows/check.h" +#include <winsock2.h> + +namespace qpid { +namespace sys { + +PipeHandle::PipeHandle(bool nonBlocking) { + + SOCKET listener, pair[2]; + struct sockaddr_in addr; + int err; + int addrlen = sizeof(addr); + pair[0] = pair[1] = INVALID_SOCKET; + if ((listener = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = 0; + + err = bind(listener, (const struct sockaddr*) &addr, sizeof(addr)); + if (err == SOCKET_ERROR) { + err = WSAGetLastError(); + closesocket(listener); + throw QPID_WINDOWS_ERROR(err); + } + + err = getsockname(listener, (struct sockaddr*) &addr, &addrlen); + if (err == SOCKET_ERROR) { + err = WSAGetLastError(); + closesocket(listener); + throw QPID_WINDOWS_ERROR(err); + } + + try { + if (listen(listener, 1) == SOCKET_ERROR) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + if ((pair[0] = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + if (connect(pair[0], (const struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + if ((pair[1] = accept(listener, NULL, NULL)) == INVALID_SOCKET) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + + closesocket(listener); + writeFd = pair[0]; + readFd = pair[1]; + } + catch (...) { + closesocket(listener); + if (pair[0] != INVALID_SOCKET) + closesocket(pair[0]); + throw; + } + + // Set the socket to non-blocking + if (nonBlocking) { + unsigned long nonblock = 1; + ioctlsocket(readFd, FIONBIO, &nonblock); + } +} + +PipeHandle::~PipeHandle() { + closesocket(readFd); + closesocket(writeFd); +} + +int PipeHandle::read(void* buf, size_t bufSize) { + return ::recv(readFd, (char *)buf, bufSize, 0); +} + +int PipeHandle::write(const void* buf, size_t bufSize) { + return ::send(writeFd, (const char *)buf, bufSize, 0); +} + +int PipeHandle::getReadHandle() { + return readFd; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp new file mode 100644 index 0000000000..6a1d9045b4 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp @@ -0,0 +1,114 @@ +#ifndef QPID_SYS_WINDOWS_POLLABLECONDITION_CPP +#define QPID_SYS_WINDOWS_POLLABLECONDITION_CPP + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/PollableCondition.h" +#include "qpid/sys/IOHandle.h" +#include "qpid/sys/windows/AsynchIoResult.h" +#include "qpid/sys/windows/IoHandlePrivate.h" + +#include <boost/bind.hpp> +#include <windows.h> + +namespace qpid { +namespace sys { + +// PollableConditionPrivate will reuse the IocpPoller's ability to queue +// a completion to the IOCP and have it dispatched to the completer callback +// noted in the IOHandlePrivate when the request is queued. The +// AsynchCallbackRequest object is not really used - we already have the +// desired callback for the user of PollableCondition. +class PollableConditionPrivate : private IOHandle { + friend class PollableCondition; + +private: + PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller); + ~PollableConditionPrivate(); + + void poke(); + void dispatch(windows::AsynchIoResult *result); + +private: + PollableCondition::Callback cb; + PollableCondition& parent; + boost::shared_ptr<sys::Poller> poller; + LONG isSet; +}; + +PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller) + : IOHandle(new sys::IOHandlePrivate(INVALID_SOCKET, + boost::bind(&PollableConditionPrivate::dispatch, this, _1))), + cb(cb), parent(parent), poller(poller), isSet(0) +{ +} + +PollableConditionPrivate::~PollableConditionPrivate() +{ +} + +void PollableConditionPrivate::poke() +{ + // monitorHandle will queue a completion for the IOCP; when it's handled, a + // poller thread will call back to dispatch() below. + PollerHandle ph(*this); + poller->monitorHandle(ph, Poller::INPUT); +} + +void PollableConditionPrivate::dispatch(windows::AsynchIoResult *result) +{ + delete result; // Poller::monitorHandle() allocates this + cb(parent); + if (isSet) + poke(); +} + + /* PollableCondition */ + +PollableCondition::PollableCondition(const Callback& cb, + const boost::shared_ptr<sys::Poller>& poller) + : impl(new PollableConditionPrivate(cb, *this, poller)) +{ +} + +PollableCondition::~PollableCondition() +{ + delete impl; +} + +void PollableCondition::set() { + // Add one to the set count and poke it to provoke a callback + ::InterlockedIncrement(&impl->isSet); + impl->poke(); +} + +void PollableCondition::clear() { + ::InterlockedExchange(&impl->isSet, 0); +} + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_WINDOWS_POLLABLECONDITION_CPP*/ diff --git a/qpid/cpp/src/qpid/sys/windows/Shlib.cpp b/qpid/cpp/src/qpid/sys/windows/Shlib.cpp new file mode 100644 index 0000000000..ba18747eb4 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/Shlib.cpp @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Shlib.h" +#include "qpid/Exception.h" +#include "qpid/sys/windows/check.h" +#include <windows.h> + +namespace qpid { +namespace sys { + +void Shlib::load(const char* name) { + HMODULE h = LoadLibrary(name); + if (h == NULL) { + throw QPID_WINDOWS_ERROR(GetLastError()); + } + handle = static_cast<void*>(h); +} + +void Shlib::unload() { + if (handle) { + if (FreeLibrary(static_cast<HMODULE>(handle)) == 0) { + throw QPID_WINDOWS_ERROR(GetLastError()); + } + handle = 0; + } +} + +void* Shlib::getSymbol(const char* name) { + // Double cast avoids warning about casting function pointer to object + void *sym = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(GetProcAddress(static_cast<HMODULE>(handle), name))); + if (sym == NULL) + throw QPID_WINDOWS_ERROR(GetLastError()); + return sym; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp new file mode 100755 index 0000000000..baa80f04e0 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp @@ -0,0 +1,289 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// Ensure we get all of winsock2.h +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif + +#include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" +#include "qpid/sys/windows/IoHandlePrivate.h" +#include "qpid/sys/windows/check.h" +#include "qpid/sys/Time.h" + +#include <cstdlib> +#include <string.h> + +#include <winsock2.h> + +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> + +// Need to initialize WinSock. Ideally, this would be a singleton or embedded +// in some one-time initialization function. I tried boost singleton and could +// not get it to compile (and others located in google had the same problem). +// So, this simple static with an interlocked increment will do for known +// use cases at this time. Since this will only shut down winsock at process +// termination, there may be some problems with client programs that also +// expect to load and unload winsock, but we'll see... +// If someone does get an easy-to-use singleton sometime, converting to it +// may be preferable. + +namespace { + +static LONG volatile initialized = 0; + +class WinSockSetup { + // : public boost::details::pool::singleton_default<WinSockSetup> { + +public: + WinSockSetup() { + LONG timesEntered = InterlockedIncrement(&initialized); + if (timesEntered > 1) + return; + err = 0; + WORD wVersionRequested; + WSADATA wsaData; + + /* Request WinSock 2.2 */ + wVersionRequested = MAKEWORD(2, 2); + err = WSAStartup(wVersionRequested, &wsaData); + } + + ~WinSockSetup() { + WSACleanup(); + } + +public: + int error(void) const { return err; } + +protected: + DWORD err; +}; + +static WinSockSetup setup; + +} /* namespace */ + +namespace qpid { +namespace sys { + +namespace { + +std::string getName(SOCKET fd, bool local) +{ + sockaddr_in name; // big enough for any socket address + socklen_t namelen = sizeof(name); + if (local) { + QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen)); + } else { + QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen)); + } + + char servName[NI_MAXSERV]; + char dispName[NI_MAXHOST]; + if (int rc = ::getnameinfo((sockaddr*)&name, namelen, + dispName, sizeof(dispName), + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + return std::string(dispName) + ":" + std::string(servName); +} +} // namespace + +Socket::Socket() : + IOHandle(new IOHandlePrivate), + nonblocking(false), + nodelay(false) +{ + SOCKET& socket = impl->fd; + if (socket != INVALID_SOCKET) Socket::close(); + SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0); + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + socket = s; +} + +Socket::Socket(IOHandlePrivate* h) : + IOHandle(h), + nonblocking(false), + nodelay(false) +{} + +void +Socket::createSocket(const SocketAddress& sa) const +{ + SOCKET& socket = impl->fd; + if (socket != INVALID_SOCKET) Socket::close(); + + SOCKET s = ::socket (getAddrInfo(sa).ai_family, + getAddrInfo(sa).ai_socktype, + 0); + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + socket = s; + + try { + if (nonblocking) setNonblocking(); + if (nodelay) setTcpNoDelay(); + } catch (std::exception&) { + closesocket(s); + socket = INVALID_SOCKET; + throw; + } +} + +void Socket::setNonblocking() const { + u_long nonblock = 1; + QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock)); +} + +void Socket::connect(const std::string& host, const std::string& port) const +{ + SocketAddress sa(host, port); + connect(sa); +} + +void +Socket::connect(const SocketAddress& addr) const +{ + peername = addr.asString(false); + + const SOCKET& socket = impl->fd; + const addrinfo *addrs = &(getAddrInfo(addr)); + int error = 0; + WSASetLastError(0); + while (addrs != 0) { + if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) || + (WSAGetLastError() == WSAEWOULDBLOCK)) + break; + // Error... save this error code and see if there are other address + // to try before throwing the exception. + error = WSAGetLastError(); + addrs = addrs->ai_next; + } + if (error) + throw qpid::Exception(QPID_MSG(strError(error) << ": " << peername)); +} + +void +Socket::close() const +{ + SOCKET& socket = impl->fd; + if (socket == INVALID_SOCKET) return; + QPID_WINSOCK_CHECK(closesocket(socket)); + socket = INVALID_SOCKET; +} + + +int Socket::write(const void *buf, size_t count) const +{ + const SOCKET& socket = impl->fd; + int sent = ::send(socket, (const char *)buf, count, 0); + if (sent == SOCKET_ERROR) + return -1; + return sent; +} + +int Socket::read(void *buf, size_t count) const +{ + const SOCKET& socket = impl->fd; + int received = ::recv(socket, (char *)buf, count, 0); + if (received == SOCKET_ERROR) + return -1; + return received; +} + +int Socket::listen(const std::string&, const std::string& port, int backlog) const +{ + const SOCKET& socket = impl->fd; + BOOL yes=1; + QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes))); + struct sockaddr_in name; + memset(&name, 0, sizeof(name)); + name.sin_family = AF_INET; + name.sin_port = htons(boost::lexical_cast<uint16_t>(port)); + name.sin_addr.s_addr = 0; + if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR) + throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError()))); + if (::listen(socket, backlog) == SOCKET_ERROR) + throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError()))); + + socklen_t namelen = sizeof(name); + QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen)); + return ntohs(name.sin_port); +} + +Socket* Socket::accept() const +{ + SOCKET afd = ::accept(impl->fd, 0, 0); + if (afd != INVALID_SOCKET) + return new Socket(new IOHandlePrivate(afd)); + else if (WSAGetLastError() == EAGAIN) + return 0; + else throw QPID_WINDOWS_ERROR(WSAGetLastError()); +} + +std::string Socket::getPeerAddress() const +{ + if (peername.empty()) + peername = getName(impl->fd, false); + return peername; +} + +std::string Socket::getLocalAddress() const +{ + if (localname.empty()) + localname = getName(impl->fd, true); + return localname; +} + +int Socket::getError() const +{ + int result; + socklen_t rSize = sizeof (result); + + QPID_WINSOCK_CHECK(::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize)); + return result; +} + +void Socket::setTcpNoDelay() const +{ + int flag = 1; + int result = setsockopt(impl->fd, + IPPROTO_TCP, + TCP_NODELAY, + (char *)&flag, + sizeof(flag)); + QPID_WINSOCK_CHECK(result); + nodelay = true; +} + +inline IOHandlePrivate* IOHandlePrivate::getImpl(const qpid::sys::IOHandle &h) +{ + return h.impl; +} + +SOCKET toSocketHandle(const Socket& s) +{ + return IOHandlePrivate::getImpl(s)->fd; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp new file mode 100644 index 0000000000..ac43cd2d23 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// Ensure we get all of winsock2.h +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif + +#include "qpid/sys/SocketAddress.h" + +#include "qpid/sys/windows/check.h" + +#include <winsock2.h> +#include <ws2tcpip.h> +#include <string.h> + +namespace qpid { +namespace sys { + +SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) : + host(host0), + port(port0), + addrInfo(0) +{ + ::addrinfo hints; + ::memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well + hints.ai_socktype = SOCK_STREAM; + + const char* node = 0; + if (host.empty()) { + hints.ai_flags |= AI_PASSIVE; + } else { + node = host.c_str(); + } + const char* service = port.empty() ? "0" : port.c_str(); + + int n = ::getaddrinfo(node, service, &hints, &addrInfo); + if (n != 0) + throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); +} + +SocketAddress::~SocketAddress() +{ + ::freeaddrinfo(addrInfo); +} + +std::string SocketAddress::asString(bool) const +{ + return host + ":" + port; +} + +const ::addrinfo& getAddrInfo(const SocketAddress& sa) +{ + return *sa.addrInfo; +} + +}} diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp new file mode 100644 index 0000000000..11a3389e45 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp @@ -0,0 +1,661 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SslAsynchIO.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Socket.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Time.h" +#include "qpid/log/Statement.h" + +#include "qpid/sys/windows/check.h" + +// security.h needs to see this to distinguish from kernel use. +#define SECURITY_WIN32 +#include <security.h> +#include <Schnlsp.h> +#undef SECURITY_WIN32 + +#include <queue> +#include <boost/bind.hpp> + +namespace { + + /* + * To make the SSL encryption more efficient, set up a new BufferBase + * that leaves room for the SSL header to be prepended and the SSL + * trailer to be appended. + * + * This works by accepting a properly formed BufferBase, remembering it, + * and resetting the members of this struct to reflect the reserved + * header and trailer areas. It's only needed for giving buffers up to + * the frame layer for writing into. + */ + struct SslIoBuff : public qpid::sys::AsynchIO::BufferBase { + std::auto_ptr<qpid::sys::AsynchIO::BufferBase> aioBuff; + + SslIoBuff (qpid::sys::AsynchIO::BufferBase *base, + const SecPkgContext_StreamSizes &sizes) + : qpid::sys::AsynchIO::BufferBase(&base->bytes[sizes.cbHeader], + std::min(base->byteCount - sizes.cbHeader - sizes.cbTrailer, + sizes.cbMaximumMessage)), + aioBuff(base) + {} + + ~SslIoBuff() {} + qpid::sys::AsynchIO::BufferBase* release() { return aioBuff.release(); } + }; +} + +namespace qpid { +namespace sys { +namespace windows { + +SslAsynchIO::SslAsynchIO(const qpid::sys::Socket& s, + CredHandle hCred, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb, + BuffersEmptyCallback eCb, + IdleCallback iCb, + NegotiateDoneCallback nCb) : + credHandle(hCred), + aio(0), + state(Negotiating), + readCallback(rCb), + idleCallback(iCb), + negotiateDoneCallback(nCb), + callbacksInProgress(0), + queuedDelete(false), + leftoverPlaintext(0) +{ + SecInvalidateHandle(&ctxtHandle); + peerAddress = s.getPeerAddress(); + aio = qpid::sys::AsynchIO::create(s, + boost::bind(&SslAsynchIO::sslDataIn, this, _1, _2), + eofCb, + disCb, + cCb, + eCb, + boost::bind(&SslAsynchIO::idle, this, _1)); +} + +SslAsynchIO::~SslAsynchIO() { + if (leftoverPlaintext) { + delete leftoverPlaintext; + leftoverPlaintext = 0; + } +} + +void SslAsynchIO::queueForDeletion() { + // This method effectively disconnects the layer above; pass it on the + // AsynchIO and delete. + aio->queueForDeletion(); + queuedDelete = true; + if (!callbacksInProgress) + delete this; +} + +void SslAsynchIO::start(qpid::sys::Poller::shared_ptr poller) { + aio->start(poller); + startNegotiate(); +} + +void SslAsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) { + aio->queueReadBuffer(buff); +} + +void SslAsynchIO::unread(AsynchIO::BufferBase* buff) { + // This is plaintext data being given back for more. Since it's already + // decrypted, don't give it back to the aio layer; keep it to append + // any new data for the upper layer. + assert(buff); + buff->squish(); + assert(leftoverPlaintext == 0); + leftoverPlaintext = buff; +} + +void SslAsynchIO::queueWrite(AsynchIO::BufferBase* buff) { + // @@TODO: Need to delay the write if the session is renegotiating. + + // Should not have gotten here without an SslIoBuff. This assert is + // primarily to catch any stray cases where write is called with a buffer + // not obtained via getQueuedBuffer. + SslIoBuff *sslBuff = dynamic_cast<SslIoBuff*>(buff); + assert(sslBuff != 0); + + // Encrypt and hand off to the io layer. Remember that the upper layer's + // encoding was working on, and adjusting counts for, the SslIoBuff. + // Update the count of the original BufferBase before handing off to + // the I/O layer. + buff = sslBuff->release(); + SecBuffer buffs[4]; + buffs[0].cbBuffer = schSizes.cbHeader; + buffs[0].BufferType = SECBUFFER_STREAM_HEADER; + buffs[0].pvBuffer = buff->bytes; // This space was left by SslIoBuff + buffs[1].cbBuffer = sslBuff->dataCount; + buffs[1].BufferType = SECBUFFER_DATA; + buffs[1].pvBuffer = sslBuff->bytes; + buffs[2].cbBuffer = schSizes.cbTrailer; + buffs[2].BufferType = SECBUFFER_STREAM_TRAILER; + buffs[2].pvBuffer = &sslBuff->bytes[sslBuff->dataCount]; + buffs[3].cbBuffer = 0; + buffs[3].BufferType = SECBUFFER_EMPTY; + buffs[3].pvBuffer = 0; + SecBufferDesc buffDesc; + buffDesc.ulVersion = SECBUFFER_VERSION; + buffDesc.cBuffers = 4; + buffDesc.pBuffers = buffs; + SECURITY_STATUS status = ::EncryptMessage(&ctxtHandle, 0, &buffDesc, 0); + + // EncryptMessage encrypts the data in place. The header and trailer + // areas were left previously and must now be included in the updated + // count of bytes to write to the peer. + delete sslBuff; + buff->dataCount = buffs[0].cbBuffer + buffs[1].cbBuffer + buffs[2].cbBuffer; + aio->queueWrite(buff); +} + +void SslAsynchIO::notifyPendingWrite() { + aio->notifyPendingWrite(); +} + +void SslAsynchIO::queueWriteClose() { + if (state == Negotiating) { + // Never got going, so don't bother trying to close SSL down orderly. + state = ShuttingDown; + aio->queueWriteClose(); + return; + } + + state = ShuttingDown; + + DWORD shutdown = SCHANNEL_SHUTDOWN; + SecBuffer shutBuff; + shutBuff.cbBuffer = sizeof(DWORD); + shutBuff.BufferType = SECBUFFER_TOKEN; + shutBuff.pvBuffer = &shutdown; + SecBufferDesc desc; + desc.ulVersion = SECBUFFER_VERSION; + desc.cBuffers = 1; + desc.pBuffers = &shutBuff; + ::ApplyControlToken(&ctxtHandle, &desc); + negotiateStep(0); + // When the shutdown sequence is done, negotiateDone() will handle + // shutting down aio. +} + +bool SslAsynchIO::writeQueueEmpty() { + return aio->writeQueueEmpty(); +} + +/* + * Initiate a read operation. AsynchIO::readComplete() will be + * called when the read is complete and data is available. + */ +void SslAsynchIO::startReading() { + aio->startReading(); +} + +void SslAsynchIO::stopReading() { + aio->stopReading(); +} + +// Queue the specified callback for invocation from an I/O thread. +void SslAsynchIO::requestCallback(RequestCallback callback) { + aio->requestCallback(callback); +} + +/** + * Return a queued buffer read to put new data in for writing. + * This method ALWAYS returns a SslIoBuff reflecting a BufferBase from + * the aio layer that has header and trailer space reserved. + */ +AsynchIO::BufferBase* SslAsynchIO::getQueuedBuffer() { + SslIoBuff *sslBuff = 0; + BufferBase* buff = aio->getQueuedBuffer(); + if (buff == 0) + return 0; + + sslBuff = new SslIoBuff(buff, schSizes); + return sslBuff; +} + +unsigned int SslAsynchIO::getSslKeySize() { + SecPkgContext_KeyInfo info; + memset(&info, 0, sizeof(info)); + ::QueryContextAttributes(&ctxtHandle, SECPKG_ATTR_KEY_INFO, &info); + return info.KeySize; +} + +void SslAsynchIO::negotiationDone() { + switch(state) { + case Negotiating: + ::QueryContextAttributes(&ctxtHandle, + SECPKG_ATTR_STREAM_SIZES, + &schSizes); + state = Running; + if (negotiateDoneCallback) + negotiateDoneCallback(SEC_E_OK); + break; + case Redo: + state = Running; + break; + case ShuttingDown: + aio->queueWriteClose(); + break; + default: + assert(0); + } +} + +void SslAsynchIO::negotiationFailed(SECURITY_STATUS status) { + QPID_LOG(notice, "SSL negotiation failed to " << peerAddress << ": " << + qpid::sys::strError(status)); + if (negotiateDoneCallback) + negotiateDoneCallback(status); + else + queueWriteClose(); +} + +void SslAsynchIO::sslDataIn(qpid::sys::AsynchIO& a, BufferBase *buff) { + if (state != Running) { + negotiateStep(buff); + return; + } + + // Decrypt the buffer; if there's legit data, pass it on through. + // However, it's also possible that the peer hasn't supplied enough + // data yet, or the session needs to be renegotiated, or the session + // is ending. + SecBuffer recvBuffs[4]; + recvBuffs[0].cbBuffer = buff->dataCount; + recvBuffs[0].BufferType = SECBUFFER_DATA; + recvBuffs[0].pvBuffer = &buff->bytes[buff->dataStart]; + recvBuffs[1].BufferType = SECBUFFER_EMPTY; + recvBuffs[2].BufferType = SECBUFFER_EMPTY; + recvBuffs[3].BufferType = SECBUFFER_EMPTY; + SecBufferDesc buffDesc; + buffDesc.ulVersion = SECBUFFER_VERSION; + buffDesc.cBuffers = 4; + buffDesc.pBuffers = recvBuffs; + SECURITY_STATUS status = ::DecryptMessage(&ctxtHandle, &buffDesc, 0, NULL); + if (status != SEC_E_OK) { + if (status == SEC_E_INCOMPLETE_MESSAGE) { + // Give the partially filled buffer back and get more data + a.unread(buff); + } + else { + // Don't need this any more... + a.queueReadBuffer(buff); + + if (status == SEC_I_RENEGOTIATE) { + state = Redo; + negotiateStep(0); + } + else if (status == SEC_I_CONTEXT_EXPIRED) { + queueWriteClose(); + } + else { + throw QPID_WINDOWS_ERROR(status); + } + } + return; + } + + // All decrypted and verified... continue with AMQP. The recvBuffs have + // been set up by DecryptMessage to demarcate the SSL header, data, and + // trailer, as well as any extra data left over. Walk through and find + // that info, adjusting the buff data accordingly to reflect only the + // actual decrypted data. + // If there's extra data, copy that out to a new buffer and run through + // this method again. + BufferBase *extraBuff = 0; + for (int i = 0; i < 4; i++) { + switch (recvBuffs[i].BufferType) { + case SECBUFFER_STREAM_HEADER: + buff->dataStart += recvBuffs[i].cbBuffer; + // Fall through - also don't count these bytes as data + case SECBUFFER_STREAM_TRAILER: + buff->dataCount -= recvBuffs[i].cbBuffer; + break; + case SECBUFFER_EXTRA: + // Very important to get this buffer from the downstream aio. + // The ones constructed from the local getQueuedBuffer() are + // restricted size for encrypting. However, data coming up from + // TCP may have a bunch of SSL segments coalesced and be much + // larger than the maximum single SSL segment. + extraBuff = a.getQueuedBuffer(); + if (0 == extraBuff) + throw QPID_WINDOWS_ERROR(WSAENOBUFS); + memmove(extraBuff->bytes, + recvBuffs[i].pvBuffer, + recvBuffs[i].cbBuffer); + extraBuff->dataCount = recvBuffs[i].cbBuffer; + break; + default: + break; + } + } + + // Since we've already taken (possibly) all the available bytes from the + // aio layer, need to be sure that everything that's processable is + // processed before returning back to aio. It could be that any + // leftoverPlaintext data plus new buff data won't fit in one buffer, so + // need to keep going around the input processing loop until either + // all the bytes are gone, or there's less than a full frame remaining + // (so we can count on more bytes being on the way via aio). + do { + BufferBase *temp = 0; + // Now that buff reflects only decrypted data, see if there was any + // partial data left over from last time. If so, append this new + // data to that and release the current buff back to aio. Assume that + // leftoverPlaintext was squished so the data starts at 0. + if (leftoverPlaintext != 0) { + // There is leftover data; append all the new data that will fit. + int32_t count = buff->dataCount; + if (leftoverPlaintext->dataCount + count > leftoverPlaintext->byteCount) + count = (leftoverPlaintext->byteCount - leftoverPlaintext->dataCount); + ::memmove(&leftoverPlaintext->bytes[leftoverPlaintext->dataCount], + &buff->bytes[buff->dataStart], + count); + leftoverPlaintext->dataCount += count; + buff->dataCount -= count; + buff->dataStart += count; + if (buff->dataCount == 0) { + a.queueReadBuffer(buff); + buff = 0; + } + // Prepare to pass the buffer up. Beware that the read callback + // may do an unread(), so move the leftoverPlaintext pointer + // out of the way. It also may release the buffer back to aio, + // so in either event, the pointer passed to the callback is not + // valid on return. + temp = leftoverPlaintext; + leftoverPlaintext = 0; + } + else { + temp = buff; + buff = 0; + } + if (readCallback) { + // The callback guard here is to prevent an upcall from deleting + // this out from under us via queueForDeletion(). + ++callbacksInProgress; + readCallback(*this, temp); + --callbacksInProgress; + } + else + a.queueReadBuffer(temp); // What else can we do with this??? + } while (buff != 0); + + // Ok, the current decrypted data is done. If there was any extra data, + // go back and handle that. + if (extraBuff != 0) + sslDataIn(a, extraBuff); + + // If the upper layer queued for delete, do that now that all the + // callbacks are done. + if (queuedDelete && callbacksInProgress == 0) + delete this; +} + +void SslAsynchIO::idle(qpid::sys::AsynchIO&) { + // Don't relay idle indication to layer above until SSL session is up. + if (state == Running) { + state = Running; + if (idleCallback) + idleCallback(*this); + } +} + + /**************************************************/ + +ClientSslAsynchIO::ClientSslAsynchIO(const std::string& brokerHost, + const qpid::sys::Socket& s, + CredHandle hCred, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb, + BuffersEmptyCallback eCb, + IdleCallback iCb, + NegotiateDoneCallback nCb) : + SslAsynchIO(s, hCred, rCb, eofCb, disCb, cCb, eCb, iCb, nCb), + serverHost(brokerHost) +{ +} + +void ClientSslAsynchIO::startNegotiate() { + // SEC_CHAR is non-const, so do all the typing here. + SEC_CHAR *host = const_cast<SEC_CHAR *>(serverHost.c_str()); + + // Need a buffer to receive the token to send to the server. + BufferBase *buff = aio->getQueuedBuffer(); + ULONG ctxtRequested = ISC_REQ_STREAM; + ULONG ctxtAttrs; + // sendBuffs gets information to forward to the peer. + SecBuffer sendBuffs[2]; + sendBuffs[0].cbBuffer = buff->byteCount; + sendBuffs[0].BufferType = SECBUFFER_TOKEN; + sendBuffs[0].pvBuffer = buff->bytes; + sendBuffs[1].cbBuffer = 0; + sendBuffs[1].BufferType = SECBUFFER_EMPTY; + sendBuffs[1].pvBuffer = 0; + SecBufferDesc sendBuffDesc; + sendBuffDesc.ulVersion = SECBUFFER_VERSION; + sendBuffDesc.cBuffers = 2; + sendBuffDesc.pBuffers = sendBuffs; + SECURITY_STATUS status = ::InitializeSecurityContext(&credHandle, + NULL, + host, + ctxtRequested, + 0, + 0, + NULL, + 0, + &ctxtHandle, + &sendBuffDesc, + &ctxtAttrs, + NULL); + if (status == SEC_I_CONTINUE_NEEDED) { + buff->dataCount = sendBuffs[0].cbBuffer; + aio->queueWrite(buff); + } +} + +void ClientSslAsynchIO::negotiateStep(BufferBase* buff) { + // SEC_CHAR is non-const, so do all the typing here. + SEC_CHAR *host = const_cast<SEC_CHAR *>(serverHost.c_str()); + ULONG ctxtRequested = ISC_REQ_STREAM; + ULONG ctxtAttrs; + + // tokenBuffs describe the buffer that's coming in. It should have + // a token from the SSL server. + SecBuffer tokenBuffs[2]; + tokenBuffs[0].cbBuffer = buff ? buff->dataCount : 0; + tokenBuffs[0].BufferType = SECBUFFER_TOKEN; + tokenBuffs[0].pvBuffer = buff ? buff->bytes : 0; + tokenBuffs[1].cbBuffer = 0; + tokenBuffs[1].BufferType = SECBUFFER_EMPTY; + tokenBuffs[1].pvBuffer = 0; + SecBufferDesc tokenBuffDesc; + tokenBuffDesc.ulVersion = SECBUFFER_VERSION; + tokenBuffDesc.cBuffers = 2; + tokenBuffDesc.pBuffers = tokenBuffs; + + // Need a buffer to receive any token to send back to the server. + BufferBase *sendbuff = aio->getQueuedBuffer(); + // sendBuffs gets information to forward to the peer. + SecBuffer sendBuffs[2]; + sendBuffs[0].cbBuffer = sendbuff->byteCount; + sendBuffs[0].BufferType = SECBUFFER_TOKEN; + sendBuffs[0].pvBuffer = sendbuff->bytes; + sendBuffs[1].cbBuffer = 0; + sendBuffs[1].BufferType = SECBUFFER_EMPTY; + sendBuffs[1].pvBuffer = 0; + SecBufferDesc sendBuffDesc; + sendBuffDesc.ulVersion = SECBUFFER_VERSION; + sendBuffDesc.cBuffers = 2; + sendBuffDesc.pBuffers = sendBuffs; + + SECURITY_STATUS status = ::InitializeSecurityContext(&credHandle, + &ctxtHandle, + host, + ctxtRequested, + 0, + 0, + &tokenBuffDesc, + 0, + NULL, + &sendBuffDesc, + &ctxtAttrs, + NULL); + + if (status == SEC_E_INCOMPLETE_MESSAGE) { + // Not enough - get more data from the server then try again. + aio->unread(buff); + aio->queueReadBuffer(sendbuff); // Don't need this one for now... + return; + } + // Done with the buffer that came in... + if (buff) + aio->queueReadBuffer(buff); + if (status == SEC_I_CONTINUE_NEEDED) { + sendbuff->dataCount = sendBuffs[0].cbBuffer; + aio->queueWrite(sendbuff); + return; + } + // Nothing to send back to the server... + aio->queueReadBuffer(sendbuff); + // SEC_I_CONTEXT_EXPIRED means session stop complete; SEC_E_OK can be + // either session stop or negotiation done (session up). + if (status == SEC_E_OK || status == SEC_I_CONTEXT_EXPIRED) + negotiationDone(); + else + negotiationFailed(status); +} + +/*************************************************/ + +ServerSslAsynchIO::ServerSslAsynchIO(bool clientMustAuthenticate, + const qpid::sys::Socket& s, + CredHandle hCred, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb, + BuffersEmptyCallback eCb, + IdleCallback iCb, + NegotiateDoneCallback nCb) : + SslAsynchIO(s, hCred, rCb, eofCb, disCb, cCb, eCb, iCb, nCb), + clientAuth(clientMustAuthenticate) +{ +} + +void ServerSslAsynchIO::startNegotiate() { + // Nothing... need the client to send a token first. +} + +void ServerSslAsynchIO::negotiateStep(BufferBase* buff) { + ULONG ctxtRequested = ASC_REQ_STREAM; + if (clientAuth) + ctxtRequested |= ASC_REQ_MUTUAL_AUTH; + ULONG ctxtAttrs; + + // tokenBuffs describe the buffer that's coming in. It should have + // a token from the SSL server except if shutting down or renegotiating. + SecBuffer tokenBuffs[2]; + tokenBuffs[0].cbBuffer = buff ? buff->dataCount : 0; + tokenBuffs[0].BufferType = SECBUFFER_TOKEN; + tokenBuffs[0].pvBuffer = buff ? buff->bytes : 0; + tokenBuffs[1].cbBuffer = 0; + tokenBuffs[1].BufferType = SECBUFFER_EMPTY; + tokenBuffs[1].pvBuffer = 0; + SecBufferDesc tokenBuffDesc; + tokenBuffDesc.ulVersion = SECBUFFER_VERSION; + tokenBuffDesc.cBuffers = 2; + tokenBuffDesc.pBuffers = tokenBuffs; + + // Need a buffer to receive any token to send back to the server. + BufferBase *sendbuff = aio->getQueuedBuffer(); + // sendBuffs gets information to forward to the peer. + SecBuffer sendBuffs[2]; + sendBuffs[0].cbBuffer = sendbuff->byteCount; + sendBuffs[0].BufferType = SECBUFFER_TOKEN; + sendBuffs[0].pvBuffer = sendbuff->bytes; + sendBuffs[1].cbBuffer = 0; + sendBuffs[1].BufferType = SECBUFFER_EMPTY; + sendBuffs[1].pvBuffer = 0; + SecBufferDesc sendBuffDesc; + sendBuffDesc.ulVersion = SECBUFFER_VERSION; + sendBuffDesc.cBuffers = 2; + sendBuffDesc.pBuffers = sendBuffs; + PCtxtHandle ctxtHandlePtr = (SecIsValidHandle(&ctxtHandle)) ? &ctxtHandle : 0; + SECURITY_STATUS status = ::AcceptSecurityContext(&credHandle, + ctxtHandlePtr, + &tokenBuffDesc, + ctxtRequested, + 0, + &ctxtHandle, + &sendBuffDesc, + &ctxtAttrs, + NULL); + if (status == SEC_E_INCOMPLETE_MESSAGE) { + // Not enough - get more data from the server then try again. + if (buff) + aio->unread(buff); + aio->queueReadBuffer(sendbuff); // Don't need this one for now... + return; + } + // Done with the buffer that came in... + if (buff) + aio->queueReadBuffer(buff); + if (status == SEC_I_CONTINUE_NEEDED) { + sendbuff->dataCount = sendBuffs[0].cbBuffer; + aio->queueWrite(sendbuff); + return; + } + // There may have been a token generated; if so, send it to the client. + if (sendBuffs[0].cbBuffer > 0) { + sendbuff->dataCount = sendBuffs[0].cbBuffer; + aio->queueWrite(sendbuff); + } + else + // Nothing to send back to the server... + aio->queueReadBuffer(sendbuff); + + // SEC_I_CONTEXT_EXPIRED means session stop complete; SEC_E_OK can be + // either session stop or negotiation done (session up). + if (status == SEC_E_OK || status == SEC_I_CONTEXT_EXPIRED) { + if (clientAuth) + QPID_LOG(warning, "DID WE CHECK FOR CLIENT AUTH???"); + + negotiationDone(); + } + else { + negotiationFailed(status); + } +} + +}}} // namespace qpid::sys::windows diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h new file mode 100644 index 0000000000..3cdf2c8f08 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h @@ -0,0 +1,191 @@ +#ifndef _sys_windows_SslAsynchIO +#define _sys_windows_SslAsynchIO + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/sys/Poller.h" +#include "qpid/CommonImportExport.h" +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> +#include <windows.h> +// security.h needs to see this to distinguish from kernel use. +#define SECURITY_WIN32 +#include <security.h> +#include <Schnlsp.h> +#undef SECURITY_WIN32 + +namespace qpid { +namespace sys { +namespace windows { + +class Socket; +class Poller; + +/* + * SSL/Schannel shim between the frame-handling and AsynchIO layers. + * SslAsynchIO creates a regular AsynchIO object to handle I/O and this class + * gets involved for SSL negotiations and encrypt/decrypt. The details of + * how this all works are invisible to the layers on either side. The only + * change from normal AsynchIO usage is that there's an extra callback + * from SslAsynchIO to indicate that the initial session negotiation is + * complete. + * + * The details of session negotiation are different for client and server + * SSL roles. These differences are handled by deriving separate client + * and server role classes. + */ +class SslAsynchIO : public qpid::sys::AsynchIO { +public: + typedef boost::function1<void, SECURITY_STATUS> NegotiateDoneCallback; + + SslAsynchIO(const qpid::sys::Socket& s, + CredHandle hCred, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0, + NegotiateDoneCallback nCb = 0); + ~SslAsynchIO(); + + virtual void queueForDeletion(); + + virtual void start(qpid::sys::Poller::shared_ptr poller); + virtual void queueReadBuffer(BufferBase* buff); + virtual void unread(BufferBase* buff); + virtual void queueWrite(BufferBase* buff); + virtual void notifyPendingWrite(); + virtual void queueWriteClose(); + virtual bool writeQueueEmpty(); + virtual void startReading(); + virtual void stopReading(); + virtual void requestCallback(RequestCallback); + virtual BufferBase* getQueuedBuffer(); + + QPID_COMMON_EXTERN unsigned int getSslKeySize(); + +protected: + CredHandle credHandle; + + // AsynchIO layer below that's actually doing the I/O + qpid::sys::AsynchIO *aio; + + // Track what the state of the SSL session is. Have to know when it's + // time to notify the upper layer that the session is up, and also to + // know when it's not legit to pass data through to either side. + enum { Negotiating, Running, Redo, ShuttingDown } state; + bool sessionUp; + CtxtHandle ctxtHandle; + TimeStamp credExpiry; + + // Client- and server-side SSL subclasses implement these to do the + // proper negotiation steps. negotiateStep() is called with a buffer + // just received from the peer. + virtual void startNegotiate() = 0; + virtual void negotiateStep(BufferBase *buff) = 0; + + // The negotiating steps call one of these when it's finalized: + void negotiationDone(); + void negotiationFailed(SECURITY_STATUS status); + +private: + // These are callbacks from AsynchIO to here. + void sslDataIn(qpid::sys::AsynchIO& a, BufferBase *buff); + void idle(qpid::sys::AsynchIO&); + + // These callbacks are to the layer above. + ReadCallback readCallback; + IdleCallback idleCallback; + NegotiateDoneCallback negotiateDoneCallback; + volatile unsigned int callbacksInProgress; // >0 if w/in callbacks + volatile bool queuedDelete; + + // Address of peer, in case it's needed for logging. + std::string peerAddress; + + // Partial buffer of decrypted plaintext given back by the layer above. + AsynchIO::BufferBase *leftoverPlaintext; + + SecPkgContext_StreamSizes schSizes; +}; + +/* + * SSL/Schannel client-side shim between the frame-handling and AsynchIO + * layers. + */ +class ClientSslAsynchIO : public SslAsynchIO { +public: + // Args same as for SslIoShim, with the addition of brokerHost which is + // the expected SSL name of the server. + QPID_COMMON_EXTERN ClientSslAsynchIO(const std::string& brokerHost, + const qpid::sys::Socket& s, + CredHandle hCred, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0, + NegotiateDoneCallback nCb = 0); + +private: + std::string serverHost; + + // Client- and server-side SSL subclasses implement these to do the + // proper negotiation steps. negotiateStep() is called with a buffer + // just received from the peer. + void startNegotiate(); + void negotiateStep(BufferBase *buff); +}; +/* + * SSL/Schannel server-side shim between the frame-handling and AsynchIO + * layers. + */ +class ServerSslAsynchIO : public SslAsynchIO { +public: + QPID_COMMON_EXTERN ServerSslAsynchIO(bool clientMustAuthenticate, + const qpid::sys::Socket& s, + CredHandle hCred, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0, + NegotiateDoneCallback nCb = 0); + +private: + bool clientAuth; + + // Client- and server-side SSL subclasses implement these to do the + // proper negotiation steps. negotiateStep() is called with a buffer + // just received from the peer. + void startNegotiate(); + void negotiateStep(BufferBase *buff); +}; + +}}} // namespace qpid::sys::windows + +#endif // _sys_windows_SslAsynchIO diff --git a/qpid/cpp/src/qpid/sys/windows/StrError.cpp b/qpid/cpp/src/qpid/sys/windows/StrError.cpp new file mode 100755 index 0000000000..546d399d16 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/StrError.cpp @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/StrError.h" +#include <string> +#include <string.h> +#include <windows.h> + +namespace qpid { +namespace sys { + +std::string strError(int err) { + const size_t bufsize = 512; + char buf[bufsize]; + buf[0] = 0; + if (0 == FormatMessage (FORMAT_MESSAGE_MAX_WIDTH_MASK + | FORMAT_MESSAGE_FROM_SYSTEM, + 0, + err, + 0, // Default language + buf, + bufsize, + 0)) + { +#ifdef _MSC_VER + strerror_s(buf, bufsize, err); +#else + return std::string(strerror(err)); +#endif + } + return std::string(buf); +} + +}} diff --git a/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp b/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp new file mode 100755 index 0000000000..4da440bdd4 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* GetNativeSystemInfo call requires _WIN32_WINNT 0x0501 or higher */ +#ifndef _WIN32_WINNT +# define _WIN32_WINNT 0x0501 +#endif + +#include "qpid/sys/IntegerTypes.h" +#include "qpid/sys/SystemInfo.h" + +#include <winsock2.h> +#include <ws2tcpip.h> +#include <windows.h> +#include <tlhelp32.h> + +#ifndef HOST_NAME_MAX +# define HOST_NAME_MAX 256 +#endif + +namespace qpid { +namespace sys { + +long SystemInfo::concurrency() { + SYSTEM_INFO sys_info; + ::GetSystemInfo (&sys_info); + long activeProcessors = 0; + DWORD_PTR mask = sys_info.dwActiveProcessorMask; + while (mask != 0) { + if (mask & 1) + ++activeProcessors; + mask >>= 1; + } + return activeProcessors; +} + +bool SystemInfo::getLocalHostname (Address &address) { + char name[HOST_NAME_MAX]; + if (::gethostname(name, sizeof(name)) != 0) { + errno = WSAGetLastError(); + return false; + } + address.host = name; + return true; +} + +static const std::string LOCALHOST("127.0.0.1"); +static const std::string TCP("tcp"); + +void SystemInfo::getLocalIpAddresses (uint16_t port, + std::vector<Address> &addrList) { + enum { MAX_URL_INTERFACES = 100 }; + + SOCKET s = socket (PF_INET, SOCK_STREAM, 0); + if (s != INVALID_SOCKET) { + INTERFACE_INFO interfaces[MAX_URL_INTERFACES]; + DWORD filledBytes = 0; + WSAIoctl (s, + SIO_GET_INTERFACE_LIST, + 0, + 0, + interfaces, + sizeof (interfaces), + &filledBytes, + 0, + 0); + unsigned int interfaceCount = filledBytes / sizeof (INTERFACE_INFO); + for (unsigned int i = 0; i < interfaceCount; ++i) { + if (interfaces[i].iiFlags & IFF_UP) { + std::string addr(inet_ntoa(interfaces[i].iiAddress.AddressIn.sin_addr)); + if (addr != LOCALHOST) + addrList.push_back(Address(TCP, addr, port)); + } + } + closesocket (s); + } +} + +void SystemInfo::getSystemId (std::string &osName, + std::string &nodeName, + std::string &release, + std::string &version, + std::string &machine) +{ + osName = "Microsoft Windows"; + + char node[MAX_COMPUTERNAME_LENGTH + 1]; + DWORD nodelen = MAX_COMPUTERNAME_LENGTH + 1; + GetComputerName (node, &nodelen); + nodeName = node; + + OSVERSIONINFOEX vinfo; + vinfo.dwOSVersionInfoSize = sizeof(vinfo); + GetVersionEx ((OSVERSIONINFO *)&vinfo); + + SYSTEM_INFO sinfo; + GetNativeSystemInfo(&sinfo); + + switch(vinfo.dwMajorVersion) { + case 5: + switch(vinfo.dwMinorVersion) { + case 0: + release ="2000"; + break; + case 1: + release = "XP"; + break; + case 2: + if (sinfo.wProcessorArchitecture == PROCESSOR_ARCHITECTURE_AMD64 || + sinfo.wProcessorArchitecture == PROCESSOR_ARCHITECTURE_IA64) + release = "XP-64"; + else + release = "Server 2003"; + break; + default: + release = "Windows"; + } + break; + case 6: + if (vinfo.wProductType == VER_NT_SERVER) + release = "Server 2008"; + else + release = "Vista"; + break; + default: + release = "Microsoft Windows"; + } + version = vinfo.szCSDVersion; + + switch(sinfo.wProcessorArchitecture) { + case PROCESSOR_ARCHITECTURE_AMD64: + machine = "x86-64"; + break; + case PROCESSOR_ARCHITECTURE_IA64: + machine = "IA64"; + break; + case PROCESSOR_ARCHITECTURE_INTEL: + machine = "x86"; + break; + default: + machine = "unknown"; + break; + } +} + +uint32_t SystemInfo::getProcessId() +{ + return static_cast<uint32_t>(::GetCurrentProcessId()); +} + +uint32_t SystemInfo::getParentProcessId() +{ + // Only want info for the current process, so ask for something specific. + // The module info won't be used here but it keeps the snapshot limited to + // the current process so a search through all processes is not needed. + HANDLE snap = CreateToolhelp32Snapshot(TH32CS_SNAPMODULE, 0); + if (snap == INVALID_HANDLE_VALUE) + return 0; + PROCESSENTRY32 entry; + entry.dwSize = sizeof(entry); + if (!Process32First(snap, &entry)) + entry.th32ParentProcessID = 0; + CloseHandle(snap); + return static_cast<uint32_t>(entry.th32ParentProcessID); +} + +std::string SystemInfo::getProcessName() +{ + std::string name; + + // Only want info for the current process, so ask for something specific. + // The module info won't be used here but it keeps the snapshot limited to + // the current process so a search through all processes is not needed. + HANDLE snap = CreateToolhelp32Snapshot(TH32CS_SNAPMODULE, 0); + if (snap == INVALID_HANDLE_VALUE) + return name; + PROCESSENTRY32 entry; + entry.dwSize = sizeof(entry); + if (!Process32First(snap, &entry)) + entry.szExeFile[0] = '\0'; + CloseHandle(snap); + name = entry.szExeFile; + return name; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/Thread.cpp b/qpid/cpp/src/qpid/sys/windows/Thread.cpp new file mode 100755 index 0000000000..583a9613a3 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/Thread.cpp @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/windows/check.h" + +#include <process.h> +#include <windows.h> + +namespace { +unsigned __stdcall runRunnable(void* p) +{ + static_cast<qpid::sys::Runnable*>(p)->run(); + _endthreadex(0); + return 0; +} +} + +namespace qpid { +namespace sys { + +class ThreadPrivate { + friend class Thread; + + HANDLE threadHandle; + unsigned threadId; + + ThreadPrivate(Runnable* runnable) { + uintptr_t h = _beginthreadex(0, + 0, + runRunnable, + runnable, + 0, + &threadId); + QPID_WINDOWS_CHECK_CRT_NZ(h); + threadHandle = reinterpret_cast<HANDLE>(h); + } + + ThreadPrivate() + : threadHandle(GetCurrentThread()), threadId(GetCurrentThreadId()) {} +}; + +Thread::Thread() {} + +Thread::Thread(Runnable* runnable) : impl(new ThreadPrivate(runnable)) {} + +Thread::Thread(Runnable& runnable) : impl(new ThreadPrivate(&runnable)) {} + +Thread::operator bool() { + return impl; +} + +bool Thread::operator==(const Thread& t) const { + return impl->threadId == t.impl->threadId; +} + +bool Thread::operator!=(const Thread& t) const { + return !(*this==t); +} + +void Thread::join() { + if (impl) { + DWORD status = WaitForSingleObject (impl->threadHandle, INFINITE); + QPID_WINDOWS_CHECK_NOT(status, WAIT_FAILED); + CloseHandle (impl->threadHandle); + impl->threadHandle = 0; + } +} + +unsigned long Thread::logId() { + return GetCurrentThreadId(); +} + +/* static */ +Thread Thread::current() { + Thread t; + t.impl.reset(new ThreadPrivate()); + return t; +} + +}} /* qpid::sys */ diff --git a/qpid/cpp/src/qpid/sys/windows/Time.cpp b/qpid/cpp/src/qpid/sys/windows/Time.cpp new file mode 100644 index 0000000000..25c50819cd --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/Time.cpp @@ -0,0 +1,136 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Time.h" +#include <ostream> +#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/thread/thread_time.hpp> +#include <windows.h> + +using namespace boost::posix_time; + +namespace { + +// High-res timing support. This will display times since program start, +// more or less. Keep track of the start value and the conversion factor to +// seconds. +bool timeInitialized = false; +LARGE_INTEGER start; +double freq = 1.0; + +} + +namespace qpid { +namespace sys { + +AbsTime::AbsTime(const AbsTime& t, const Duration& d) { + if (d == Duration::max()) { + timepoint = ptime(max_date_time); + } + else { + time_duration td = microseconds(d.nanosecs / 1000); + timepoint = t.timepoint + td; + } +} + +AbsTime AbsTime::FarFuture() { + AbsTime ff; + ptime maxd(max_date_time); + ff.timepoint = maxd; + return ff; +} + +AbsTime AbsTime::Epoch() { + AbsTime time_epoch; + time_epoch.timepoint = boost::posix_time::from_time_t(0); + return time_epoch; +} + +AbsTime AbsTime::now() { + AbsTime time_now; + time_now.timepoint = boost::get_system_time(); + return time_now; +} + +Duration::Duration(const AbsTime& start, const AbsTime& finish) { + time_duration d = finish.timepoint - start.timepoint; + nanosecs = d.total_nanoseconds(); +} + +std::ostream& operator<<(std::ostream& o, const Duration& d) { + return o << int64_t(d) << "ns"; +} + +std::ostream& operator<<(std::ostream& o, const AbsTime& t) { + std::string time_string = to_simple_string(t.timepoint); + return o << time_string; +} + + +void sleep(int secs) { + ::Sleep(secs * 1000); +} + +void usleep(uint64_t usecs) { + DWORD msecs = usecs / 1000; + if (msecs == 0) + msecs = 1; + ::Sleep(msecs); +} + +void outputFormattedNow(std::ostream& o) { + ::time_t rawtime; + ::tm timeinfo; + char time_string[100]; + + ::time( &rawtime ); +#ifdef _MSC_VER + ::localtime_s(&timeinfo, &rawtime); +#else + timeinfo = *(::localtime(&rawtime)); +#endif + ::strftime(time_string, 100, + "%Y-%m-%d %H:%M:%S", + &timeinfo); + o << time_string << " "; +} + +void outputHiresNow(std::ostream& o) { + if (!timeInitialized) { + start.QuadPart = 0; + LARGE_INTEGER iFreq; + iFreq.QuadPart = 1; + QueryPerformanceCounter(&start); + QueryPerformanceFrequency(&iFreq); + freq = static_cast<double>(iFreq.QuadPart); + timeInitialized = true; + } + LARGE_INTEGER iNow; + iNow.QuadPart = 0; + QueryPerformanceCounter(&iNow); + iNow.QuadPart -= start.QuadPart; + if (iNow.QuadPart < 0) + iNow.QuadPart = 0; + double now = static_cast<double>(iNow.QuadPart); + now /= freq; // now is seconds after this + o << std::fixed << std::setprecision(8) << std::setw(16) << std::setfill('0') << now << "s "; +} +}} diff --git a/qpid/cpp/src/qpid/sys/windows/mingw32_compat.h b/qpid/cpp/src/qpid/sys/windows/mingw32_compat.h new file mode 100644 index 0000000000..51f613cc25 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/mingw32_compat.h @@ -0,0 +1,39 @@ +#ifndef _sys_windows_mingw32_compat +#define _sys_windows_mingw32_compat +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifdef WIN32 +#ifndef _MSC_VER + +// +// The following definitions for extension function GUIDs and signatures are taken from +// MswSock.h in the Windows32 SDK. These rightfully belong in the mingw32 version of +// mswsock.h, but are not included presently. +// + +#define WSAID_ACCEPTEX {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}} +typedef BOOL (PASCAL *LPFN_ACCEPTEX)(SOCKET,SOCKET,PVOID,DWORD,DWORD,DWORD,LPDWORD,LPOVERLAPPED); + +#endif +#endif + +#endif diff --git a/qpid/cpp/src/qpid/sys/windows/uuid.cpp b/qpid/cpp/src/qpid/sys/windows/uuid.cpp new file mode 100644 index 0000000000..3316ecbc00 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/uuid.cpp @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <rpc.h> +#ifdef uuid_t /* Done in rpcdce.h */ +# undef uuid_t +#endif + +#include "qpid/sys/windows/uuid.h" + +#include <string.h> + +void uuid_clear (uuid_t uu) { + UuidCreateNil (reinterpret_cast<UUID*>(uu)); +} + +void uuid_copy (uuid_t dst, const uuid_t src) { + memcpy (dst, src, qpid::sys::UuidSize); +} + +void uuid_generate (uuid_t out) { + UuidCreate (reinterpret_cast<UUID*>(out)); +} + +int uuid_is_null (const uuid_t uu) { + RPC_STATUS unused; + return UuidIsNil ((UUID*)uu, &unused); +} + +int uuid_parse (const char *in, uuid_t uu) { + return UuidFromString ((unsigned char*)in, (UUID*)uu) == RPC_S_OK ? 0 : -1; +} + +void uuid_unparse (const uuid_t uu, char *out) { + unsigned char *formatted; + if (UuidToString((UUID*)uu, &formatted) == RPC_S_OK) { +#ifdef _MSC_VER + strncpy_s (out, 36+1, (char*)formatted, _TRUNCATE); +#else + strncpy (out, (char*)formatted, 36+1); +#endif + RpcStringFree(&formatted); + } +} + +int uuid_compare (const uuid_t a, const uuid_t b) { + RPC_STATUS unused; + return !UuidEqual((UUID*)a, (UUID*)b, &unused); +} diff --git a/qpid/cpp/src/qpid/sys/windows/uuid.h b/qpid/cpp/src/qpid/sys/windows/uuid.h new file mode 100644 index 0000000000..8ab132e9ce --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/uuid.h @@ -0,0 +1,39 @@ +#ifndef _sys_windows_uuid_h +#define _sys_windows_uuid_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/types/ImportExport.h" +#include <qpid/sys/IntegerTypes.h> + +namespace qpid { namespace sys { const size_t UuidSize = 16; }} +typedef uint8_t uuid_t[qpid::sys::UuidSize]; + +QPID_TYPES_EXTERN void uuid_clear (uuid_t uu); +QPID_TYPES_EXTERN void uuid_copy (uuid_t dst, const uuid_t src); +QPID_TYPES_EXTERN void uuid_generate (uuid_t out); +QPID_TYPES_EXTERN int uuid_is_null (const uuid_t uu); // Returns 1 if null, else 0 +QPID_TYPES_EXTERN int uuid_parse (const char *in, uuid_t uu); // Returns 0 on success, else -1 +QPID_TYPES_EXTERN void uuid_unparse (const uuid_t uu, char *out); +QPID_TYPES_EXTERN int uuid_compare (const uuid_t a, const uuid_t b); + +#endif /*!_sys_windows_uuid_h*/ |