diff options
Diffstat (limited to 'qpid/cpp/src/qpid/sys/windows')
33 files changed, 5501 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp new file mode 100644 index 0000000000..d65aad1304 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -0,0 +1,713 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/windows/AsynchIoResult.h" +#include "qpid/sys/windows/IoHandlePrivate.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Socket.h" +#include "qpid/sys/windows/WinSocket.h" +#include "qpid/sys/SecuritySettings.h" +#include "qpid/sys/SocketAddress.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Time.h" +#include "qpid/log/Statement.h" + +#include "qpid/sys/windows/check.h" +#include "qpid/sys/windows/mingw32_compat.h" + +#include <boost/thread/once.hpp> + +#include <queue> +#include <winsock2.h> +#include <mswsock.h> +#include <windows.h> + +#include <boost/bind.hpp> +#include <boost/shared_array.hpp> +#include "qpid/sys/windows/AsynchIO.h" + +namespace { + + typedef qpid::sys::ScopedLock<qpid::sys::Mutex> QLock; + +/* + * The function pointers for AcceptEx and ConnectEx need to be looked up + * at run time. + */ +const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::IOHandle& io) { + SOCKET h = io.fd; + GUID guidAcceptEx = WSAID_ACCEPTEX; + DWORD dwBytes = 0; + LPFN_ACCEPTEX fnAcceptEx; + WSAIoctl(h, + SIO_GET_EXTENSION_FUNCTION_POINTER, + &guidAcceptEx, + sizeof(guidAcceptEx), + &fnAcceptEx, + sizeof(fnAcceptEx), + &dwBytes, + NULL, + NULL); + if (fnAcceptEx == 0) + throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx")); + return fnAcceptEx; +} + +} + +namespace qpid { +namespace sys { +namespace windows { + +/* + * Asynch Acceptor + * + */ +AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) + : acceptedCallback(callback), + socket(s), + wSocket(IOHandle(s).fd), + fnAcceptEx(lookUpAcceptEx(s)) { + + s.setNonblocking(); +} + +AsynchAcceptor::~AsynchAcceptor() +{ + socket.close(); +} + +void AsynchAcceptor::start(Poller::shared_ptr poller) { + PollerHandle ph = PollerHandle(socket); + poller->monitorHandle(ph, Poller::INPUT); + restart (); +} + +void AsynchAcceptor::restart(void) { + DWORD bytesReceived = 0; // Not used, needed for AcceptEx API + AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback, + this, + socket); + BOOL status; + status = fnAcceptEx(wSocket, + IOHandle(*result->newSocket).fd, + result->addressBuffer, + 0, + AsynchAcceptResult::SOCKADDRMAXLEN, + AsynchAcceptResult::SOCKADDRMAXLEN, + &bytesReceived, + result->overlapped()); + QPID_WINDOWS_CHECK_ASYNC_START(status); +} + + +Socket* createSameTypeSocket(const Socket& sock) { + SOCKET socket = IOHandle(sock).fd; + // Socket currently has no actual socket attached + if (socket == INVALID_SOCKET) + return new WinSocket; + + ::sockaddr_storage sa; + ::socklen_t salen = sizeof(sa); + QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen)); + SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + return new WinSocket(s); +} + +AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb, + AsynchAcceptor *acceptor, + const Socket& lsocket) + : callback(cb), acceptor(acceptor), + listener(IOHandle(lsocket).fd), + newSocket(createSameTypeSocket(lsocket)) { +} + +void AsynchAcceptResult::success(size_t /*bytesTransferred*/) { + ::setsockopt (IOHandle(*newSocket).fd, + SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + (char*)&listener, + sizeof (listener)); + callback(*(newSocket.release())); + acceptor->restart (); + delete this; +} + +void AsynchAcceptResult::failure(int /*status*/) { + //if (status != WSA_OPERATION_ABORTED) + // Can there be anything else? ; + delete this; +} + +/* + * AsynchConnector does synchronous connects for now... to do asynch the + * IocpPoller will need some extension to register an event handle as a + * CONNECT-type "direction", the connect completion/result will need an + * event handle to associate with the connecting handle. But there's no + * time for that right now... + */ +AsynchConnector::AsynchConnector(const Socket& sock, + const std::string& hname, + const std::string& p, + ConnectedCallback connCb, + FailedCallback failCb) : + connCallback(connCb), failCallback(failCb), socket(sock), + hostname(hname), port(p) +{ +} + +void AsynchConnector::start(Poller::shared_ptr) +{ + try { + socket.connect(SocketAddress(hostname, port)); + socket.setNonblocking(); + connCallback(socket); + } catch(std::exception& e) { + if (failCallback) + failCallback(socket, -1, std::string(e.what())); + socket.close(); + } +} + +// This can never be called in the current windows code as connect +// is blocking and requestCallback only makes sense if connect is +// non-blocking with the results returned via a poller callback. +void AsynchConnector::requestCallback(RequestCallback rCb) +{ +} + +} // namespace windows + +AsynchAcceptor* AsynchAcceptor::create(const Socket& s, + Callback callback) +{ + return new windows::AsynchAcceptor(s, callback); +} + +AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s, + const std::string& hostname, + const std::string& port, + ConnectedCallback connCb, + FailedCallback failCb) +{ + return new windows::AsynchConnector(s, + hostname, + port, + connCb, + failCb); +} + + +/* + * Asynch reader/writer + */ + +namespace windows { + +// This is used to encapsulate pure callbacks into a handle +class CallbackHandle : public IOHandle { +public: + CallbackHandle(AsynchIoResult::Completer completeCb, + AsynchIO::RequestCallback reqCb = 0) : + IOHandle(INVALID_SOCKET, completeCb, reqCb) + {} +}; + +AsynchIO::AsynchIO(const Socket& s, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb, + BuffersEmptyCallback eCb, + IdleCallback iCb) : + + readCallback(rCb), + eofCallback(eofCb), + disCallback(disCb), + closedCallback(cCb), + emptyCallback(eCb), + idleCallback(iCb), + socket(s), + bufferCount(BufferCount), + opsInProgress(0), + writeInProgress(false), + readInProgress(false), + queuedDelete(false), + queuedClose(false), + working(false) { +} + +AsynchIO::~AsynchIO() { +} + +void AsynchIO::queueForDeletion() { + { + ScopedLock<Mutex> l(completionLock); + assert(!queuedDelete); + queuedDelete = true; + if (working || opsInProgress > 0) { + QPID_LOG(info, "Delete AsynchIO queued; ops in progress"); + // AsynchIOHandler calls this then deletes itself; don't do any more + // callbacks. + readCallback = 0; + eofCallback = 0; + disCallback = 0; + closedCallback = 0; + emptyCallback = 0; + idleCallback = 0; + return; + } + } + delete this; +} + +void AsynchIO::start(Poller::shared_ptr poller0) { + PollerHandle ph = PollerHandle(socket); + poller = poller0; + poller->monitorHandle(ph, Poller::INPUT); + if (writeQueue.size() > 0) // Already have data queued for write + notifyPendingWrite(); + startReading(); +} + +uint32_t AsynchIO::getBufferCount(void) { return bufferCount; } + +void AsynchIO::setBufferCount(uint32_t count) { bufferCount = count; } + + +void AsynchIO::createBuffers(uint32_t size) { + // Allocate all the buffer memory at once + bufferMemory.reset(new char[size*bufferCount]); + + // Create the Buffer structs in a vector + // And push into the buffer queue + buffers.reserve(bufferCount); + for (uint32_t i = 0; i < bufferCount; i++) { + buffers.push_back(BufferBase(&bufferMemory[i*size], size)); + queueReadBuffer(&buffers[i]); + } +} + +void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) { + assert(buff); + buff->dataStart = 0; + buff->dataCount = 0; + QLock l(bufferQueueLock); + bufferQueue.push_back(buff); +} + +void AsynchIO::unread(AsynchIO::BufferBase* buff) { + assert(buff); + buff->squish(); + QLock l(bufferQueueLock); + bufferQueue.push_front(buff); +} + +void AsynchIO::queueWrite(AsynchIO::BufferBase* buff) { + assert(buff); + QLock l(bufferQueueLock); + writeQueue.push_back(buff); + if (!writeInProgress) + notifyPendingWrite(); +} + +void AsynchIO::notifyPendingWrite() { + // This method is generally called from a processing thread; transfer + // work on this to an I/O thread. Much of the upper layer code assumes + // that all I/O-related things happen in an I/O thread. + if (poller == 0) // Not really going yet... + return; + + InterlockedIncrement(&opsInProgress); + PollerHandle ph(CallbackHandle(boost::bind(&AsynchIO::completion, this, _1))); + poller->monitorHandle(ph, Poller::OUTPUT); +} + +void AsynchIO::queueWriteClose() { + { + ScopedLock<Mutex> l(completionLock); + queuedClose = true; + if (working || writeInProgress) + // no need to summon an IO thread + return; + } + notifyPendingWrite(); +} + +bool AsynchIO::writeQueueEmpty() { + QLock l(bufferQueueLock); + return writeQueue.size() == 0; +} + +/* + * Initiate a read operation. AsynchIO::readComplete() will be + * called when the read is complete and data is available. + */ +void AsynchIO::startReading() { + if (queuedDelete || queuedClose) + return; + + // (Try to) get a buffer; look on the front since there may be an + // "unread" one there with data remaining from last time. + AsynchIO::BufferBase *buff = 0; + { + QLock l(bufferQueueLock); + + if (!bufferQueue.empty()) { + buff = bufferQueue.front(); + assert(buff); + bufferQueue.pop_front(); + } + else { + logNoBuffers("startReading"); + } + } + if (buff != 0) { + int readCount = buff->byteCount - buff->dataCount; + AsynchReadResult *result = + new AsynchReadResult(boost::bind(&AsynchIO::completion, this, _1), + buff, + readCount); + DWORD bytesReceived = 0, flags = 0; + InterlockedIncrement(&opsInProgress); + readInProgress = true; + int status = WSARecv(IOHandle(socket).fd, + const_cast<LPWSABUF>(result->getWSABUF()), 1, + &bytesReceived, + &flags, + result->overlapped(), + 0); + if (status != 0) { + int error = WSAGetLastError(); + if (error != WSA_IO_PENDING) { + result->failure(error); + result = 0; // result is invalid here + return; + } + } + // On status 0 or WSA_IO_PENDING, completion will handle the rest. + } + else { + notifyBuffersEmpty(); + } + return; +} + +// Queue the specified callback for invocation from an I/O thread. +void AsynchIO::requestCallback(RequestCallback callback) { + // This method is generally called from a processing thread; transfer + // work on this to an I/O thread. Much of the upper layer code assumes + // that all I/O-related things happen in an I/O thread. + if (poller == 0) // Not really going yet... + return; + + InterlockedIncrement(&opsInProgress); + PollerHandle ph(CallbackHandle( + boost::bind(&AsynchIO::completion, this, _1), + callback)); + poller->monitorHandle(ph, Poller::INPUT); +} + +/** + * Return a queued buffer if there are enough to spare. + */ +AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { + QLock l(bufferQueueLock); + BufferBase* buff = bufferQueue.empty() ? 0 : bufferQueue.back(); + // An "unread" buffer is reserved for future read operations (which + // take from the front of the queue). + if (!buff || (buff->dataCount && bufferQueue.size() == 1)) { + if (buff) + logNoBuffers("getQueuedBuffer with unread data"); + else + logNoBuffers("getQueuedBuffer with empty queue"); + return 0; + } + assert(buff->dataCount == 0); + bufferQueue.pop_back(); + return buff; +} + +void AsynchIO::notifyEof(void) { + if (eofCallback) + eofCallback(*this); +} + +void AsynchIO::notifyDisconnect(void) { + if (disCallback) { + DisconnectCallback dcb = disCallback; + closedCallback = 0; + disCallback = 0; + dcb(*this); + // May have just been deleted. + return; + } +} + +void AsynchIO::notifyClosed(void) { + if (closedCallback) { + ClosedCallback ccb = closedCallback; + closedCallback = 0; + disCallback = 0; + ccb(*this, socket); + // May have just been deleted. + return; + } +} + +void AsynchIO::notifyBuffersEmpty(void) { + if (emptyCallback) + emptyCallback(*this); +} + +void AsynchIO::notifyIdle(void) { + if (idleCallback) + idleCallback(*this); +} + +/* + * Asynch reader/writer using overlapped I/O + */ + +void AsynchIO::startWrite(AsynchIO::BufferBase* buff) { + writeInProgress = true; + InterlockedIncrement(&opsInProgress); + AsynchWriteResult *result = + new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1), + buff, + buff->dataCount); + DWORD bytesSent = 0; + int status = WSASend(IOHandle(socket).fd, + const_cast<LPWSABUF>(result->getWSABUF()), 1, + &bytesSent, + 0, + result->overlapped(), + 0); + if (status != 0) { + int error = WSAGetLastError(); + if (error != WSA_IO_PENDING) { + result->failure(error); // Also decrements in-progress count + result = 0; // result is invalid here + return; + } + } + // On status 0 or WSA_IO_PENDING, completion will handle the rest. + return; +} + +/* + * Close the socket and callback to say we've done it + */ +void AsynchIO::close(void) { + socket.close(); + notifyClosed(); +} + +SecuritySettings AsynchIO::getSecuritySettings() { + SecuritySettings settings; + settings.ssf = socket.getKeyLen(); + settings.authid = socket.getClientAuthId(); + return settings; +} + +void AsynchIO::readComplete(AsynchReadResult *result) { + int status = result->getStatus(); + size_t bytes = result->getTransferred(); + readInProgress = false; + if (status == 0 && bytes > 0) { + if (readCallback) + readCallback(*this, result->getBuff()); + startReading(); + } + else { + // No data read, so put the buffer back. It may be partially filled, + // so "unread" it back to the front of the queue. + unread(result->getBuff()); + if (queuedClose) { + return; // Expected from cancelRead() + } + notifyEof(); + if (status != 0) + { + notifyDisconnect(); + } + } +} + +/* + * NOTE - this completion is called for completed writes and also when + * a write is desired. The difference is in the buff - if a write is desired + * the buff is 0. + */ +void AsynchIO::writeComplete(AsynchWriteResult *result) { + int status = result->getStatus(); + size_t bytes = result->getTransferred(); + AsynchIO::BufferBase *buff = result->getBuff(); + if (buff != 0) { + writeInProgress = false; + if (status == 0 && bytes > 0) { + if (bytes < result->getRequested()) // Still more to go; resubmit + startWrite(buff); + else + queueReadBuffer(buff); // All done; back to the pool + } + else { + // An error... if it's a connection close, ignore it - it will be + // noticed and handled on a read completion any moment now. + // What to do with real error??? Save the Buffer? TBD. + queueReadBuffer(buff); // All done; back to the pool + } + } + + // If there are no writes outstanding, check for more writes to initiate + // (either queued or via idle). The opsInProgress count is handled in + // completion() + if (!writeInProgress) { + bool writing = false; + { + QLock l(bufferQueueLock); + if (writeQueue.size() > 0) { + buff = writeQueue.front(); + assert(buff); + writeQueue.pop_front(); + startWrite(buff); + writing = true; + } + } + if (!writing && !queuedClose) { + notifyIdle(); + } + } + return; +} + +void AsynchIO::completion(AsynchIoResult *result) { + bool closing = false; + bool deleting = false; + { + ScopedLock<Mutex> l(completionLock); + if (working) { + completionQueue.push(result); + return; + } + + // First thread in with something to do; note we're working then keep + // handling completions. + working = true; + while (result != 0) { + // New scope to unlock temporarily. + { + ScopedUnlock<Mutex> ul(completionLock); + AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result); + if (r != 0) + readComplete(r); + else { + AsynchWriteResult *w = + dynamic_cast<AsynchWriteResult*>(result); + if (w != 0) + writeComplete(w); + else { + AsynchCallbackRequest *req = + dynamic_cast<AsynchCallbackRequest*>(result); + req->reqCallback(*this); + } + } + delete result; + result = 0; + InterlockedDecrement(&opsInProgress); + if (queuedClose && opsInProgress == 1 && readInProgress) + cancelRead(); + } + // Lock is held again. + if (completionQueue.empty()) + continue; + result = completionQueue.front(); + completionQueue.pop(); + } + working = false; + if (opsInProgress == 0) { + closing = queuedClose; + deleting = queuedDelete; + } + } + // Lock released; ok to close if ops are done and close requested. + // Layer above will call back to queueForDeletion() if it hasn't + // already been done. If it already has, go ahead and delete. + if (deleting) + delete this; + else if (closing) + // close() may cause a delete; don't trust 'this' on return + close(); +} + +/* + * NOTE - this method must be called in the same context as other completions, + * so that the resulting readComplete, and final AsynchIO::close() is serialized + * after this method returns. + */ +void AsynchIO::cancelRead() { + if (queuedDelete) + return; // socket already deleted + else { + ScopedLock<Mutex> l(completionLock);; + if (!completionQueue.empty()) + return; // process it; come back later if necessary + } + // Cancel outstanding read and force to completion. Otherwise, on a faulty + // physical link, the pending read can remain uncompleted indefinitely. + // Draining the pending read will result in the official close (and + // notifyClosed). CancelIoEX() is the natural choice, but not available in + // XP, so we make do with closesocket(). + socket.close(); +} + +/* + * Track down cause of unavailable buffer if it recurs: QPID-5033 + */ +void AsynchIO::logNoBuffers(const char *context) { + QPID_LOG(error, "No IO buffers available: " << context << + ". Debug data: " << bufferQueue.size() << + ' ' << writeQueue.size() << + ' ' << completionQueue.size() << + ' ' << opsInProgress << + ' ' << writeInProgress << + ' ' << readInProgress << + ' ' << working); +} + + +} // namespace windows + +AsynchIO* qpid::sys::AsynchIO::create(const Socket& s, + AsynchIO::ReadCallback rCb, + AsynchIO::EofCallback eofCb, + AsynchIO::DisconnectCallback disCb, + AsynchIO::ClosedCallback cCb, + AsynchIO::BuffersEmptyCallback eCb, + AsynchIO::IdleCallback iCb) +{ + return new qpid::sys::windows::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb); +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.h b/qpid/cpp/src/qpid/sys/windows/AsynchIO.h new file mode 100644 index 0000000000..a50864b561 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.h @@ -0,0 +1,235 @@ +#ifndef _sys_windows_AsynchIO +#define _sys_windows_AsynchIO + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AsynchIoResult.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/sys/Poller.h" +#include "qpid/CommonImportExport.h" +#include "qpid/sys/Mutex.h" +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/shared_array.hpp> +#include <winsock2.h> +#include <mswsock.h> +#include <windows.h> + +// security.h needs to see this to distinguish from kernel use. +#define SECURITY_WIN32 +#include <security.h> +#include <Schnlsp.h> +#undef SECURITY_WIN32 + +namespace qpid { +namespace sys { +namespace windows { + +/* + * Asynch Acceptor + */ + +class AsynchAcceptor : public qpid::sys::AsynchAcceptor { + + friend class AsynchAcceptResult; + +public: + AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback); + ~AsynchAcceptor(); + void start(Poller::shared_ptr poller); + +private: + void restart(void); + + AsynchAcceptor::Callback acceptedCallback; + const Socket& socket; + const SOCKET wSocket; + const LPFN_ACCEPTEX fnAcceptEx; +}; + + +class AsynchConnector : public qpid::sys::AsynchConnector { +private: + ConnectedCallback connCallback; + FailedCallback failCallback; + const Socket& socket; + const std::string hostname; + const std::string port; + +public: + AsynchConnector(const Socket& socket, + const std::string& hostname, + const std::string& port, + ConnectedCallback connCb, + FailedCallback failCb = 0); + void start(Poller::shared_ptr poller); + void requestCallback(RequestCallback rCb); +}; + +class AsynchIO : public qpid::sys::AsynchIO { + + friend class SslAsynchIO; + +public: + AsynchIO(const Socket& s, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0); + ~AsynchIO(); + + // Methods inherited from qpid::sys::AsynchIO + + /** + * Notify the object is should delete itself as soon as possible. + */ + virtual void queueForDeletion(); + + /// Take any actions needed to prepare for working with the poller. + virtual void start(Poller::shared_ptr poller); + virtual void createBuffers(uint32_t size); + virtual void queueReadBuffer(BufferBase* buff); + virtual void unread(BufferBase* buff); + virtual void queueWrite(BufferBase* buff); + virtual void notifyPendingWrite(); + virtual void queueWriteClose(); + virtual bool writeQueueEmpty(); + virtual void requestCallback(RequestCallback); + + /** + * getQueuedBuffer returns a buffer from the buffer queue, if one is + * available. + * + * @retval Pointer to BufferBase buffer; 0 if none is available. + */ + virtual BufferBase* getQueuedBuffer(); + + virtual SecuritySettings getSecuritySettings(void); + +private: + ReadCallback readCallback; + EofCallback eofCallback; + DisconnectCallback disCallback; + ClosedCallback closedCallback; + BuffersEmptyCallback emptyCallback; + IdleCallback idleCallback; + const Socket& socket; + Poller::shared_ptr poller; + uint32_t bufferCount; + + std::deque<BufferBase*> bufferQueue; + std::deque<BufferBase*> writeQueue; + /* The MSVC-supplied deque is not thread-safe; keep locks to serialize + * access to the buffer queue and write queue. + */ + Mutex bufferQueueLock; + std::vector<BufferBase> buffers; + boost::shared_array<char> bufferMemory; + + // Number of outstanding I/O operations. + volatile LONG opsInProgress; + // Is there a write in progress? + volatile bool writeInProgress; + // Or a read? + volatile bool readInProgress; + // Deletion requested, but there are callbacks in progress. + volatile bool queuedDelete; + // Socket close requested, but there are operations in progress. + volatile bool queuedClose; + +protected: + uint32_t getBufferCount(void); + void setBufferCount(uint32_t); + +private: + // Dispatch events that have completed. + void notifyEof(void); + void notifyDisconnect(void); + void notifyClosed(void); + void notifyBuffersEmpty(void); + void notifyIdle(void); + + /** + * Initiate a write of the specified buffer. There's no callback for + * write completion to the AsynchIO object. + */ + void startWrite(AsynchIO::BufferBase* buff); + + void close(void); + + /** + * startReading initiates reading, readComplete() is + * called when the read completes. + */ + void startReading(); + + /** + * readComplete is called when a read request is complete. + * + * @param result Results of the operation. + */ + void readComplete(AsynchReadResult *result); + + /** + * writeComplete is called when a write request is complete. + * + * @param result Results of the operation. + */ + void writeComplete(AsynchWriteResult *result); + + /** + * Queue of completions to run. This queue enforces the requirement + * from upper layers that only one thread at a time is allowed to act + * on any given connection. Once a thread is busy processing a completion + * on this object, other threads that dispatch completions queue the + * completions here for the in-progress thread to handle when done. + * Thus, any threads can dispatch a completion from the IocpPoller, but + * this class ensures that actual processing at the connection level is + * only on one thread at a time. + */ + std::queue<AsynchIoResult *> completionQueue; + volatile bool working; + Mutex completionLock; + + /** + * Called when there's a completion to process. + */ + void completion(AsynchIoResult *result); + + /** + * Helper function to facilitate the close operation + */ + void cancelRead(); + + /** + * Log information about buffer depletion, which should never happen. + * See QPID-5033. + */ + void logNoBuffers(const char*); +}; + +}}} // namespace qpid::sys::windows + +#endif // _sys_windows_AsynchIO diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h b/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h new file mode 100755 index 0000000000..27e4c22138 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h @@ -0,0 +1,204 @@ +#ifndef _windows_asynchIoResult_h +#define _windows_asynchIoResult_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Socket.h" +#include <memory.h> +#include <winsock2.h> +#include <ws2tcpip.h> + +namespace qpid { +namespace sys { +namespace windows { + +/* + * AsynchIoResult defines the class that receives the result of an + * asynchronous I/O operation, either send/recv or accept/connect. + * + * Operation factories should set one of these up before beginning the + * operation. Poller knows how to dispatch completion to this class. + * This class must be subclassed for needed operations; this class provides + * an interface only and cannot be instantiated. + * + * This class is tied to Windows; it inherits from OVERLAPPED so that the + * IocpPoller can cast OVERLAPPED pointers back to AsynchIoResult and call + * the completion handler. + */ +class AsynchResult : private OVERLAPPED { +public: + LPOVERLAPPED overlapped(void) { return this; } + static AsynchResult* from_overlapped(LPOVERLAPPED ol) { + return static_cast<AsynchResult*>(ol); + } + virtual void success (size_t bytesTransferred) { + bytes = bytesTransferred; + status = 0; + complete(); + } + virtual void failure (int error) { + bytes = 0; + status = error; + complete(); + } + size_t getTransferred(void) const { return bytes; } + int getStatus(void) const { return status; } + +protected: + AsynchResult() : bytes(0), status(0) + { memset(overlapped(), 0, sizeof(OVERLAPPED)); } + ~AsynchResult() {} + virtual void complete(void) = 0; + + size_t bytes; + int status; +}; + +class AsynchAcceptor; + +class AsynchAcceptResult : public AsynchResult { + + friend class AsynchAcceptor; + +public: + AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb, + AsynchAcceptor *acceptor, + const qpid::sys::Socket& listener); + virtual void success (size_t bytesTransferred); + virtual void failure (int error); + +private: + virtual void complete(void) {} // No-op for this class. + + qpid::sys::AsynchAcceptor::Callback callback; + AsynchAcceptor *acceptor; + SOCKET listener; + std::auto_ptr<qpid::sys::Socket> newSocket; + + // AcceptEx needs a place to write the local and remote addresses + // when accepting the connection. Place those here; get enough for + // IPv6 addresses, even if the socket is IPv4. + enum { SOCKADDRMAXLEN = sizeof(sockaddr_in6) + 16, + SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN }; + char addressBuffer[SOCKADDRBUFLEN]; +}; + +class AsynchIoResult : public AsynchResult { +public: + typedef boost::function1<void, AsynchIoResult *> Completer; + + virtual ~AsynchIoResult() {} + qpid::sys::AsynchIO::BufferBase *getBuff(void) const { return iobuff; } + size_t getRequested(void) const { return requested; } + const WSABUF *getWSABUF(void) const { return &wsabuf; } + +protected: + void setBuff (qpid::sys::AsynchIO::BufferBase *buffer) { iobuff = buffer; } + +protected: + AsynchIoResult(Completer cb, + qpid::sys::AsynchIO::BufferBase *buff, size_t length) + : completionCallback(cb), iobuff(buff), requested(length) {} + + virtual void complete(void) = 0; + WSABUF wsabuf; + Completer completionCallback; + +private: + qpid::sys::AsynchIO::BufferBase *iobuff; + size_t requested; // Number of bytes in original I/O request +}; + +class AsynchReadResult : public AsynchIoResult { + + // complete() updates buffer then does completion callback. + virtual void complete(void) { + getBuff()->dataCount += bytes; + completionCallback(this); + } + +public: + AsynchReadResult(AsynchIoResult::Completer cb, + qpid::sys::AsynchIO::BufferBase *buff, + size_t length) + : AsynchIoResult(cb, buff, length) { + wsabuf.buf = buff->bytes + buff->dataCount; + wsabuf.len = length; + } +}; + +class AsynchWriteResult : public AsynchIoResult { + + // complete() updates buffer then does completion callback. + virtual void complete(void) { + qpid::sys::AsynchIO::BufferBase *b = getBuff(); + b->dataStart += bytes; + b->dataCount -= bytes; + completionCallback(this); + } + +public: + AsynchWriteResult(AsynchIoResult::Completer cb, + qpid::sys::AsynchIO::BufferBase *buff, + size_t length) + : AsynchIoResult(cb, buff, length) { + wsabuf.buf = buff ? buff->bytes : 0; + wsabuf.len = length; + } +}; + +class AsynchWriteWanted : public AsynchWriteResult { + + // complete() just does completion callback; no buffers used. + virtual void complete(void) { + completionCallback(this); + } + +public: + AsynchWriteWanted(AsynchIoResult::Completer cb) + : AsynchWriteResult(cb, 0, 0) { + wsabuf.buf = 0; + wsabuf.len = 0; + } +}; + +class AsynchCallbackRequest : public AsynchIoResult { + // complete() needs to simply call the completionCallback; no buffers. + virtual void complete(void) { + completionCallback(this); + } + +public: + AsynchCallbackRequest(AsynchIoResult::Completer cb, + qpid::sys::AsynchIO::RequestCallback reqCb) + : AsynchIoResult(cb, 0, 0), reqCallback(reqCb) { + wsabuf.buf = 0; + wsabuf.len = 0; + } + + qpid::sys::AsynchIO::RequestCallback reqCallback; +}; + +}}} // qpid::sys::windows + +#endif /*!_windows_asynchIoResult_h*/ diff --git a/qpid/cpp/src/qpid/sys/windows/Condition.h b/qpid/cpp/src/qpid/sys/windows/Condition.h new file mode 100755 index 0000000000..cd5aebbf09 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/Condition.h @@ -0,0 +1,77 @@ +#ifndef _sys_windows_Condition_h +#define _sys_windows_Condition_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" + +#include <time.h> +#include <boost/noncopyable.hpp> +#include <boost/thread/condition.hpp> +#include <boost/thread/thread_time.hpp> +#include <windows.h> + +namespace qpid { +namespace sys { + +/** + * A condition variable for thread synchronization. + */ +class Condition : private boost::noncopyable +{ + public: + inline Condition(); + inline ~Condition(); + inline void wait(Mutex&); + inline bool wait(Mutex&, const AbsTime& absoluteTime); + inline void notify(); + inline void notifyAll(); + + private: + boost::condition_variable_any condition; +}; + +Condition::Condition() { +} + +Condition::~Condition() { +} + +void Condition::wait(Mutex& mutex) { + condition.wait(mutex.mutex); +} + +bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){ + return condition.timed_wait(mutex.mutex, absoluteTime.timepoint); +} + +void Condition::notify(){ + condition.notify_one(); +} + +void Condition::notifyAll(){ + condition.notify_all(); +} + +}} +#endif /*!_sys_windows_Condition_h*/ diff --git a/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp b/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp new file mode 100644 index 0000000000..5128f0f8d6 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp @@ -0,0 +1,90 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/sys/FileSysDir.h" +#include "qpid/sys/StrError.h" +#include "qpid/Exception.h" + +#include <sys/types.h> +#include <sys/stat.h> +#include <direct.h> +#include <errno.h> +#include <windows.h> +#include <strsafe.h> + + +namespace qpid { +namespace sys { + +bool FileSysDir::exists (void) const +{ + const char *cpath = dirPath.c_str (); + struct _stat s; + if (::_stat(cpath, &s)) { + if (errno == ENOENT) { + return false; + } + throw qpid::Exception (strError(errno) + + ": Can't check directory: " + dirPath); + } + if (s.st_mode & _S_IFDIR) + return true; + throw qpid::Exception(dirPath + " is not a directory"); +} + +void FileSysDir::mkdir(void) +{ + if (::_mkdir(dirPath.c_str()) == -1) + throw Exception ("Can't create directory: " + dirPath); +} + +void FileSysDir::forEachFile(Callback cb) const { + + WIN32_FIND_DATAA findFileData; + char szDir[MAX_PATH]; + size_t dirPathLength; + HANDLE hFind = INVALID_HANDLE_VALUE; + + // create dirPath+"\*" in szDir + StringCchLength (dirPath.c_str(), MAX_PATH, &dirPathLength); + + if (dirPathLength > (MAX_PATH - 3)) { + throw Exception ("Directory path is too long: " + dirPath); + } + + StringCchCopy(szDir, MAX_PATH, dirPath.c_str()); + StringCchCat(szDir, MAX_PATH, TEXT("\\*")); + + // Special work for first file + hFind = FindFirstFileA(szDir, &findFileData); + if (INVALID_HANDLE_VALUE == hFind) { + return; + } + + // process everything that isn't a directory + do { + if (!(findFileData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY)) { + std::string fileName(dirPath); + fileName += "\\"; + fileName += findFileData.cFileName; + cb(fileName); + } + } while (FindNextFile(hFind, &findFileData) != 0); +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp b/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp new file mode 100755 index 0000000000..19a1c44875 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/windows/IoHandlePrivate.h" +#include <windows.h> + +namespace qpid { +namespace sys { + + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h new file mode 100755 index 0000000000..4529ad93ec --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h @@ -0,0 +1,58 @@ +#ifndef _sys_windows_IoHandlePrivate_h +#define _sys_windows_IoHandlePrivate_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/windows/AsynchIoResult.h" +#include "qpid/CommonImportExport.h" + +#include <winsock2.h> + +namespace qpid { +namespace sys { + +// Private fd related implementation details +// There should be either a valid socket handle or a completer callback. +// Handle is used to associate with poller's iocp; completer is used to +// inject a completion that will very quickly trigger a callback to the +// completer from an I/O thread. If the callback mechanism is used, there +// can be a RequestCallback set - this carries the callback object through +// from AsynchIO::requestCallback() through to the I/O completion processing. +class IOHandle { +public: + IOHandle(SOCKET f = INVALID_SOCKET, + windows::AsynchIoResult::Completer cb = 0, + AsynchIO::RequestCallback reqCallback = 0) : + fd(f), + event(cb), + cbRequest(reqCallback) + {} + + SOCKET fd; + windows::AsynchIoResult::Completer event; + AsynchIO::RequestCallback cbRequest; +}; + +}} + +#endif /* _sys_windows_IoHandlePrivate_h */ diff --git a/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp new file mode 100755 index 0000000000..ecb33c5517 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp @@ -0,0 +1,220 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Poller.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/IOHandle.h" +#include "qpid/sys/windows/AsynchIoResult.h" +#include "qpid/sys/windows/IoHandlePrivate.h" +#include "qpid/sys/windows/check.h" + +#include <winsock2.h> +#include <windows.h> + +#include <assert.h> +#include <vector> +#include <exception> + +namespace qpid { +namespace sys { + +class PollerHandlePrivate { + friend class Poller; + friend class PollerHandle; + + SOCKET fd; + windows::AsynchIoResult::Completer cb; + AsynchIO::RequestCallback cbRequest; + + PollerHandlePrivate(SOCKET f, + windows::AsynchIoResult::Completer cb0 = 0, + AsynchIO::RequestCallback rcb = 0) + : fd(f), cb(cb0), cbRequest(rcb) + { + } + +}; + +PollerHandle::PollerHandle(const IOHandle& h) : + impl(new PollerHandlePrivate(h.fd, h.event, h.cbRequest)) +{} + +PollerHandle::~PollerHandle() { + delete impl; +} + +/** + * Concrete implementation of Poller to use the Windows I/O Completion + * port (IOCP) facility. + */ +class PollerPrivate { + friend class Poller; + + const HANDLE iocp; + + // The number of threads running the event loop. + volatile LONG threadsRunning; + + // Shutdown request is handled by setting isShutdown and injecting a + // well-formed completion event into the iocp. + bool isShutdown; + + PollerPrivate() : + iocp(::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)), + threadsRunning(0), + isShutdown(false) { + QPID_WINDOWS_CHECK_NULL(iocp); + } + + ~PollerPrivate() { + // It's probably okay to ignore any errors here as there can't be + // data loss + ::CloseHandle(iocp); + } +}; + +void Poller::shutdown() { + // Allow sloppy code to shut us down more than once. + if (impl->isShutdown) + return; + impl->isShutdown = true; + ULONG_PTR key = 1; // Tell wait() it's a shutdown, not I/O + PostQueuedCompletionStatus(impl->iocp, 0, key, 0); +} + +bool Poller::hasShutdown() +{ + return impl->isShutdown; +} + +bool Poller::interrupt(PollerHandle&) { + return false; // There's no concept of a registered handle. +} + +void Poller::run() { + while (!impl->isShutdown) { + Poller::Event event = this->wait(); + + // Handle shutdown + switch (event.type) { + case Poller::SHUTDOWN: + return; + break; + case Poller::INVALID: // On any type of success or fail completion + break; + default: + // This should be impossible + assert(false); + } + } +} + +void Poller::monitorHandle(PollerHandle& handle, Direction dir) { + HANDLE h = (HANDLE)(handle.impl->fd); + if (h != INVALID_HANDLE_VALUE) { + HANDLE iocpHandle = ::CreateIoCompletionPort (h, impl->iocp, 0, 0); + QPID_WINDOWS_CHECK_NULL(iocpHandle); + } + else { + // INPUT is used to request a callback; OUTPUT to request a write + assert(dir == Poller::INPUT || dir == Poller::OUTPUT); + + if (dir == Poller::OUTPUT) { + windows::AsynchWriteWanted *result = + new windows::AsynchWriteWanted(handle.impl->cb); + PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped()); + } + else { + windows::AsynchCallbackRequest *result = + new windows::AsynchCallbackRequest(handle.impl->cb, + handle.impl->cbRequest); + PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped()); + } + } +} + +// All no-ops... +void Poller::unmonitorHandle(PollerHandle& /*handle*/, Direction /*dir*/) {} +void Poller::registerHandle(PollerHandle& /*handle*/) {} +void Poller::unregisterHandle(PollerHandle& /*handle*/) {} + +Poller::Event Poller::wait(Duration timeout) { + DWORD timeoutMs = 0; + DWORD numTransferred = 0; + ULONG_PTR completionKey = 0; + OVERLAPPED *overlapped = 0; + windows::AsynchResult *result = 0; + + // Wait for either an I/O operation to finish (thus signaling the + // IOCP handle) or a shutdown request to be made (thus signaling the + // shutdown event). + if (timeout == TIME_INFINITE) + timeoutMs = INFINITE; + else + timeoutMs = static_cast<DWORD>(timeout / TIME_MSEC); + + InterlockedIncrement(&impl->threadsRunning); + bool goodOp = ::GetQueuedCompletionStatus (impl->iocp, + &numTransferred, + &completionKey, + &overlapped, + timeoutMs); + LONG remainingThreads = InterlockedDecrement(&impl->threadsRunning); + if (goodOp) { + // Dequeued a successful completion. If it's a posted packet from + // shutdown() the overlapped ptr is 0 and key is 1. Else downcast + // the OVERLAPPED pointer to an AsynchIoResult and call the + // completion handler. + if (overlapped == 0 && completionKey == 1) { + // If there are other threads still running this wait, re-post + // the completion. + if (remainingThreads > 0) + PostQueuedCompletionStatus(impl->iocp, 0, completionKey, 0); + return Event(0, SHUTDOWN); + } + + result = windows::AsynchResult::from_overlapped(overlapped); + result->success (static_cast<size_t>(numTransferred)); + } + else { + if (overlapped != 0) { + // Dequeued a completion for a failed operation. Downcast back + // to the result object and inform it that the operation failed. + DWORD status = ::GetLastError(); + result = windows::AsynchResult::from_overlapped(overlapped); + result->failure (static_cast<int>(status)); + } + } + return Event(0, INVALID); // TODO - this may need to be changed. + +} + +// Concrete constructors +Poller::Poller() : + impl(new PollerPrivate()) +{} + +Poller::~Poller() { + delete impl; +} + +}} diff --git a/qpid/cpp/src/qpid/sys/windows/LockFile.cpp b/qpid/cpp/src/qpid/sys/windows/LockFile.cpp new file mode 100755 index 0000000000..048c2d5b18 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/LockFile.cpp @@ -0,0 +1,64 @@ +/* + * + * Copyright (c) 2008 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/sys/LockFile.h" +#include "qpid/sys/windows/check.h" + +#include <windows.h> + +namespace qpid { +namespace sys { + +class LockFilePrivate { + friend class LockFile; + + HANDLE fd; + +public: + LockFilePrivate(HANDLE f) : fd(f) {} +}; + +LockFile::LockFile(const std::string& path_, bool create) + : path(path_), created(create) { + + HANDLE h = ::CreateFile(path.c_str(), + create ? (GENERIC_READ|GENERIC_WRITE) : GENERIC_READ, + FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE, + 0, /* Default security */ + create ? OPEN_ALWAYS : OPEN_EXISTING, + FILE_FLAG_DELETE_ON_CLOSE, /* Delete file when closed */ + NULL); + if (h == INVALID_HANDLE_VALUE) + throw qpid::Exception(path + ": " + qpid::sys::strError(GetLastError())); + + // Lock up to 4Gb + if (!::LockFile(h, 0, 0, 0xffffffff, 0)) + throw qpid::Exception(path + ": " + qpid::sys::strError(GetLastError())); + impl.reset(new LockFilePrivate(h)); +} + +LockFile::~LockFile() { + if (impl) { + if (impl->fd != INVALID_HANDLE_VALUE) { + ::UnlockFile(impl->fd, 0, 0, 0xffffffff, 0); + ::CloseHandle(impl->fd); + } + } +} + +}} /* namespace qpid::sys */ diff --git a/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp b/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp new file mode 100644 index 0000000000..60b3df7da6 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/MemoryMappedFile.h" + +namespace qpid { +namespace sys { +class MemoryMappedFilePrivate {}; + +MemoryMappedFile::MemoryMappedFile() : state(0) {} +MemoryMappedFile::~MemoryMappedFile() {} + +void MemoryMappedFile::open(const std::string& /*name*/, const std::string& /*directory*/) +{ +} +void MemoryMappedFile::close() +{ +} +size_t MemoryMappedFile::getPageSize() +{ + return 0; +} +char* MemoryMappedFile::map(size_t /*offset*/, size_t /*size*/) +{ + return 0; +} +void MemoryMappedFile::unmap(char* /*region*/, size_t /*size*/) +{ +} +void MemoryMappedFile::flush(char* /*region*/, size_t /*size*/) +{ +} +void MemoryMappedFile::expand(size_t /*offset*/) +{ +} +bool MemoryMappedFile::isSupported() +{ + return false; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/Mutex.h b/qpid/cpp/src/qpid/sys/windows/Mutex.h new file mode 100755 index 0000000000..5dcc69e836 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/Mutex.h @@ -0,0 +1,188 @@ +#ifndef _sys_windows_Mutex_h +#define _sys_windows_Mutex_h + +/* + * + * Copyright (c) 2008 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/sys/windows/check.h" + +#include <boost/version.hpp> +#if (BOOST_VERSION < 103500) +#error The Windows port requires Boost version 1.35.0 or later +#endif + +#include <boost/noncopyable.hpp> +#include <boost/thread/recursive_mutex.hpp> +#include <boost/thread/shared_mutex.hpp> +#include <boost/thread/thread_time.hpp> +#include <boost/thread/tss.hpp> + +namespace qpid { +namespace sys { + +class Condition; + +/** + * Mutex lock. + */ +class Mutex : private boost::noncopyable { + friend class Condition; + +public: + typedef ::qpid::sys::ScopedLock<Mutex> ScopedLock; + typedef ::qpid::sys::ScopedUnlock<Mutex> ScopedUnlock; + + inline Mutex(); + inline ~Mutex(); + inline void lock(); + inline void unlock(); + inline bool trylock(); + + +protected: + boost::recursive_mutex mutex; +}; + +/** + * RW lock. + */ +class RWlock : private boost::noncopyable { + friend class Condition; + +public: + typedef ::qpid::sys::ScopedRlock<RWlock> ScopedRlock; + typedef ::qpid::sys::ScopedWlock<RWlock> ScopedWlock; + + inline RWlock(); + inline ~RWlock(); + inline void wlock(); // will write-lock + inline void rlock(); // will read-lock + inline void unlock(); + inline void trywlock(); // will write-try + inline void tryrlock(); // will read-try + +protected: + boost::shared_mutex rwMutex; + boost::thread_specific_ptr<bool> haveWrite; + + inline bool &write (void); +}; + + +/** + * PODMutex is a POD, can be static-initialized with + * PODMutex m = QPID_PODMUTEX_INITIALIZER + */ +struct PODMutex +{ + typedef ::qpid::sys::ScopedLock<PODMutex> ScopedLock; + + inline void lock(); + inline void unlock(); + inline bool trylock(); + + // Must be public to be a POD: + boost::recursive_mutex mutex; +}; + +#define QPID_MUTEX_INITIALIZER 0 + +void PODMutex::lock() { + mutex.lock(); +} + +void PODMutex::unlock() { + mutex.unlock(); +} + +bool PODMutex::trylock() { + return mutex.try_lock(); +} + +Mutex::Mutex() { +} + +Mutex::~Mutex(){ +} + +void Mutex::lock() { + mutex.lock(); +} + +void Mutex::unlock() { + mutex.unlock(); +} + +bool Mutex::trylock() { + return mutex.try_lock(); +} + + +RWlock::RWlock() { +} + +RWlock::~RWlock(){ +} + +void RWlock::wlock() { + bool &writer = write(); + rwMutex.lock(); + writer = true; // Remember this thread has write lock held. +} + +void RWlock::rlock() { + bool &writer = write(); + rwMutex.lock_shared(); + writer = false; // Remember this thread has shared lock held. +} + +void RWlock::unlock() { + bool &writer = write(); + if (writer) + rwMutex.unlock(); + else + rwMutex.unlock_shared(); +} + +void RWlock::trywlock() { + bool &writer = write(); + // shared_mutex::try_lock() seems to not be available... emulate it with + // a timed lock(). + boost::system_time now = boost::get_system_time(); + if (rwMutex.timed_lock(now)) + writer = true; +} + +void RWlock::tryrlock() { + bool &writer = write(); + if (rwMutex.try_lock_shared()) + writer = false; +} + +bool & RWlock::write (void) { + // Accessing thread-specific and stack-local info, so no locks needed. + bool *writePtr = haveWrite.get(); + if (writePtr == 0) { + writePtr = new bool(false); + haveWrite.reset(writePtr); + } + return *writePtr; +} + +}} +#endif /*!_sys_windows_Mutex_h*/ diff --git a/qpid/cpp/src/qpid/sys/windows/Path.cpp b/qpid/cpp/src/qpid/sys/windows/Path.cpp new file mode 100644 index 0000000000..1cb4521fde --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/Path.cpp @@ -0,0 +1,65 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/sys/Path.h" +#include "qpid/sys/StrError.h" +#include "qpid/Exception.h" + +#include <sys/types.h> +#include <sys/stat.h> +#include <direct.h> +#include <errno.h> +#include <windows.h> +#include <strsafe.h> + + +namespace qpid { +namespace sys { + +const std::string Path::separator("\\"); + +namespace { +// Return true for success, false for ENOENT, throw otherwise. +bool getStat(const std::string& path, struct _stat& s) { + if (::_stat(path.c_str(), &s)) { + if (errno == ENOENT) return false; + throw qpid::Exception("cannot stat: " + path + ": " + strError(errno)); + } + return true; +} + +bool isFlag(const std::string& path, unsigned long flag) { + struct _stat s; + return getStat(path, s) && (s.st_mode & flag); +} +} + +bool Path::exists () const { + struct _stat s; + return getStat(path, s); +} + +bool Path::isFile() const { return isFlag(path, _S_IFREG); } +bool Path::isDirectory() const { return isFlag(path, _S_IFDIR); } + +bool Path::isAbsolute() const { + return (path.size() > 0 && (path[0] == separator[0] || path[0] == '/')) + || (path.size() > 1 && (isalpha(path[0]) && path[1] == ':')); +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp b/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp new file mode 100755 index 0000000000..062458ae5f --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp @@ -0,0 +1,101 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/sys/PipeHandle.h" +#include "qpid/sys/windows/check.h" +#include <winsock2.h> + +namespace qpid { +namespace sys { + +PipeHandle::PipeHandle(bool nonBlocking) { + + SOCKET listener, pair[2]; + struct sockaddr_in addr; + int err; + int addrlen = sizeof(addr); + pair[0] = pair[1] = INVALID_SOCKET; + if ((listener = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = 0; + + err = bind(listener, (const struct sockaddr*) &addr, sizeof(addr)); + if (err == SOCKET_ERROR) { + err = WSAGetLastError(); + closesocket(listener); + throw QPID_WINDOWS_ERROR(err); + } + + err = getsockname(listener, (struct sockaddr*) &addr, &addrlen); + if (err == SOCKET_ERROR) { + err = WSAGetLastError(); + closesocket(listener); + throw QPID_WINDOWS_ERROR(err); + } + + try { + if (listen(listener, 1) == SOCKET_ERROR) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + if ((pair[0] = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + if (connect(pair[0], (const struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + if ((pair[1] = accept(listener, NULL, NULL)) == INVALID_SOCKET) + throw QPID_WINDOWS_ERROR(WSAGetLastError()); + + closesocket(listener); + writeFd = pair[0]; + readFd = pair[1]; + } + catch (...) { + closesocket(listener); + if (pair[0] != INVALID_SOCKET) + closesocket(pair[0]); + throw; + } + + // Set the socket to non-blocking + if (nonBlocking) { + unsigned long nonblock = 1; + ioctlsocket(readFd, FIONBIO, &nonblock); + } +} + +PipeHandle::~PipeHandle() { + closesocket(readFd); + closesocket(writeFd); +} + +int PipeHandle::read(void* buf, size_t bufSize) { + return ::recv(readFd, (char *)buf, bufSize, 0); +} + +int PipeHandle::write(const void* buf, size_t bufSize) { + return ::send(writeFd, (const char *)buf, bufSize, 0); +} + +int PipeHandle::getReadHandle() { + return readFd; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp new file mode 100644 index 0000000000..3e2a5fb36c --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp @@ -0,0 +1,114 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/PollableCondition.h" +#include "qpid/sys/IOHandle.h" +#include "qpid/sys/windows/AsynchIoResult.h" +#include "qpid/sys/windows/IoHandlePrivate.h" + +#include <boost/bind.hpp> +#include <windows.h> + +namespace qpid { +namespace sys { + +// PollableConditionPrivate will reuse the IocpPoller's ability to queue +// a completion to the IOCP and have it dispatched to the completer callback +// noted in the IOHandlePrivate when the request is queued. The +// AsynchCallbackRequest object is not really used - we already have the +// desired callback for the user of PollableCondition. +class PollableConditionPrivate : private IOHandle { + friend class PollableCondition; + +private: + PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller); + ~PollableConditionPrivate(); + + void poke(); + void dispatch(windows::AsynchIoResult *result); + +private: + PollableCondition::Callback cb; + PollableCondition& parent; + boost::shared_ptr<sys::Poller> poller; + LONG isSet; + LONG isDispatching; +}; + +PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb, + sys::PollableCondition& parent, + const boost::shared_ptr<sys::Poller>& poller) + : IOHandle(INVALID_SOCKET, boost::bind(&PollableConditionPrivate::dispatch, this, _1)), + cb(cb), parent(parent), poller(poller), isSet(0), isDispatching(0) +{ +} + +PollableConditionPrivate::~PollableConditionPrivate() +{ +} + +void PollableConditionPrivate::poke() +{ + // monitorHandle will queue a completion for the IOCP; when it's handled, a + // poller thread will call back to dispatch() below. + PollerHandle ph(*this); + poller->monitorHandle(ph, Poller::INPUT); +} + +void PollableConditionPrivate::dispatch(windows::AsynchIoResult *result) +{ + delete result; // Poller::monitorHandle() allocates this + // If isDispatching is already set, just return. Else, enter. + if (::InterlockedCompareExchange(&isDispatching, 1, 0) == 1) + return; + cb(parent); + LONG oops = ::InterlockedDecrement(&isDispatching); // Result must be 0 + assert(!oops); + if (isSet) + poke(); +} + + /* PollableCondition */ + +PollableCondition::PollableCondition(const Callback& cb, + const boost::shared_ptr<sys::Poller>& poller) + : impl(new PollableConditionPrivate(cb, *this, poller)) +{ +} + +PollableCondition::~PollableCondition() +{ + delete impl; +} + +void PollableCondition::set() { + // Add one to the set count and poke it to provoke a callback + ::InterlockedIncrement(&impl->isSet); + impl->poke(); +} + +void PollableCondition::clear() { + ::InterlockedExchange(&impl->isSet, 0); +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/QpidDllMain.h b/qpid/cpp/src/qpid/sys/windows/QpidDllMain.h new file mode 100644 index 0000000000..74eaf0256a --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/QpidDllMain.h @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * Include this file once in each DLL that relies on SystemInfo.h: + * threadSafeShutdown(). Note that Thread.cpp has a more elaborate + * DllMain, that also provides this functionality separately. + * + * Teardown is in the reverse order of the DLL dependencies used + * during the load phase. The calls to DllMain and the static + * destructors are from the same thread, so no locking is necessary + * and there is no downside to an invocation of DllMain by multiple + * Qpid DLLs. + */ + +#ifdef _DLL + +#include <qpid/ImportExport.h> +#include <windows.h> + +namespace qpid { +namespace sys { +namespace windows { + +QPID_IMPORT bool processExiting; +QPID_IMPORT bool libraryUnloading; + +}}} // namespace qpid::sys::SystemInfo + + +BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) { + switch (reason) { + case DLL_PROCESS_ATTACH: + case DLL_THREAD_ATTACH: + case DLL_THREAD_DETACH: + break; + + case DLL_PROCESS_DETACH: + // Remember how the process is terminating this DLL. + if (reserved != NULL) { + qpid::sys::windows::processExiting = true; + // Danger: all threading suspect, including indirect use of malloc or locks. + // Think twice before adding more functionality here. + return TRUE; + } + else { + qpid::sys::windows::libraryUnloading = true; + } + break; + } + return TRUE; +} + + +#endif diff --git a/qpid/cpp/src/qpid/sys/windows/Shlib.cpp b/qpid/cpp/src/qpid/sys/windows/Shlib.cpp new file mode 100644 index 0000000000..ba18747eb4 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/Shlib.cpp @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Shlib.h" +#include "qpid/Exception.h" +#include "qpid/sys/windows/check.h" +#include <windows.h> + +namespace qpid { +namespace sys { + +void Shlib::load(const char* name) { + HMODULE h = LoadLibrary(name); + if (h == NULL) { + throw QPID_WINDOWS_ERROR(GetLastError()); + } + handle = static_cast<void*>(h); +} + +void Shlib::unload() { + if (handle) { + if (FreeLibrary(static_cast<HMODULE>(handle)) == 0) { + throw QPID_WINDOWS_ERROR(GetLastError()); + } + handle = 0; + } +} + +void* Shlib::getSymbol(const char* name) { + // Double cast avoids warning about casting function pointer to object + void *sym = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(GetProcAddress(static_cast<HMODULE>(handle), name))); + if (sym == NULL) + throw QPID_WINDOWS_ERROR(GetLastError()); + return sym; +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp new file mode 100644 index 0000000000..b0903752c6 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp @@ -0,0 +1,346 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/SocketAddress.h" + +#include "qpid/Exception.h" +#include "qpid/Msg.h" +#include "qpid/log/Logger.h" + +// Ensure we get all of winsock2.h +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif + +#include <winsock2.h> +#include <ws2tcpip.h> +#include <string.h> + +namespace qpid { +namespace sys { + +SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) : + host(host0), + port(port0), + addrInfo(0), + currentAddrInfo(0) +{ +} + +SocketAddress::SocketAddress(const SocketAddress& sa) : + host(sa.host), + port(sa.port), + addrInfo(0), + currentAddrInfo(0) +{ +} + +SocketAddress& SocketAddress::operator=(const SocketAddress& sa) +{ + SocketAddress temp(sa); + + std::swap(temp, *this); + return *this; +} + +SocketAddress::~SocketAddress() +{ + if (addrInfo) { + ::freeaddrinfo(addrInfo); + } +} + +std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen, bool dispNameOnly, bool hideDecoration) +{ + char servName[NI_MAXSERV]; + char dispName[NI_MAXHOST]; + if (int rc=::getnameinfo(addr, addrlen, + dispName, sizeof(dispName), + servName, sizeof(servName), + NI_NUMERICHOST | NI_NUMERICSERV) != 0) + throw qpid::Exception(QPID_MSG(gai_strerror(rc))); + std::string s; + switch (addr->sa_family) { + case AF_INET: s += dispName; break; + case AF_INET6: + if (!hideDecoration) { + s += "["; s += dispName; s+= "]"; + } else { + s += dispName; + } + break; + default: throw Exception(QPID_MSG("Unexpected socket type")); + } + if (!dispNameOnly) { + s += ":"; + s += servName; + } + return s; +} + +uint16_t SocketAddress::getPort(::sockaddr const * const addr) +{ + switch (addr->sa_family) { + case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port); + case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port); + default:throw Exception(QPID_MSG("Unexpected socket type")); + } +} + +std::string SocketAddress::asString(bool numeric, bool dispNameOnly, bool hideDecoration) const +{ + if (!numeric) + return host + ":" + port; + // Canonicalise into numeric id + const ::addrinfo& ai = getAddrInfo(*this); + + return asString(ai.ai_addr, ai.ai_addrlen, dispNameOnly, hideDecoration); +} + +std::string SocketAddress::getHost() const +{ + return host; +} + +/** + * Return true if this SocketAddress is IPv4 or IPv6 + */ +bool SocketAddress::isIp() const +{ + const ::addrinfo& ai = getAddrInfo(*this); + return ai.ai_family == AF_INET || ai.ai_family == AF_INET6; +} + +/** + * this represents the low address of an ACL address range. + * Given rangeHi that represents the high address, + * return a string showing the numeric comparisons that the + * inRange checks will do for address pair. + */ +std::string SocketAddress::comparisonDetails(const SocketAddress& rangeHi) const +{ + std::ostringstream os; + SocketAddress thisSa(*this); + SocketAddress rangeHiSa(rangeHi); + (void) getAddrInfo(thisSa); + (void) getAddrInfo(rangeHiSa); + os << "(" << thisSa.asString(true, true, false) << + "," << rangeHiSa.asString(true, true, false) << ")"; + while (thisSa.nextAddress()) { + if (!rangeHiSa.nextAddress()) { + throw(Exception(QPID_MSG("Comparison iteration fails: " + (*this).asString() + + rangeHi.asString()))); + } + os << ",(" << thisSa.asString(true, true, false) << + "," << rangeHiSa.asString(true, true, false) << ")"; + } + if (rangeHiSa.nextAddress()) { + throw(Exception(QPID_MSG("Comparison iteration fails: " + (*this).asString() + + rangeHi.asString()))); + } + std::string result = os.str(); + return result; +} + +/** + * For ACL address matching make sure that the two addresses, *this + * which is the low address and hiPeer which is the high address, are + * both numeric ip addresses of the same family and that hi > *this. + * + * Note that if the addresses resolve to more than one struct addrinfo + * then this and the hiPeer must be equal. This avoids having to do + * difficult range checks where the this and hiPeer both resolve to + * multiple IPv4 or IPv6 addresses. + * + * This check is run at acl file load time and not at run tme. + */ +bool SocketAddress::isComparable(const SocketAddress& hiPeer) const { + try { + // May only compare if this socket is IPv4 or IPv6 + SocketAddress lo(*this); + const ::addrinfo& peerLoInfo = getAddrInfo(lo); + if (!(peerLoInfo.ai_family == AF_INET || peerLoInfo.ai_family == AF_INET6)) { + return false; + } + try { + // May only compare if peer socket is same family + SocketAddress hi(hiPeer); + const ::addrinfo& peerHiInfo = getAddrInfo(hi); + if (peerLoInfo.ai_family != peerHiInfo.ai_family) { + return false; + } + // Host names that resolve to lists are allowed if they are equal. + // For example: localhost, or fjord.lab.example.com + if ((*this).asString() == hiPeer.asString()) { + return true; + } + // May only compare if this and peer resolve to single address. + if (lo.nextAddress() || hi.nextAddress()) { + return false; + } + // Make sure that the lo/hi relationship is ok + int res; + if (!compareAddresses(peerLoInfo, peerHiInfo, res) || res < 0) { + return false; + } + return true; + } catch (Exception) { + // failed to resolve hi + return false; + } + } catch (Exception) { + // failed to resolve lo + return false; + } +} + +/** + * *this SocketAddress was created from the numeric IP address of a + * connecting host. + * The lo and hi addresses are the limit checks from the ACL file. + * Return true if this address is in range of any of the address pairs + * in the limit check range. + * + * This check is executed on every incoming connection. + */ +bool SocketAddress::inRange(const SocketAddress& lo, + const SocketAddress& hi) const +{ + (*this).firstAddress(); + lo.firstAddress(); + hi.firstAddress(); + const ::addrinfo& thisInfo = getAddrInfo(*this); + const ::addrinfo& loInfo = getAddrInfo(lo); + const ::addrinfo& hiInfo = getAddrInfo(hi); + if (inRange(thisInfo, loInfo, hiInfo)) { + return true; + } + while (lo.nextAddress()) { + if (!hi.nextAddress()) { + assert (false); + throw(Exception(QPID_MSG("Comparison iteration fails: " + + lo.asString() + hi.asString()))); + } + const ::addrinfo& loInfo = getAddrInfo(lo); + const ::addrinfo& hiInfo = getAddrInfo(hi); + if (inRange(thisInfo, loInfo, hiInfo)) { + return true; + } + } + return false; +} + +/** + * *this SocketAddress was created from the numeric IP address of a + * connecting host. + * The lo and hi addresses are one binary address pair from a range + * given in an ACL file. + * Return true if this binary address is '>= lo' and '<= hi'. + */ +bool SocketAddress::inRange(const ::addrinfo& thisInfo, + const ::addrinfo& lo, + const ::addrinfo& hi) const +{ + int resLo; + int resHi; + if (!compareAddresses(lo, thisInfo, resLo)) { + return false; + } + if (!compareAddresses(hi, thisInfo, resHi)) { + return false; + } + if (resLo < 0) { + return false; + } + if (resHi > 0) { + return false; + } + return true; +} + +/** + * Compare this address against two binary low/high addresses. + * return true with result holding the comparison. + */ +bool SocketAddress::compareAddresses(const struct addrinfo& lo, + const struct addrinfo& hi, + int& result) const +{ + if (lo.ai_family != hi.ai_family) { + return false; + } + if (lo.ai_family == AF_INET) { + struct sockaddr_in* sin4lo = (struct sockaddr_in*)lo.ai_addr; + struct sockaddr_in* sin4hi = (struct sockaddr_in*)hi.ai_addr; + result = memcmp(&sin4hi->sin_addr, &sin4lo->sin_addr, sizeof(in_addr)); + } else if (lo.ai_family == AF_INET6) { + struct sockaddr_in6* sin6lo = (struct sockaddr_in6*)lo.ai_addr; + struct sockaddr_in6* sin6hi = (struct sockaddr_in6*)hi.ai_addr; + result = memcmp(&sin6hi->sin6_addr, &sin6lo->sin6_addr, sizeof(in6_addr)); + } else { + assert (false); + return false; + } + return true; +} + +void SocketAddress::firstAddress() const { + if (addrInfo) { + currentAddrInfo = addrInfo; + } else { + (void) getAddrInfo(*this); + } +} + +bool SocketAddress::nextAddress() const { + bool r = currentAddrInfo->ai_next != 0; + if (r) + currentAddrInfo = currentAddrInfo->ai_next; + return r; +} + +const ::addrinfo& getAddrInfo(const SocketAddress& sa) +{ + if (!sa.addrInfo) { + ::addrinfo hints; + ::memset(&hints, 0, sizeof(hints)); + hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for + hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6 + hints.ai_socktype = SOCK_STREAM; + + const char* node = 0; + if (sa.host.empty()) { + hints.ai_flags |= AI_PASSIVE; + } else { + node = sa.host.c_str(); + } + const char* service = sa.port.empty() ? "0" : sa.port.c_str(); + + int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo); + if (n != 0) + throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n))); + sa.currentAddrInfo = sa.addrInfo; + } + + return *sa.currentAddrInfo; +} + +}} diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp new file mode 100644 index 0000000000..29f673c156 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp @@ -0,0 +1,735 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SslAsynchIO.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Socket.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/SecuritySettings.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Time.h" +#include "qpid/log/Statement.h" + +#include "qpid/sys/windows/check.h" + +// security.h needs to see this to distinguish from kernel use. +#define SECURITY_WIN32 +#include <security.h> +#include <Schnlsp.h> +#undef SECURITY_WIN32 + +#include <queue> +#include <boost/bind.hpp> +#include "AsynchIO.h" + +namespace qpid { +namespace sys { +namespace windows { + +namespace { + + /* + * To make the SSL encryption more efficient, set up a new BufferBase + * that leaves room for the SSL header to be prepended and the SSL + * trailer to be appended. + * + * This works by accepting a properly formed BufferBase, remembering it, + * and resetting the members of this struct to reflect the reserved + * header and trailer areas. It's only needed for giving buffers up to + * the frame layer for writing into. + */ + struct SslIoBuff : public qpid::sys::AsynchIO::BufferBase { + qpid::sys::AsynchIO::BufferBase* aioBuff; + + SslIoBuff (qpid::sys::AsynchIO::BufferBase *base, + const SecPkgContext_StreamSizes &sizes) + : qpid::sys::AsynchIO::BufferBase(&base->bytes[sizes.cbHeader], + std::min(base->byteCount - sizes.cbHeader - sizes.cbTrailer, + sizes.cbMaximumMessage)), + aioBuff(base) + {} + + ~SslIoBuff() {} + }; +} + +SslAsynchIO::SslAsynchIO(const qpid::sys::Socket& s, + CredHandle hCred, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb, + BuffersEmptyCallback eCb, + IdleCallback iCb, + NegotiateDoneCallback nCb) : + credHandle(hCred), + aio(0), + state(Negotiating), + readCallback(rCb), + idleCallback(iCb), + negotiateDoneCallback(nCb), + queuedDelete(false), + queuedClose(false), + reapCheckPending(false), + started(false), + leftoverPlaintext(0) +{ + SecInvalidateHandle(&ctxtHandle); + peerAddress = s.getPeerAddress(); + aio = qpid::sys::AsynchIO::create(s, + boost::bind(&SslAsynchIO::sslDataIn, this, _1, _2), + eofCb, + disCb, + cCb, + eCb, + boost::bind(&SslAsynchIO::idle, this, _1)); +} + +SslAsynchIO::~SslAsynchIO() { + leftoverPlaintext = 0; +} + +void SslAsynchIO::queueForDeletion() { + // Called exactly once, always on the IO completion thread. + bool authenticated = (state != Negotiating); + state = ShuttingDown; + if (authenticated) { + // Tell SChannel we are done. + DWORD shutdown = SCHANNEL_SHUTDOWN; + SecBuffer shutBuff; + shutBuff.cbBuffer = sizeof(DWORD); + shutBuff.BufferType = SECBUFFER_TOKEN; + shutBuff.pvBuffer = &shutdown; + SecBufferDesc desc; + desc.ulVersion = SECBUFFER_VERSION; + desc.cBuffers = 1; + desc.pBuffers = &shutBuff; + ::ApplyControlToken(&ctxtHandle, &desc); + negotiateStep(0); + } + + queueWriteClose(); + queuedDelete = true; + + // This method effectively disconnects the layer above; pass it on the + // AsynchIO and delete. + aio->queueForDeletion(); + + if (!reapCheckPending) + delete(this); +} + +void SslAsynchIO::start(qpid::sys::Poller::shared_ptr poller) { + aio->start(poller); + started = true; + startNegotiate(); +} + +void SslAsynchIO::createBuffers(uint32_t size) { + // Reserve an extra buffer to hold unread plaintext or trailing encrypted input. + windows::AsynchIO *waio = dynamic_cast<windows::AsynchIO*>(aio); + waio->setBufferCount(waio->getBufferCount() + 1); + aio->createBuffers(size); +} + +void SslAsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) { + aio->queueReadBuffer(buff); +} + +void SslAsynchIO::unread(AsynchIO::BufferBase* buff) { + // This is plaintext data being given back for more. Since it's already + // decrypted, don't give it back to the aio layer; keep it to append + // any new data for the upper layer. + assert(buff); + buff->squish(); + assert(leftoverPlaintext == 0); + leftoverPlaintext = buff; +} + +void SslAsynchIO::queueWrite(AsynchIO::BufferBase* buff) { + // @@TODO: Need to delay the write if the session is renegotiating. + + // Should not have gotten here without an SslIoBuff. This assert is + // primarily to catch any stray cases where write is called with a buffer + // not obtained via getQueuedBuffer. + SslIoBuff *sslBuff = dynamic_cast<SslIoBuff*>(buff); + assert(sslBuff != 0); + + // Encrypt and hand off to the io layer. Remember that the upper layer's + // encoding was working on, and adjusting counts for, the SslIoBuff. + // Update the count of the original BufferBase before handing off to + // the I/O layer. + buff = sslBuff->aioBuff; + SecBuffer buffs[4]; + buffs[0].cbBuffer = schSizes.cbHeader; + buffs[0].BufferType = SECBUFFER_STREAM_HEADER; + buffs[0].pvBuffer = buff->bytes; // This space was left by SslIoBuff + buffs[1].cbBuffer = sslBuff->dataCount; + buffs[1].BufferType = SECBUFFER_DATA; + buffs[1].pvBuffer = sslBuff->bytes; + buffs[2].cbBuffer = schSizes.cbTrailer; + buffs[2].BufferType = SECBUFFER_STREAM_TRAILER; + buffs[2].pvBuffer = &sslBuff->bytes[sslBuff->dataCount]; + buffs[3].cbBuffer = 0; + buffs[3].BufferType = SECBUFFER_EMPTY; + buffs[3].pvBuffer = 0; + SecBufferDesc buffDesc; + buffDesc.ulVersion = SECBUFFER_VERSION; + buffDesc.cBuffers = 4; + buffDesc.pBuffers = buffs; + SECURITY_STATUS status = ::EncryptMessage(&ctxtHandle, 0, &buffDesc, 0); + + // EncryptMessage encrypts the data in place. The header and trailer + // areas were left previously and must now be included in the updated + // count of bytes to write to the peer. + delete sslBuff; + buff->dataCount = buffs[0].cbBuffer + buffs[1].cbBuffer + buffs[2].cbBuffer; + aio->queueWrite(buff); +} + +void SslAsynchIO::notifyPendingWrite() { + aio->notifyPendingWrite(); +} + +void SslAsynchIO::queueWriteClose() { + qpid::sys::Mutex::ScopedLock l(lock); + if (queuedClose) + return; + queuedClose = true; + if (started) { + reapCheckPending = true; + // Move tear down logic to an IO thread. + aio->requestCallback(boost::bind(&SslAsynchIO::reapCheck, this)); + } + aio->queueWriteClose(); +} + +void SslAsynchIO::reapCheck() { + // Serialized check in the IO thread whether to self-delete. + reapCheckPending = false; + if (queuedDelete) + delete(this); +} + +bool SslAsynchIO::writeQueueEmpty() { + return aio->writeQueueEmpty(); +} + +// Queue the specified callback for invocation from an I/O thread. +void SslAsynchIO::requestCallback(RequestCallback callback) { + aio->requestCallback(callback); +} + +/** + * Return a queued buffer read to put new data in for writing. + * This method ALWAYS returns a SslIoBuff reflecting a BufferBase from + * the aio layer that has header and trailer space reserved. + */ +AsynchIO::BufferBase* SslAsynchIO::getQueuedBuffer() { + SslIoBuff *sslBuff = 0; + BufferBase* buff = aio->getQueuedBuffer(); + if (buff == 0) + return 0; + + sslBuff = new SslIoBuff(buff, schSizes); + return sslBuff; +} + +SecuritySettings SslAsynchIO::getSecuritySettings() { + SecPkgContext_KeyInfo info; + memset(&info, 0, sizeof(info)); + ::QueryContextAttributes(&ctxtHandle, SECPKG_ATTR_KEY_INFO, &info); + + SecuritySettings settings; + settings.ssf = info.KeySize; + settings.authid = std::string(); + return settings; +} + +void SslAsynchIO::negotiationDone() { + switch(state) { + case Negotiating: + ::QueryContextAttributes(&ctxtHandle, + SECPKG_ATTR_STREAM_SIZES, + &schSizes); + state = Running; + if (negotiateDoneCallback) + negotiateDoneCallback(SEC_E_OK); + break; + case Redo: + state = Running; + break; + case ShuttingDown: + break; + default: + assert(0); + } +} + +void SslAsynchIO::negotiationFailed(SECURITY_STATUS status) { + QPID_LOG(notice, "SSL negotiation failed to " << peerAddress << ": " << + qpid::sys::strError(status)); + if (negotiateDoneCallback) + negotiateDoneCallback(status); + else + queueWriteClose(); +} + +void SslAsynchIO::sslDataIn(qpid::sys::AsynchIO& a, BufferBase *buff) { + if (state == ShuttingDown) { + return; + } + if (state != Running) { + negotiateStep(buff); + return; + } + + // Decrypt one block; if there's legit data, pass it on through. + // However, it's also possible that the peer hasn't supplied enough + // data yet, or the session needs to be renegotiated, or the session + // is ending. + SecBuffer recvBuffs[4]; + recvBuffs[0].cbBuffer = buff->dataCount; + recvBuffs[0].BufferType = SECBUFFER_DATA; + recvBuffs[0].pvBuffer = &buff->bytes[buff->dataStart]; + recvBuffs[1].BufferType = SECBUFFER_EMPTY; + recvBuffs[2].BufferType = SECBUFFER_EMPTY; + recvBuffs[3].BufferType = SECBUFFER_EMPTY; + SecBufferDesc buffDesc; + buffDesc.ulVersion = SECBUFFER_VERSION; + buffDesc.cBuffers = 4; + buffDesc.pBuffers = recvBuffs; + SECURITY_STATUS status = ::DecryptMessage(&ctxtHandle, &buffDesc, 0, NULL); + if (status != SEC_E_OK) { + if (status == SEC_E_INCOMPLETE_MESSAGE) { + // Give the partially filled buffer back and get more data + a.unread(buff); + } + else { + // Don't need this any more... + a.queueReadBuffer(buff); + + if (status == SEC_I_RENEGOTIATE) { + state = Redo; + negotiateStep(0); + } + else if (status == SEC_I_CONTEXT_EXPIRED) { + queueWriteClose(); + } + else { + throw QPID_WINDOWS_ERROR(status); + } + } + return; + } + + // All decrypted and verified... continue with AMQP. The recvBuffs have + // been set up by DecryptMessage to demarcate the SSL header, data, and + // trailer, as well as any extra data left over. Walk through and find + // that info, adjusting the buff data accordingly to reflect only the + // actual decrypted data. + // If there's extra data, copy that out to a new buffer and run through + // this method again. + char *extraBytes = 0; + int32_t extraLength = 0; + BufferBase *extraBuff = 0; + for (int i = 0; i < 4; i++) { + switch (recvBuffs[i].BufferType) { + case SECBUFFER_STREAM_HEADER: + buff->dataStart += recvBuffs[i].cbBuffer; + // Fall through - also don't count these bytes as data + case SECBUFFER_STREAM_TRAILER: + buff->dataCount -= recvBuffs[i].cbBuffer; + break; + case SECBUFFER_EXTRA: + extraBytes = (char *) recvBuffs[i].pvBuffer; + extraLength = recvBuffs[i].cbBuffer; + break; + default: + break; + } + } + + // Since we've already taken (possibly) all the available bytes from the + // aio layer, need to be sure that everything that's processable is + // processed before returning back to aio. It could be that any + // leftoverPlaintext data plus new buff data won't fit in one buffer, so + // need to keep going around the input processing loop until either + // all the bytes are gone, or there's less than a full frame remaining + // (so we can count on more bytes being on the way via aio). + do { + BufferBase *temp = 0; + // See if there was partial data left over from last time. If so, append this new + // data to that and release the current buff back to aio. Assume that + // leftoverPlaintext was squished so the data starts at 0. + if (leftoverPlaintext != 0) { + // There is leftover data; append all the new data that will fit. + int32_t count = buff->dataCount; + if (count) { + if (leftoverPlaintext->dataCount + count > leftoverPlaintext->byteCount) + count = (leftoverPlaintext->byteCount - leftoverPlaintext->dataCount); + ::memmove(&leftoverPlaintext->bytes[leftoverPlaintext->dataCount], + &buff->bytes[buff->dataStart], count); + leftoverPlaintext->dataCount += count; + buff->dataCount -= count; + buff->dataStart += count; + // Prepare to pass the buffer up. Beware that the read callback + // may do an unread(), so move the leftoverPlaintext pointer + // out of the way. It also may release the buffer back to aio, + // so in either event, the pointer passed to the callback is not + // valid on return. + temp = leftoverPlaintext; + leftoverPlaintext = 0; + } + else { + // All decrypted data used up, decrypt some more or get more from the aio + if (extraLength) { + buff->dataStart = extraBytes - buff->bytes; + buff->dataCount = extraLength; + sslDataIn(a, buff); + return; + } + else { + a.queueReadBuffer(buff); + return; + } + } + } + else { + // Use buff, but first offload data not yet encrypted + if (extraLength) { + // Very important to get this buffer from the downstream aio. + // The ones constructed from the local getQueuedBuffer() are + // restricted size for encrypting. However, data coming up from + // TCP may have a bunch of SSL segments coalesced and be much + // larger than the maximum single SSL segment. + extraBuff = a.getQueuedBuffer(); + if (0 == extraBuff) { + // No leftoverPlaintext, so a spare buffer should be available + throw QPID_WINDOWS_ERROR(WSAENOBUFS); + } + memmove(extraBuff->bytes, extraBytes, extraLength); + extraBuff->dataCount = extraLength; + extraLength = 0; + } + temp = buff; + buff = 0; + } + if (readCallback) { + // The callback guard here is to prevent an upcall from deleting + // this out from under us via queueForDeletion(). + readCallback(*this, temp); + } + else + a.queueReadBuffer(temp); // What else can we do with this??? + } while (buff != 0); + + // Ok, the current decrypted data is done. If there was any extra data, + // go back and handle that. + if (extraBuff != 0) { + sslDataIn(a, extraBuff); + } +} + +void SslAsynchIO::idle(qpid::sys::AsynchIO&) { + // Don't relay idle indication to layer above until SSL session is up. + if (state == Running) { + state = Running; + if (idleCallback) + idleCallback(*this); + } +} + +/**************************************************/ + +namespace { + +bool unsafeNegotiatedTlsVersion(CtxtHandle &ctxtHandle) { + // See if SChannel ultimately negotiated <= SSL3, perhaps due to + // global registry settings. + SecPkgContext_ConnectionInfo info; + ::QueryContextAttributes(&ctxtHandle, SECPKG_ATTR_CONNECTION_INFO, &info); + // Ascending bit patterns denote newer SSL/TLS protocol versions + return (info.dwProtocol < SP_PROT_TLS1_SERVER) ? true : false; +} + +} // namespace + +/**************************************************/ + +ClientSslAsynchIO::ClientSslAsynchIO(const std::string& brokerHost, + const qpid::sys::Socket& s, + CredHandle hCred, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb, + BuffersEmptyCallback eCb, + IdleCallback iCb, + NegotiateDoneCallback nCb) : + SslAsynchIO(s, hCred, rCb, eofCb, disCb, cCb, eCb, iCb, nCb), + serverHost(brokerHost), clientCertRequested(false) +{ +} + +void ClientSslAsynchIO::startNegotiate() { + // SEC_CHAR is non-const, so do all the typing here. + SEC_CHAR *host = const_cast<SEC_CHAR *>(serverHost.c_str()); + + // Need a buffer to receive the token to send to the server. + BufferBase *buff = aio->getQueuedBuffer(); + ULONG ctxtRequested = ISC_REQ_STREAM | ISC_REQ_USE_SUPPLIED_CREDS; + ULONG ctxtAttrs; + // sendBuffs gets information to forward to the peer. + SecBuffer sendBuffs[2]; + sendBuffs[0].cbBuffer = buff->byteCount; + sendBuffs[0].BufferType = SECBUFFER_TOKEN; + sendBuffs[0].pvBuffer = buff->bytes; + sendBuffs[1].cbBuffer = 0; + sendBuffs[1].BufferType = SECBUFFER_EMPTY; + sendBuffs[1].pvBuffer = 0; + SecBufferDesc sendBuffDesc; + sendBuffDesc.ulVersion = SECBUFFER_VERSION; + sendBuffDesc.cBuffers = 2; + sendBuffDesc.pBuffers = sendBuffs; + SECURITY_STATUS status = ::InitializeSecurityContext(&credHandle, + NULL, + host, + ctxtRequested, + 0, + 0, + NULL, + 0, + &ctxtHandle, + &sendBuffDesc, + &ctxtAttrs, + NULL); + + if (status == SEC_I_CONTINUE_NEEDED) { + buff->dataCount = sendBuffs[0].cbBuffer; + aio->queueWrite(buff); + } +} + +void ClientSslAsynchIO::negotiateStep(BufferBase* buff) { + // SEC_CHAR is non-const, so do all the typing here. + SEC_CHAR *host = const_cast<SEC_CHAR *>(serverHost.c_str()); + ULONG ctxtRequested = ISC_REQ_STREAM | ISC_REQ_USE_SUPPLIED_CREDS; + ULONG ctxtAttrs; + + // tokenBuffs describe the buffer that's coming in. It should have + // a token from the SSL server. + SecBuffer tokenBuffs[2]; + tokenBuffs[0].cbBuffer = buff ? buff->dataCount : 0; + tokenBuffs[0].BufferType = SECBUFFER_TOKEN; + tokenBuffs[0].pvBuffer = buff ? buff->bytes : 0; + tokenBuffs[1].cbBuffer = 0; + tokenBuffs[1].BufferType = SECBUFFER_EMPTY; + tokenBuffs[1].pvBuffer = 0; + SecBufferDesc tokenBuffDesc; + tokenBuffDesc.ulVersion = SECBUFFER_VERSION; + tokenBuffDesc.cBuffers = 2; + tokenBuffDesc.pBuffers = tokenBuffs; + + // Need a buffer to receive any token to send back to the server. + BufferBase *sendbuff = aio->getQueuedBuffer(); + // sendBuffs gets information to forward to the peer. + SecBuffer sendBuffs[2]; + sendBuffs[0].cbBuffer = sendbuff->byteCount; + sendBuffs[0].BufferType = SECBUFFER_TOKEN; + sendBuffs[0].pvBuffer = sendbuff->bytes; + sendBuffs[1].cbBuffer = 0; + sendBuffs[1].BufferType = SECBUFFER_EMPTY; + sendBuffs[1].pvBuffer = 0; + SecBufferDesc sendBuffDesc; + sendBuffDesc.ulVersion = SECBUFFER_VERSION; + sendBuffDesc.cBuffers = 2; + sendBuffDesc.pBuffers = sendBuffs; + + SECURITY_STATUS status = ::InitializeSecurityContext(&credHandle, + &ctxtHandle, + host, + ctxtRequested, + 0, + 0, + &tokenBuffDesc, + 0, + NULL, + &sendBuffDesc, + &ctxtAttrs, + NULL); + + if (status == SEC_E_INCOMPLETE_MESSAGE) { + // Not enough - get more data from the server then try again. + aio->unread(buff); + aio->queueReadBuffer(sendbuff); // Don't need this one for now... + return; + } + // Done with the buffer that came in... + if (buff) + aio->queueReadBuffer(buff); + if (status == SEC_I_CONTINUE_NEEDED) { + // check if server has requested a client certificate + if (!clientCertRequested) { + SecPkgContext_IssuerListInfoEx caList; + memset(&caList, 0, sizeof(caList)); + ::QueryContextAttributes(&ctxtHandle, SECPKG_ATTR_ISSUER_LIST_EX, &caList); + if (caList.cIssuers > 0) + clientCertRequested = true; + if (caList.aIssuers) + ::FreeContextBuffer(caList.aIssuers); + } + + sendbuff->dataCount = sendBuffs[0].cbBuffer; + aio->queueWrite(sendbuff); + return; + } + // Nothing to send back to the server... + aio->queueReadBuffer(sendbuff); + + if (status == SEC_E_OK && unsafeNegotiatedTlsVersion(ctxtHandle)) { + // Refuse a connection that negotiates to less than TLS 1.0. + QPID_LOG(notice, "client SSL negotiation to unsafe protocol version."); + status = SEC_E_UNSUPPORTED_FUNCTION; + } + + // SEC_I_CONTEXT_EXPIRED means session stop complete; SEC_E_OK can be + // either session stop or negotiation done (session up). + if (status == SEC_E_OK || status == SEC_I_CONTEXT_EXPIRED) + negotiationDone(); + else { + if (clientCertRequested && status == SEC_E_CERT_UNKNOWN) + // ISC_REQ_USE_SUPPLIED_CREDS makes us reponsible for this case + // (no client cert). Map it to its counterpart: + status = SEC_E_INCOMPLETE_CREDENTIALS; + negotiationFailed(status); + } +} + +/*************************************************/ + +ServerSslAsynchIO::ServerSslAsynchIO(bool clientMustAuthenticate, + const qpid::sys::Socket& s, + CredHandle hCred, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb, + BuffersEmptyCallback eCb, + IdleCallback iCb, + NegotiateDoneCallback nCb) : + SslAsynchIO(s, hCred, rCb, eofCb, disCb, cCb, eCb, iCb, nCb), + clientAuth(clientMustAuthenticate) +{ +} + +void ServerSslAsynchIO::startNegotiate() { + // Nothing... need the client to send a token first. +} + +void ServerSslAsynchIO::negotiateStep(BufferBase* buff) { + ULONG ctxtRequested = ASC_REQ_STREAM; + if (clientAuth) + ctxtRequested |= ASC_REQ_MUTUAL_AUTH; + ULONG ctxtAttrs; + + // tokenBuffs describe the buffer that's coming in. It should have + // a token from the SSL server except if shutting down or renegotiating. + SecBuffer tokenBuffs[2]; + tokenBuffs[0].cbBuffer = buff ? buff->dataCount : 0; + tokenBuffs[0].BufferType = SECBUFFER_TOKEN; + tokenBuffs[0].pvBuffer = buff ? buff->bytes : 0; + tokenBuffs[1].cbBuffer = 0; + tokenBuffs[1].BufferType = SECBUFFER_EMPTY; + tokenBuffs[1].pvBuffer = 0; + SecBufferDesc tokenBuffDesc; + tokenBuffDesc.ulVersion = SECBUFFER_VERSION; + tokenBuffDesc.cBuffers = 2; + tokenBuffDesc.pBuffers = tokenBuffs; + + // Need a buffer to receive any token to send back to the server. + BufferBase *sendbuff = aio->getQueuedBuffer(); + // sendBuffs gets information to forward to the peer. + SecBuffer sendBuffs[2]; + sendBuffs[0].cbBuffer = sendbuff->byteCount; + sendBuffs[0].BufferType = SECBUFFER_TOKEN; + sendBuffs[0].pvBuffer = sendbuff->bytes; + sendBuffs[1].cbBuffer = 0; + sendBuffs[1].BufferType = SECBUFFER_EMPTY; + sendBuffs[1].pvBuffer = 0; + SecBufferDesc sendBuffDesc; + sendBuffDesc.ulVersion = SECBUFFER_VERSION; + sendBuffDesc.cBuffers = 2; + sendBuffDesc.pBuffers = sendBuffs; + PCtxtHandle ctxtHandlePtr = (SecIsValidHandle(&ctxtHandle)) ? &ctxtHandle : 0; + SECURITY_STATUS status = ::AcceptSecurityContext(&credHandle, + ctxtHandlePtr, + &tokenBuffDesc, + ctxtRequested, + 0, + &ctxtHandle, + &sendBuffDesc, + &ctxtAttrs, + NULL); + if (status == SEC_E_INCOMPLETE_MESSAGE) { + // Not enough - get more data from the server then try again. + if (buff) + aio->unread(buff); + aio->queueReadBuffer(sendbuff); // Don't need this one for now... + return; + } + // Done with the buffer that came in... + if (buff) + aio->queueReadBuffer(buff); + if (status == SEC_I_CONTINUE_NEEDED) { + sendbuff->dataCount = sendBuffs[0].cbBuffer; + aio->queueWrite(sendbuff); + return; + } + // There may have been a token generated; if so, send it to the client. + if (sendBuffs[0].cbBuffer > 0 && state != ShuttingDown) { + sendbuff->dataCount = sendBuffs[0].cbBuffer; + aio->queueWrite(sendbuff); + } + else + // Nothing to send back to the server... + aio->queueReadBuffer(sendbuff); + + if (status == SEC_E_OK && unsafeNegotiatedTlsVersion(ctxtHandle)) { + // Refuse a connection that negotiates to less than TLS 1.0. + QPID_LOG(notice, "server SSL negotiation to unsafe protocol version."); + status = SEC_E_UNSUPPORTED_FUNCTION; + } + + // SEC_I_CONTEXT_EXPIRED means session stop complete; SEC_E_OK can be + // either session stop or negotiation done (session up). + if (status == SEC_E_OK || status == SEC_I_CONTEXT_EXPIRED) { + if (clientAuth) + QPID_LOG(warning, "DID WE CHECK FOR CLIENT AUTH???"); + + negotiationDone(); + } + else { + negotiationFailed(status); + } +} + +}}} // namespace qpid::sys::windows diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h new file mode 100644 index 0000000000..3d00e1c429 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h @@ -0,0 +1,192 @@ +#ifndef _sys_windows_SslAsynchIO +#define _sys_windows_SslAsynchIO + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/sys/Poller.h" +#include "qpid/CommonImportExport.h" +#include "qpid/sys/Mutex.h" +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> +#include <windows.h> +// security.h needs to see this to distinguish from kernel use. +#define SECURITY_WIN32 +#include <security.h> +#include <Schnlsp.h> +#undef SECURITY_WIN32 + +namespace qpid { +namespace sys { +namespace windows { + +/* + * SSL/Schannel shim between the frame-handling and AsynchIO layers. + * SslAsynchIO creates a regular AsynchIO object to handle I/O and this class + * gets involved for SSL negotiations and encrypt/decrypt. The details of + * how this all works are invisible to the layers on either side. The only + * change from normal AsynchIO usage is that there's an extra callback + * from SslAsynchIO to indicate that the initial session negotiation is + * complete. + * + * The details of session negotiation are different for client and server + * SSL roles. These differences are handled by deriving separate client + * and server role classes. + */ +class SslAsynchIO : public qpid::sys::AsynchIO { +public: + typedef boost::function1<void, SECURITY_STATUS> NegotiateDoneCallback; + + SslAsynchIO(const qpid::sys::Socket& s, + CredHandle hCred, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0, + NegotiateDoneCallback nCb = 0); + ~SslAsynchIO(); + + virtual void queueForDeletion(); + + virtual void start(qpid::sys::Poller::shared_ptr poller); + virtual void createBuffers(uint32_t size); + virtual void queueReadBuffer(BufferBase* buff); + virtual void unread(BufferBase* buff); + virtual void queueWrite(BufferBase* buff); + virtual void notifyPendingWrite(); + virtual void queueWriteClose(); + virtual bool writeQueueEmpty(); + virtual void requestCallback(RequestCallback); + virtual BufferBase* getQueuedBuffer(); + virtual SecuritySettings getSecuritySettings(void); + +protected: + CredHandle credHandle; + + // AsynchIO layer below that's actually doing the I/O + qpid::sys::AsynchIO *aio; + Mutex lock; + + // Track what the state of the SSL session is. Have to know when it's + // time to notify the upper layer that the session is up, and also to + // know when it's not legit to pass data through to either side. + enum { Negotiating, Running, Redo, ShuttingDown } state; + CtxtHandle ctxtHandle; + TimeStamp credExpiry; + + // Client- and server-side SSL subclasses implement these to do the + // proper negotiation steps. negotiateStep() is called with a buffer + // just received from the peer. + virtual void startNegotiate() = 0; + virtual void negotiateStep(BufferBase *buff) = 0; + + // The negotiating steps call one of these when it's finalized: + void negotiationDone(); + void negotiationFailed(SECURITY_STATUS status); + +private: + // These are callbacks from AsynchIO to here. + void sslDataIn(qpid::sys::AsynchIO& a, BufferBase *buff); + void idle(qpid::sys::AsynchIO&); + void reapCheck(); + + // These callbacks are to the layer above. + ReadCallback readCallback; + IdleCallback idleCallback; + NegotiateDoneCallback negotiateDoneCallback; + + volatile bool queuedDelete; + volatile bool queuedClose; + volatile bool reapCheckPending; + bool started; + + // Address of peer, in case it's needed for logging. + std::string peerAddress; + + // Partial buffer of decrypted plaintext given back by the layer above. + AsynchIO::BufferBase *leftoverPlaintext; + + SecPkgContext_StreamSizes schSizes; +}; + +/* + * SSL/Schannel client-side shim between the frame-handling and AsynchIO + * layers. + */ +class ClientSslAsynchIO : public SslAsynchIO { +public: + // Args same as for SslIoShim, with the addition of brokerHost which is + // the expected SSL name of the server. + QPID_COMMON_EXTERN ClientSslAsynchIO(const std::string& brokerHost, + const qpid::sys::Socket& s, + CredHandle hCred, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0, + NegotiateDoneCallback nCb = 0); + +private: + std::string serverHost; + bool clientCertRequested; + + // Client- and server-side SSL subclasses implement these to do the + // proper negotiation steps. negotiateStep() is called with a buffer + // just received from the peer. + void startNegotiate(); + void negotiateStep(BufferBase *buff); +}; +/* + * SSL/Schannel server-side shim between the frame-handling and AsynchIO + * layers. + */ +class ServerSslAsynchIO : public SslAsynchIO { +public: + QPID_COMMON_EXTERN ServerSslAsynchIO(bool clientMustAuthenticate, + const qpid::sys::Socket& s, + CredHandle hCred, + ReadCallback rCb, + EofCallback eofCb, + DisconnectCallback disCb, + ClosedCallback cCb = 0, + BuffersEmptyCallback eCb = 0, + IdleCallback iCb = 0, + NegotiateDoneCallback nCb = 0); + +private: + bool clientAuth; + + // Client- and server-side SSL subclasses implement these to do the + // proper negotiation steps. negotiateStep() is called with a buffer + // just received from the peer. + void startNegotiate(); + void negotiateStep(BufferBase *buff); +}; + +}}} // namespace qpid::sys::windows + +#endif // _sys_windows_SslAsynchIO diff --git a/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp b/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp new file mode 100644 index 0000000000..de8f10b0e9 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp @@ -0,0 +1,279 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include <string> +#include <windows.h> +#include "qpid/Msg.h" +#include "qpid/log/Logger.h" +#include "qpid/sys/windows/check.h" +#include "qpid/sys/windows/util.h" +#include "qpid/sys/windows/SslCredential.h" + + +namespace qpid { +namespace sys { +namespace windows { + + +SslCredential::SslCredential() : certStore(0), cert(0), hostnameVerification(true) +{ + SecInvalidateHandle(&credHandle); + memset(&cred, 0, sizeof(cred)); + cred.dwVersion = SCHANNEL_CRED_VERSION; + cred.dwFlags = SCH_CRED_NO_DEFAULT_CREDS; +} + +SslCredential::~SslCredential() +{ + if (SecIsValidHandle(&credHandle)) + ::FreeCredentialsHandle(&credHandle); + if (cert) + ::CertFreeCertificateContext(cert); + if (certStore) + ::CertCloseStore(certStore, CERT_CLOSE_STORE_FORCE_FLAG); +} + +bool SslCredential::load(const std::string& certName) +{ + cert = findCertificate(certName); + if (cert != NULL) { + // assign the certificate into the credentials + cred.paCred = &cert; + cred.cCreds = 1; + } + if (!hostnameVerification) + cred.dwFlags |= SCH_CRED_NO_SERVERNAME_CHECK; + + SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL, + UNISP_NAME, + SECPKG_CRED_OUTBOUND, + NULL, + &cred, + NULL, + NULL, + &credHandle, + &credExpiry); + if (status != SEC_E_OK) + throw QPID_WINDOWS_ERROR(status); + + return (cert != NULL); +} + +CredHandle SslCredential::handle() +{ + return credHandle; +} + +std::string SslCredential::error() +{ + // Certificate needed after all. Return main error and log additional context + if (!loadError.logMessage.empty()) + QPID_LOG(warning, loadError.logMessage); + return loadError.error; +} + +void SslCredential::ignoreHostnameVerificationFailure(){ + hostnameVerification = false; +} + +void SslCredential::loadPrivCertStore() +{ + // Get a handle to the system store or pkcs#12 file + qpid::sys::ssl::SslOptions& opts = qpid::sys::ssl::SslOptions::global; + if (opts.certFilename.empty()) { + // opening a system store, names are not case sensitive + std::string store = opts.certStore.empty() ? "my" : opts.certStore; + std::transform(store.begin(), store.end(), store.begin(), ::tolower); + // map confusing GUI name to actual registry store name + if (store == "personal") + store = "my"; + certStore = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A, 0, NULL, + CERT_STORE_OPEN_EXISTING_FLAG | CERT_STORE_READONLY_FLAG | + CERT_SYSTEM_STORE_CURRENT_USER, store.c_str()); + if (!certStore) { + HRESULT status = GetLastError(); + loadError.set(Msg() << "Could not open system certificate store: " << store, status); + return; + } + QPID_LOG(debug, "SslConnector using certifcates from system store: " << store); + } else { + // opening the store from file and populating it with a private key + HANDLE certFileHandle = NULL; + certFileHandle = CreateFile(opts.certFilename.c_str(), GENERIC_READ, 0, NULL, OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, NULL); + if (INVALID_HANDLE_VALUE == certFileHandle) { + HRESULT status = GetLastError(); + loadError.set(Msg() << "Failed to open the file holding the private key: " << opts.certFilename, status); + return; + } + std::vector<BYTE> certEncoded; + DWORD certEncodedSize = 0L; + const DWORD fileSize = GetFileSize(certFileHandle, NULL); + if (INVALID_FILE_SIZE != fileSize) { + certEncoded.resize(fileSize); + bool result = false; + result = ReadFile(certFileHandle, &certEncoded[0], + fileSize, + &certEncodedSize, + NULL); + if (!result) { + // the read failed, return the error as an HRESULT + HRESULT status = GetLastError(); + CloseHandle(certFileHandle); + loadError.set(Msg() << "Reading the private key from file failed " << opts.certFilename, status); + return; + } + } + else { + HRESULT status = GetLastError(); + loadError.set(Msg() << "Unable to read the certificate file " << opts.certFilename, status); + return; + } + CloseHandle(certFileHandle); + + CRYPT_DATA_BLOB blobData; + blobData.cbData = certEncodedSize; + blobData.pbData = &certEncoded[0]; + + // get passwd from file and convert to null terminated wchar_t (Windows UCS2) + std::string passwd = getPasswd(opts.certPasswordFile); + if (loadError.pending()) + return; + int pwlen = passwd.length(); + std::vector<wchar_t> pwUCS2(pwlen + 1, L'\0'); + int nwc = MultiByteToWideChar(CP_UTF8, MB_ERR_INVALID_CHARS, passwd.data(), pwlen, &pwUCS2[0], pwlen); + if (!nwc) { + HRESULT status = GetLastError(); + loadError.set("Error converting password from UTF8", status); + return; + } + + certStore = PFXImportCertStore(&blobData, &pwUCS2[0], 0); + if (certStore == NULL) { + HRESULT status = GetLastError(); + loadError.set("Failed to open the certificate store", status); + return; + } + QPID_LOG(debug, "SslConnector using certificate from pkcs#12 file: " << opts.certFilename); + } +} + + +PCCERT_CONTEXT SslCredential::findCertificate(const std::string& name) +{ + loadPrivCertStore(); + if (loadError.pending()) + return NULL; + + // search for the certificate by Friendly Name + PCCERT_CONTEXT tmpctx = NULL; + while (tmpctx = CertEnumCertificatesInStore(certStore, tmpctx)) { + DWORD len = CertGetNameString(tmpctx, CERT_NAME_FRIENDLY_DISPLAY_TYPE, + 0, NULL, NULL, 0); + if (len == 1) + continue; + std::vector<char> ctxname(len); + CertGetNameString(tmpctx, CERT_NAME_FRIENDLY_DISPLAY_TYPE, + 0, NULL, &ctxname[0], len); + bool found = !name.compare(&ctxname[0]); + if (found) + break; + } + + // verify whether some certificate has been found + if (tmpctx == NULL) { + loadError.set(Msg() << "Client SSL/TLS certificate not found in the certificate store for name " << name, + "client certificate not found"); + } + return tmpctx; +} + + +std::string SslCredential::getPasswd(const std::string& filename) +{ + std::string passwd; + if (filename == "") + return passwd; + + HANDLE pwfHandle = CreateFile(filename.c_str(), GENERIC_READ, 0, NULL, OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, NULL); + + if (INVALID_HANDLE_VALUE == pwfHandle) { + HRESULT status = GetLastError(); + loadError.set(Msg() << "Failed to open the password file: " << filename, status); + return passwd; + } + + const DWORD fileSize = GetFileSize(pwfHandle, NULL); + if (fileSize == INVALID_FILE_SIZE) { + CloseHandle(pwfHandle); + loadError.set("", "Cannot read password file"); + return passwd; + } + + std::vector<char> pwbuf; + pwbuf.resize(fileSize); + DWORD nbytes = 0; + if (!ReadFile(pwfHandle, &pwbuf[0], fileSize, &nbytes, NULL)) { + HRESULT status = GetLastError(); + CloseHandle(pwfHandle); + loadError.set("Error reading password file", status); + return passwd; + } + CloseHandle(pwfHandle); + + if (nbytes == 0) + return passwd; + + while (nbytes) { + if ((pwbuf[nbytes-1] == 012) || (pwbuf[nbytes-1] == 015)) + nbytes--; + else + break; + } + + if (nbytes) + passwd.assign(&pwbuf[0], nbytes); + + return passwd; +} + +void SslCredential::SavedError::set(const std::string &lm, const std::string es) { + logMessage = lm; + error = es; +} + +void SslCredential::SavedError::set(const std::string &lm, int status) { + logMessage = lm; + error = qpid::sys::strError(status); +} + +void SslCredential::SavedError::clear() { + logMessage.clear(); + error.clear(); +} + +bool SslCredential::SavedError::pending() { + return !logMessage.empty() || !error.empty(); +} + +}}} diff --git a/qpid/cpp/src/qpid/sys/windows/SslCredential.h b/qpid/cpp/src/qpid/sys/windows/SslCredential.h new file mode 100644 index 0000000000..25d174a2fa --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/SslCredential.h @@ -0,0 +1,84 @@ +#ifndef _sys_SslCredential +#define _sys_SslCredential +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/CommonImportExport.h" + +#include <string.h> +// security.h needs to see this to distinguish from kernel use. +#define SECURITY_WIN32 +#include <security.h> +#include <Schnlsp.h> +#undef SECURITY_WIN32 + +namespace qpid { +namespace sys { +namespace windows { + +/* + * Manage certificate data structures for SChannel. + * + * Note on client certificates: The Posix/NSS implementation performs a lazy + * client certificate search part way through the ssl handshake if the server + * requests one. Here, it is not known in advance if the server will + * request the certificate so the certificate is pre-loaded (even if never + * used). To match the Linux behavior, client certificate load problems are + * remembered and reported later if appropriate, but do not prevent the + * connection attempt. + */ + +class SslCredential { +public: + QPID_COMMON_EXTERN SslCredential(); + QPID_COMMON_EXTERN ~SslCredential(); + QPID_COMMON_EXTERN bool load(const std::string& certName); + QPID_COMMON_EXTERN CredHandle handle(); + QPID_COMMON_EXTERN std::string error(); + /** Proceed with connect inspite of hostname verifcation failures*/ + QPID_COMMON_EXTERN void ignoreHostnameVerificationFailure(); + +private: + struct SavedError { + std::string logMessage; + std::string error; + void set(const std::string &lm, const std::string es); + void set(const std::string &lm, int status); + void clear(); + bool pending(); + }; + + HCERTSTORE certStore; + PCCERT_CONTEXT cert; + SCHANNEL_CRED cred; + CredHandle credHandle; + TimeStamp credExpiry; + SavedError loadError; + bool hostnameVerification; + + PCCERT_CONTEXT findCertificate(const std::string& name); + void loadPrivCertStore(); + std::string getPasswd(const std::string& filename); +}; + +}}} + +#endif // _sys_SslCredential diff --git a/qpid/cpp/src/qpid/sys/windows/StrError.cpp b/qpid/cpp/src/qpid/sys/windows/StrError.cpp new file mode 100755 index 0000000000..546d399d16 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/StrError.cpp @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/StrError.h" +#include <string> +#include <string.h> +#include <windows.h> + +namespace qpid { +namespace sys { + +std::string strError(int err) { + const size_t bufsize = 512; + char buf[bufsize]; + buf[0] = 0; + if (0 == FormatMessage (FORMAT_MESSAGE_MAX_WIDTH_MASK + | FORMAT_MESSAGE_FROM_SYSTEM, + 0, + err, + 0, // Default language + buf, + bufsize, + 0)) + { +#ifdef _MSC_VER + strerror_s(buf, bufsize, err); +#else + return std::string(strerror(err)); +#endif + } + return std::string(buf); +} + +}} diff --git a/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp b/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp new file mode 100755 index 0000000000..fb58d53b81 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* GetNativeSystemInfo call requires _WIN32_WINNT 0x0501 or higher */ +#ifndef _WIN32_WINNT +# define _WIN32_WINNT 0x0501 +#endif + +#include "qpid/sys/SystemInfo.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" + +#include <assert.h> +#include <winsock2.h> +#include <ws2tcpip.h> +#include <windows.h> +#include <tlhelp32.h> + +#ifndef HOST_NAME_MAX +# define HOST_NAME_MAX 256 +#endif + +namespace qpid { +namespace sys { + +long SystemInfo::concurrency() { + SYSTEM_INFO sys_info; + ::GetSystemInfo (&sys_info); + long activeProcessors = 0; + DWORD_PTR mask = sys_info.dwActiveProcessorMask; + while (mask != 0) { + if (mask & 1) + ++activeProcessors; + mask >>= 1; + } + return activeProcessors; +} + +bool SystemInfo::getLocalHostname (Address &address) { + char name[HOST_NAME_MAX]; + if (::gethostname(name, sizeof(name)) != 0) { + errno = WSAGetLastError(); + return false; + } + address.host = name; + return true; +} + +static const std::string LOCALHOST("127.0.0.1"); +static const std::string TCP("tcp"); + +// Null function which always fails to find an network interface name +bool SystemInfo::getInterfaceAddresses(const std::string&, std::vector<std::string>&) +{ + return false; +} + +void SystemInfo::getSystemId (std::string &osName, + std::string &nodeName, + std::string &release, + std::string &version, + std::string &machine) +{ + osName = "Microsoft Windows"; + + char node[MAX_COMPUTERNAME_LENGTH + 1]; + DWORD nodelen = MAX_COMPUTERNAME_LENGTH + 1; + GetComputerName (node, &nodelen); + nodeName = node; + + OSVERSIONINFOEX vinfo; + vinfo.dwOSVersionInfoSize = sizeof(vinfo); + GetVersionEx ((OSVERSIONINFO *)&vinfo); + + SYSTEM_INFO sinfo; + GetNativeSystemInfo(&sinfo); + + switch(vinfo.dwMajorVersion) { + case 5: + switch(vinfo.dwMinorVersion) { + case 0: + release ="2000"; + break; + case 1: + release = "XP"; + break; + case 2: + if (sinfo.wProcessorArchitecture == PROCESSOR_ARCHITECTURE_AMD64 || + sinfo.wProcessorArchitecture == PROCESSOR_ARCHITECTURE_IA64) + release = "XP-64"; + else + release = "Server 2003"; + break; + default: + release = "Windows"; + } + break; + case 6: + if (vinfo.wProductType == VER_NT_SERVER) + release = "Server 2008"; + else + release = "Vista"; + break; + default: + release = "Microsoft Windows"; + } + version = vinfo.szCSDVersion; + + switch(sinfo.wProcessorArchitecture) { + case PROCESSOR_ARCHITECTURE_AMD64: + machine = "x86-64"; + break; + case PROCESSOR_ARCHITECTURE_IA64: + machine = "IA64"; + break; + case PROCESSOR_ARCHITECTURE_INTEL: + machine = "x86"; + break; + default: + machine = "unknown"; + break; + } +} + +uint32_t SystemInfo::getProcessId() +{ + return static_cast<uint32_t>(::GetCurrentProcessId()); +} + +uint32_t SystemInfo::getParentProcessId() +{ + // Only want info for the current process, so ask for something specific. + // The module info won't be used here but it keeps the snapshot limited to + // the current process so a search through all processes is not needed. + HANDLE snap = CreateToolhelp32Snapshot(TH32CS_SNAPMODULE, 0); + if (snap == INVALID_HANDLE_VALUE) + return 0; + PROCESSENTRY32 entry; + entry.dwSize = sizeof(entry); + if (!Process32First(snap, &entry)) + entry.th32ParentProcessID = 0; + CloseHandle(snap); + return static_cast<uint32_t>(entry.th32ParentProcessID); +} + +std::string SystemInfo::getProcessName() +{ + std::string name; + + // Only want info for the current process, so ask for something specific. + // The module info won't be used here but it keeps the snapshot limited to + // the current process so a search through all processes is not needed. + HANDLE snap = CreateToolhelp32Snapshot(TH32CS_SNAPMODULE, 0); + if (snap == INVALID_HANDLE_VALUE) + return name; + PROCESSENTRY32 entry; + entry.dwSize = sizeof(entry); + if (!Process32First(snap, &entry)) + entry.szExeFile[0] = '\0'; + CloseHandle(snap); + name = entry.szExeFile; + return name; +} + + +#ifdef _DLL +namespace windows { +// set from one or more Qpid DLLs: i.e. in DllMain with DLL_PROCESS_DETACH +QPID_EXPORT bool processExiting = false; +QPID_EXPORT bool libraryUnloading = false; +} +#endif + +bool SystemInfo::threadSafeShutdown() +{ +#ifdef _DLL + if (!windows::processExiting && !windows::libraryUnloading) { + // called before exit() or FreeLibrary(), or by a DLL without + // a participating DllMain. + QPID_LOG(warning, "invalid query for shutdown state"); + throw qpid::Exception(QPID_MSG("Unable to determine shutdown state.")); + } + return !windows::processExiting; +#else + // Not a DLL: shutdown can only be by exit() or return from main(). + return false; +#endif +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/Thread.cpp b/qpid/cpp/src/qpid/sys/windows/Thread.cpp new file mode 100755 index 0000000000..8034680664 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/Thread.cpp @@ -0,0 +1,340 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// Ensure definition of OpenThread in mingw +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif + +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/windows/check.h" +#include "qpid/sys/SystemInfo.h" + +#include <process.h> +#include <windows.h> + +/* + * This implementation distinguishes between two types of thread: Qpid + * threads (based on qpid::sys::Runnable) and the rest. It provides a + * join() that will not deadlock against the Windows loader lock for + * Qpid threads. + * + * System thread identifiers are unique per Windows thread; thread + * handles are not. Thread identifiers can be recycled, but keeping a + * handle open against the thread prevents recycling as long as + * shared_ptr references to a ThreadPrivate structure remain. + * + * There is a 1-1 relationship between Qpid threads and their + * ThreadPrivate structure. Non-Qpid threads do not need to find the + * qpidThreadDone handle, so there may be a 1-many relationship for + * them. + * + * TLS storage is used for a lockless solution for static library + * builds. The special case of LoadLibrary/FreeLibrary requires + * additional synchronization variables and resource cleanup in + * DllMain. _DLL marks the dynamic case. + */ + +namespace qpid { +namespace sys { + +class ThreadPrivate { +public: + friend class Thread; + friend unsigned __stdcall runThreadPrivate(void*); + typedef boost::shared_ptr<ThreadPrivate> shared_ptr; + ~ThreadPrivate(); + +private: + unsigned threadId; + HANDLE threadHandle; + HANDLE initCompleted; + HANDLE qpidThreadDone; + Runnable* runnable; + shared_ptr keepAlive; + + ThreadPrivate() : threadId(GetCurrentThreadId()), initCompleted(NULL), + qpidThreadDone(NULL), runnable(NULL) { + threadHandle = OpenThread (SYNCHRONIZE, FALSE, threadId); + QPID_WINDOWS_CHECK_CRT_NZ(threadHandle); + } + + ThreadPrivate(Runnable* r) : threadHandle(NULL), initCompleted(NULL), + qpidThreadDone(NULL), runnable(r) {} + + void start(shared_ptr& p); + static shared_ptr createThread(Runnable* r); +}; + +}} // namespace qpid::sys + + +namespace { +using namespace qpid::sys; + +#ifdef _DLL +class ScopedCriticalSection +{ + public: + ScopedCriticalSection(CRITICAL_SECTION& cs) : criticalSection(cs) { EnterCriticalSection(&criticalSection); } + ~ScopedCriticalSection() { LeaveCriticalSection(&criticalSection); } + private: + CRITICAL_SECTION& criticalSection; +}; + +CRITICAL_SECTION threadLock; +long runningThreads = 0; +HANDLE threadsDone; +bool terminating = false; +#endif + + +DWORD volatile tlsIndex = TLS_OUT_OF_INDEXES; + +DWORD getTlsIndex() { + if (tlsIndex != TLS_OUT_OF_INDEXES) + return tlsIndex; // already set + + DWORD trialIndex = TlsAlloc(); + QPID_WINDOWS_CHECK_NOT(trialIndex, TLS_OUT_OF_INDEXES); // No OS resource + + // only one thread gets to set the value + DWORD actualIndex = (DWORD) InterlockedCompareExchange((LONG volatile *) &tlsIndex, (LONG) trialIndex, (LONG) TLS_OUT_OF_INDEXES); + if (actualIndex == TLS_OUT_OF_INDEXES) + return trialIndex; // we won the race + else { + TlsFree(trialIndex); + return actualIndex; + } +} + +} // namespace + +namespace qpid { +namespace sys { + +unsigned __stdcall runThreadPrivate(void* p) +{ + ThreadPrivate* threadPrivate = static_cast<ThreadPrivate*>(p); + TlsSetValue(getTlsIndex(), threadPrivate); + + WaitForSingleObject (threadPrivate->initCompleted, INFINITE); + CloseHandle (threadPrivate->initCompleted); + threadPrivate->initCompleted = NULL; + + try { + threadPrivate->runnable->run(); + } catch (...) { + // not our concern + } + + SetEvent (threadPrivate->qpidThreadDone); // allow join() + threadPrivate->keepAlive.reset(); // may run ThreadPrivate destructor + +#ifdef _DLL + { + ScopedCriticalSection l(threadLock); + if (--runningThreads == 0) + SetEvent(threadsDone); + } +#endif + return 0; +} + + +ThreadPrivate::shared_ptr ThreadPrivate::createThread(Runnable* runnable) { + ThreadPrivate::shared_ptr tp(new ThreadPrivate(runnable)); + tp->start(tp); + return tp; +} + +void ThreadPrivate::start(ThreadPrivate::shared_ptr& tp) { + getTlsIndex(); // fail here if OS problem, not in new thread + + initCompleted = CreateEvent (NULL, TRUE, FALSE, NULL); + QPID_WINDOWS_CHECK_CRT_NZ(initCompleted); + qpidThreadDone = CreateEvent (NULL, TRUE, FALSE, NULL); + QPID_WINDOWS_CHECK_CRT_NZ(qpidThreadDone); + +#ifdef _DLL + { + ScopedCriticalSection l(threadLock); + if (terminating) + throw qpid::Exception(QPID_MSG("creating thread after exit/FreeLibrary")); + runningThreads++; + } +#endif + + uintptr_t h = _beginthreadex(0, + 0, + runThreadPrivate, + (void *)this, + 0, + &threadId); + +#ifdef _DLL + if (h == NULL) { + ScopedCriticalSection l(threadLock); + if (--runningThreads == 0) + SetEvent(threadsDone); + } +#endif + + QPID_WINDOWS_CHECK_CRT_NZ(h); + + // Success + keepAlive = tp; + threadHandle = reinterpret_cast<HANDLE>(h); + SetEvent (initCompleted); +} + +ThreadPrivate::~ThreadPrivate() { + if (threadHandle) + CloseHandle (threadHandle); + if (initCompleted) + CloseHandle (initCompleted); + if (qpidThreadDone) + CloseHandle (qpidThreadDone); +} + + +Thread::Thread() {} + +Thread::Thread(Runnable* runnable) : impl(ThreadPrivate::createThread(runnable)) {} + +Thread::Thread(Runnable& runnable) : impl(ThreadPrivate::createThread(&runnable)) {} + +Thread::operator bool() { + return !!impl; +} + +bool Thread::operator==(const Thread& t) const { + if (!impl || !t.impl) + return false; + return impl->threadId == t.impl->threadId; +} + +bool Thread::operator!=(const Thread& t) const { + return !(*this==t); +} + +void Thread::join() { + if (impl) { + DWORD status; + if (impl->runnable) { + HANDLE handles[2] = {impl->qpidThreadDone, impl->threadHandle}; + // wait for either. threadHandle not signalled if loader + // lock held (FreeLibrary). qpidThreadDone not signalled + // if thread terminated by exit(). + status = WaitForMultipleObjects (2, handles, false, INFINITE); + } + else + status = WaitForSingleObject (impl->threadHandle, INFINITE); + QPID_WINDOWS_CHECK_NOT(status, WAIT_FAILED); + } +} + +unsigned long Thread::logId() { + return GetCurrentThreadId(); +} + +/* static */ +Thread Thread::current() { + ThreadPrivate* tlsValue = (ThreadPrivate *) TlsGetValue(getTlsIndex()); + Thread t; + if (tlsValue != NULL) { + // called from within Runnable->run(), so keepAlive has positive use count + t.impl = tlsValue->keepAlive; + } + else + t.impl.reset(new ThreadPrivate()); + return t; +} + +}} // namespace qpid::sys + + +#ifdef _DLL + +namespace qpid { +namespace sys { +namespace windows { + +extern bool processExiting; +extern bool libraryUnloading; + +}}} // namespace qpid::sys::SystemInfo + +// DllMain: called possibly many times in a process lifetime if dll +// loaded and freed repeatedly. Be mindful of Windows loader lock +// and other DllMain restrictions. + +BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) { + switch (reason) { + case DLL_PROCESS_ATTACH: + InitializeCriticalSection(&threadLock); + threadsDone = CreateEvent(NULL, TRUE, FALSE, NULL); + break; + + case DLL_PROCESS_DETACH: + terminating = true; + if (reserved != NULL) { + // process exit(): threads are stopped arbitrarily and + // possibly in an inconsistent state. Not even threadLock + // can be trusted. All static destructors for this unit + // are pending and face the same unsafe environment. + // Any resources this unit knows about will be released as + // part of process tear down by the OS. Accordingly, skip + // any clean up tasks. + qpid::sys::windows::processExiting = true; + return TRUE; + } + else { + // FreeLibrary(): threads are still running and we are + // encouraged to clean up to avoid leaks. Mostly we just + // want any straggler threads to finish and notify + // threadsDone as the last thing they do. + qpid::sys::windows::libraryUnloading = true; + while (1) { + { + ScopedCriticalSection l(threadLock); + if (runningThreads == 0) + break; + ResetEvent(threadsDone); + } + WaitForSingleObject(threadsDone, INFINITE); + } + if (tlsIndex != TLS_OUT_OF_INDEXES) + TlsFree(getTlsIndex()); + CloseHandle(threadsDone); + DeleteCriticalSection(&threadLock); + } + break; + + case DLL_THREAD_ATTACH: + case DLL_THREAD_DETACH: + break; + } + return TRUE; +} + +#endif diff --git a/qpid/cpp/src/qpid/sys/windows/Time.cpp b/qpid/cpp/src/qpid/sys/windows/Time.cpp new file mode 100644 index 0000000000..4169ef1f0a --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/Time.cpp @@ -0,0 +1,217 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Time.h" +#include <cmath> +#include <ostream> +#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/thread/thread_time.hpp> +#include <windows.h> +#include <time.h> + +using namespace boost::posix_time; + +namespace { + +// High-res timing support. This will display times since program start, +// more or less. Keep track of the start value and the conversion factor to +// seconds. +bool timeInitialized = false; +LARGE_INTEGER start_hpc; +double hpc_freq = 1.0; + +double start_time; + +/// Static constant to remove time skew between FILETIME and POSIX +/// time. POSIX and Win32 use different epochs (Jan. 1, 1970 v.s. +/// Jan. 1, 1601). The following constant defines the difference +/// in 100ns ticks. +const DWORDLONG FILETIME_to_timval_skew = 0x19db1ded53e8000; + +} + +namespace qpid { +namespace sys { + +AbsTime::AbsTime(const AbsTime& t, const Duration& d) { + if (d == Duration::max()) { + timepoint = ptime(max_date_time); + } + else { + time_duration td = microseconds(d.nanosecs / 1000); + timepoint = t.timepoint + td; + } +} + +AbsTime AbsTime::FarFuture() { + AbsTime ff; + ptime maxd(max_date_time); + ff.timepoint = maxd; + return ff; +} + +AbsTime AbsTime::Zero() { + AbsTime time_epoch; + time_epoch.timepoint = boost::posix_time::from_time_t(0); + return time_epoch; +} + +AbsTime AbsTime::epoch() { + AbsTime time_epoch; + time_epoch.timepoint = boost::posix_time::from_time_t(0); + return time_epoch; +} + +AbsTime AbsTime::now() { + AbsTime time_now; + time_now.timepoint = boost::get_system_time(); + return time_now; +} + +Duration Duration::FromEpoch() { + time_duration d = boost::get_system_time() - boost::posix_time::from_time_t(0); + return d.total_nanoseconds(); +} + +Duration::Duration(const AbsTime& start, const AbsTime& finish) { + time_duration d = finish.timepoint - start.timepoint; + nanosecs = d.total_nanoseconds(); +} + +std::ostream& operator<<(std::ostream& o, const Duration& d) { + if (d >= TIME_SEC) return o << (double(d)/TIME_SEC) << "s"; + if (d >= TIME_MSEC) return o << (double(d)/TIME_MSEC) << "ms"; + if (d >= TIME_USEC) return o << (double(d)/TIME_USEC) << "us"; + return o << int64_t(d) << "ns"; +} + +std::istream& operator>>(std::istream& i, Duration& d) { + // Don't throw, let the istream throw if it's configured to do so. + double number; + i >> number; + if (i.fail()) return i; + + if (i.eof() || std::isspace(i.peek())) // No suffix + d = number*TIME_SEC; + else { + std::string suffix; + i >> suffix; + if (i.fail()) return i; + if (suffix.compare("s") == 0) d = number*TIME_SEC; + else if (suffix.compare("ms") == 0) d = number*TIME_MSEC; + else if (suffix.compare("us") == 0) d = number*TIME_USEC; + else if (suffix.compare("ns") == 0) d = number*TIME_NSEC; + else i.setstate(std::ios::failbit); + } + return i; +} + +std::ostream& operator<<(std::ostream& o, const AbsTime& t) { + std::string time_string = to_simple_string(t.timepoint); + return o << time_string; +} + + +void sleep(int secs) { + ::Sleep(secs * 1000); +} + +void usleep(uint64_t usecs) { + DWORD msecs = usecs / 1000; + if (msecs == 0) + msecs = 1; + ::Sleep(msecs); +} + +void outputFormattedNow(std::ostream& o) { + ::time_t rawtime; + ::tm timeinfo; + char time_string[100]; + + ::time( &rawtime ); +#ifdef _MSC_VER + ::localtime_s(&timeinfo, &rawtime); +#else + timeinfo = *(::localtime(&rawtime)); +#endif + ::strftime(time_string, 100, + "%Y-%m-%d %H:%M:%S", + &timeinfo); + o << time_string << " "; +} + +void outputHiresNow(std::ostream& o) { + ::time_t tv_sec; + ::tm timeinfo; + char time_string[100]; + + if (!timeInitialized) { + // To start, get the current time from FILETIME which includes + // sub-second resolution. However, since FILETIME is updated a bit + // "bumpy" every 15 msec or so, future time displays will be the + // starting FILETIME plus a delta based on the high-resolution + // performance counter. + FILETIME file_time; + ULARGE_INTEGER start_usec; + ::GetSystemTimeAsFileTime(&file_time); // This is in 100ns units + start_usec.LowPart = file_time.dwLowDateTime; + start_usec.HighPart = file_time.dwHighDateTime; + start_usec.QuadPart -= FILETIME_to_timval_skew; + start_usec.QuadPart /= 10; // Convert 100ns to usec + tv_sec = (time_t)(start_usec.QuadPart / (1000 * 1000)); + long tv_usec = (long)(start_usec.QuadPart % (1000 * 1000)); + start_time = static_cast<double>(tv_sec); + start_time += tv_usec / 1000000.0; + + start_hpc.QuadPart = 0; + LARGE_INTEGER iFreq; + iFreq.QuadPart = 1; + QueryPerformanceCounter(&start_hpc); + QueryPerformanceFrequency(&iFreq); + hpc_freq = static_cast<double>(iFreq.QuadPart); + timeInitialized = true; + } + LARGE_INTEGER hpc_now; + hpc_now.QuadPart = 0; + QueryPerformanceCounter(&hpc_now); + hpc_now.QuadPart -= start_hpc.QuadPart; + if (hpc_now.QuadPart < 0) + hpc_now.QuadPart = 0; + double now = static_cast<double>(hpc_now.QuadPart); + now /= hpc_freq; // now is seconds after this + double fnow = start_time + now; + double usec, sec; + usec = modf(fnow, &sec); + tv_sec = static_cast<time_t>(sec); +#ifdef _MSC_VER + ::localtime_s(&timeinfo, &tv_sec); +#else + timeinfo = *(::localtime(&tv_sec)); +#endif + ::strftime(time_string, 100, + "%Y-%m-%d %H:%M:%S", + &timeinfo); + // No way to set "max field width" to cleanly output the double usec so + // convert it back to integral number of usecs and print that. + unsigned long i_usec = usec * 1000 * 1000; + o << time_string << "." << std::setw(6) << std::setfill('0') << i_usec << " "; +} +}} diff --git a/qpid/cpp/src/qpid/sys/windows/Time.h b/qpid/cpp/src/qpid/sys/windows/Time.h new file mode 100644 index 0000000000..2987b1c8b2 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/Time.h @@ -0,0 +1,36 @@ +#ifndef QPID_SYS_WINDOWS_TIME_H +#define QPID_SYS_WINDOWS_TIME_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/date_time/posix_time/posix_time_types.hpp> + +namespace qpid { +namespace sys { + +/** + * Class to represent an instant in time. Boost has this stuff already done + * so just reuse it. We can also grab this for quick use with the Condition + * wait operations. + */ +typedef boost::posix_time::ptime TimePrivate; + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_WINDOWS_TIME_H*/ diff --git a/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp b/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp new file mode 100644 index 0000000000..5637f6a9fb --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp @@ -0,0 +1,276 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/windows/WinSocket.h" + +#include "qpid/sys/SocketAddress.h" +#include "qpid/sys/windows/check.h" +#include "qpid/sys/windows/IoHandlePrivate.h" +#include "qpid/sys/SystemInfo.h" + +namespace qpid { +namespace sys { + +// Need to initialize WinSock. Ideally, this would be a singleton or embedded +// in some one-time initialization function. I tried boost singleton and could +// not get it to compile (and others located in google had the same problem). +// So, this simple static with an interlocked increment will do for known +// use cases at this time. Since this will only shut down winsock at process +// termination, there may be some problems with client programs that also +// expect to load and unload winsock, but we'll see... +// If someone does get an easy-to-use singleton sometime, converting to it +// may be preferable. + +namespace { + +static LONG volatile initialized = 0; + +class WinSockSetup { + // : public boost::details::pool::singleton_default<WinSockSetup> { + +public: + WinSockSetup() { + LONG timesEntered = InterlockedIncrement(&initialized); + if (timesEntered > 1) + return; + err = 0; + WORD wVersionRequested; + WSADATA wsaData; + + /* Request WinSock 2.2 */ + wVersionRequested = MAKEWORD(2, 2); + err = WSAStartup(wVersionRequested, &wsaData); + } + + ~WinSockSetup() { + if (SystemInfo::threadSafeShutdown()) + WSACleanup(); + } + +public: + int error(void) const { return err; } + +protected: + DWORD err; +}; + +static WinSockSetup setup; + +std::string getName(SOCKET fd, bool local) +{ + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); + + if (local) { + QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen)); + } else { + QPID_WINSOCK_CHECK(::getpeername(fd, name, &namelen)); + } + + return SocketAddress::asString(name, namelen); +} + +uint16_t getLocalPort(int fd) +{ + ::sockaddr_storage name_s; // big enough for any socket address + ::sockaddr* name = (::sockaddr*)&name_s; + ::socklen_t namelen = sizeof(name_s); + + QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen)); + + return SocketAddress::getPort(name); +} +} // namespace + +WinSocket::WinSocket() : + handle(new IOHandle), + nonblocking(false), + nodelay(false) +{} + +Socket* createSocket() +{ + return new WinSocket; +} + +WinSocket::WinSocket(SOCKET fd) : + handle(new IOHandle(fd)), + nonblocking(false), + nodelay(false) +{} + +WinSocket::operator const IOHandle&() const +{ + return *handle; +} + +void WinSocket::createSocket(const SocketAddress& sa) const +{ + SOCKET& socket = handle->fd; + if (socket != INVALID_SOCKET) WinSocket::close(); + + SOCKET s = ::socket (getAddrInfo(sa).ai_family, + getAddrInfo(sa).ai_socktype, + 0); + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + socket = s; + + try { + if (nonblocking) setNonblocking(); + if (nodelay) setTcpNoDelay(); + } catch (std::exception&) { + ::closesocket(s); + socket = INVALID_SOCKET; + throw; + } +} + +void WinSocket::setNonblocking() const { + u_long nonblock = 1; + QPID_WINSOCK_CHECK(ioctlsocket(handle->fd, FIONBIO, &nonblock)); +} + +void +WinSocket::connect(const SocketAddress& addr) const +{ + peername = addr.asString(false); + + createSocket(addr); + + const SOCKET& socket = handle->fd; + int err; + WSASetLastError(0); + if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) && + ((err = ::WSAGetLastError()) != WSAEWOULDBLOCK)) + throw qpid::Exception(QPID_MSG(strError(err) << ": " << peername)); +} + +void +WinSocket::finishConnect(const SocketAddress&) const +{ +} + +void +WinSocket::close() const +{ + SOCKET& socket = handle->fd; + if (socket == INVALID_SOCKET) return; + QPID_WINSOCK_CHECK(closesocket(socket)); + socket = INVALID_SOCKET; +} + + +int WinSocket::write(const void *buf, size_t count) const +{ + const SOCKET& socket = handle->fd; + int sent = ::send(socket, (const char *)buf, count, 0); + if (sent == SOCKET_ERROR) + return -1; + return sent; +} + +int WinSocket::read(void *buf, size_t count) const +{ + const SOCKET& socket = handle->fd; + int received = ::recv(socket, (char *)buf, count, 0); + if (received == SOCKET_ERROR) + return -1; + return received; +} + +int WinSocket::listen(const SocketAddress& addr, int backlog) const +{ + createSocket(addr); + + const SOCKET& socket = handle->fd; + BOOL yes=1; + QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *)&yes, sizeof(yes))); + + if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR) + throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError()))); + if (::listen(socket, backlog) == SOCKET_ERROR) + throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError()))); + + return getLocalPort(socket); +} + +Socket* WinSocket::accept() const +{ + SOCKET afd = ::accept(handle->fd, 0, 0); + if (afd != INVALID_SOCKET) + return new WinSocket(afd); + else if (WSAGetLastError() == EAGAIN) + return 0; + else throw QPID_WINDOWS_ERROR(WSAGetLastError()); +} + +std::string WinSocket::getPeerAddress() const +{ + if (peername.empty()) { + peername = getName(handle->fd, false); + } + return peername; +} + +std::string WinSocket::getLocalAddress() const +{ + if (localname.empty()) { + localname = getName(handle->fd, true); + } + return localname; +} + +int WinSocket::getError() const +{ + int result; + socklen_t rSize = sizeof (result); + + QPID_WINSOCK_CHECK(::getsockopt(handle->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize)); + return result; +} + +void WinSocket::setTcpNoDelay() const +{ + SOCKET& socket = handle->fd; + nodelay = true; + if (socket != INVALID_SOCKET) { + int flag = 1; + int result = setsockopt(handle->fd, + IPPROTO_TCP, + TCP_NODELAY, + (char *)&flag, + sizeof(flag)); + QPID_WINSOCK_CHECK(result); + } +} + +int WinSocket::getKeyLen() const +{ + return 0; +} + +std::string WinSocket::getClientAuthId() const +{ + return std::string(); +} + +}} // namespace qpid::sys diff --git a/qpid/cpp/src/qpid/sys/windows/WinSocket.h b/qpid/cpp/src/qpid/sys/windows/WinSocket.h new file mode 100644 index 0000000000..bee6a58e7a --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/WinSocket.h @@ -0,0 +1,118 @@ +#ifndef QPID_SYS_WINDOWS_BSDSOCKET_H +#define QPID_SYS_WINDOWS_BSDSOCKET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Socket.h" +#include "qpid/sys/IntegerTypes.h" +#include "qpid/CommonImportExport.h" +#include <string> + +#include <boost/scoped_ptr.hpp> + +// Ensure we get all of winsock2.h +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif + +#include <winsock2.h> + +namespace qpid { +namespace sys { + +namespace windows { +Socket* createSameTypeSocket(const Socket&); +} + +class Duration; +class IOHandle; +class SocketAddress; + +class QPID_COMMON_CLASS_EXTERN WinSocket : public Socket +{ +public: + /** Create a socket wrapper for descriptor. */ + QPID_COMMON_EXTERN WinSocket(); + + QPID_COMMON_EXTERN operator const IOHandle&() const; + + /** Set socket non blocking */ + QPID_COMMON_EXTERN virtual void setNonblocking() const; + + QPID_COMMON_EXTERN virtual void setTcpNoDelay() const; + + QPID_COMMON_EXTERN virtual void connect(const SocketAddress&) const; + QPID_COMMON_EXTERN virtual void finishConnect(const SocketAddress&) const; + + QPID_COMMON_EXTERN virtual void close() const; + + /** Bind to a port and start listening. + *@return The bound port number + */ + QPID_COMMON_EXTERN virtual int listen(const SocketAddress&, int backlog = 10) const; + + /** + * Returns an address (host and port) for the remote end of the + * socket + */ + QPID_COMMON_EXTERN std::string getPeerAddress() const; + /** + * Returns an address (host and port) for the local end of the + * socket + */ + QPID_COMMON_EXTERN std::string getLocalAddress() const; + + /** + * Returns the error code stored in the socket. This may be used + * to determine the result of a non-blocking connect. + */ + QPID_COMMON_EXTERN int getError() const; + + /** Accept a connection from a socket that is already listening + * and has an incoming connection + */ + QPID_COMMON_EXTERN virtual Socket* accept() const; + + // TODO The following are raw operations, maybe they need better wrapping? + QPID_COMMON_EXTERN virtual int read(void *buf, size_t count) const; + QPID_COMMON_EXTERN virtual int write(const void *buf, size_t count) const; + + QPID_COMMON_EXTERN int getKeyLen() const; + QPID_COMMON_EXTERN std::string getClientAuthId() const; + +protected: + /** Create socket */ + void createSocket(const SocketAddress&) const; + + mutable boost::scoped_ptr<IOHandle> handle; + mutable std::string localname; + mutable std::string peername; + mutable bool nonblocking; + mutable bool nodelay; + + /** Construct socket with existing handle */ + friend Socket* qpid::sys::windows::createSameTypeSocket(const Socket&); + WinSocket(SOCKET fd); +}; + +}} +#endif /*!QPID_SYS_WINDOWS_BSDSOCKET_H*/ diff --git a/qpid/cpp/src/qpid/sys/windows/check.h b/qpid/cpp/src/qpid/sys/windows/check.h new file mode 100755 index 0000000000..2a8e439bed --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/check.h @@ -0,0 +1,49 @@ +#ifndef _windows_check_h +#define _windows_check_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Exception.h" +#include "qpid/Msg.h" +#include "qpid/sys/StrError.h" + +#define QPID_WINDOWS_ERROR(ERRVAL) qpid::Exception(QPID_MSG(qpid::sys::strError(ERRVAL))) +#define QPID_WINDOWS_CRT_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::sys::strError(ERRNO))) + +/** THROW QPID_WINDOWS_ERROR(::GetLastError()) if RESULT is NULL */ +#define QPID_WINDOWS_CHECK_NULL(RESULT) \ + if ((RESULT) == NULL) throw QPID_WINDOWS_ERROR((::GetLastError())) + +#define QPID_WINDOWS_CHECK_NOT(RESULT,VAL) \ + if ((RESULT) == (VAL)) throw QPID_WINDOWS_ERROR((::GetLastError())) + +#define QPID_WINDOWS_CHECK_ASYNC_START(STATUS) \ + if (!(STATUS) && ::WSAGetLastError() != ERROR_IO_PENDING) \ + throw QPID_WINDOWS_ERROR((::WSAGetLastError())) + +#define QPID_WINDOWS_CHECK_CRT_NZ(VAL) \ + if ((VAL) == 0) throw QPID_WINDOWS_CRT_ERROR(errno) + +#define QPID_WINSOCK_CHECK(OP) \ + if ((OP) == SOCKET_ERROR) throw QPID_WINDOWS_ERROR((::WSAGetLastError())) + +#endif /*!_windows_check_h*/ diff --git a/qpid/cpp/src/qpid/sys/windows/mingw32_compat.h b/qpid/cpp/src/qpid/sys/windows/mingw32_compat.h new file mode 100644 index 0000000000..51f613cc25 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/mingw32_compat.h @@ -0,0 +1,39 @@ +#ifndef _sys_windows_mingw32_compat +#define _sys_windows_mingw32_compat +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifdef WIN32 +#ifndef _MSC_VER + +// +// The following definitions for extension function GUIDs and signatures are taken from +// MswSock.h in the Windows32 SDK. These rightfully belong in the mingw32 version of +// mswsock.h, but are not included presently. +// + +#define WSAID_ACCEPTEX {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}} +typedef BOOL (PASCAL *LPFN_ACCEPTEX)(SOCKET,SOCKET,PVOID,DWORD,DWORD,DWORD,LPDWORD,LPOVERLAPPED); + +#endif +#endif + +#endif diff --git a/qpid/cpp/src/qpid/sys/windows/util.cpp b/qpid/cpp/src/qpid/sys/windows/util.cpp new file mode 100644 index 0000000000..75aef26c35 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/util.cpp @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/windows/util.h" +#include "qpid/Exception.h" +#include "qpid/sys/SystemInfo.h" + +#include <iostream> +#include <fstream> + +namespace qpid { +namespace sys { +namespace ssl { + +static const std::string LOCALHOST("127.0.0.1"); + +std::string defaultCertName() +{ + Address address; + if (SystemInfo::getLocalHostname(address)) { + return address.host; + } else { + return LOCALHOST; + } +} + +SslOptions::SslOptions() : qpid::Options("SSL Settings"), + certName(defaultCertName()) +{ + addOptions() + ("ssl-cert-password-file", optValue(certPasswordFile, "PATH"), "File containing password to use for accessing certificates") + ("ssl-cert-store", optValue(certStore, "NAME"), "Windows certificate store containing the certificate") + ("ssl-cert-Filename", optValue(certFilename, "PATH"), "Path to PKCS#12 file containing the certificate") + ("ssl-cert-name", optValue(certName, "NAME"), "Friendly Name of the certificate to use"); +} + +SslOptions& SslOptions::operator=(const SslOptions& o) +{ + certStore = o.certStore; + certName = o.certName; + certPasswordFile = o.certPasswordFile; + certFilename = o.certFilename; + + return *this; +} + +SslOptions SslOptions::global; + +void initWinSsl(const SslOptions& options, bool) +{ + SslOptions::global = options; +} +}}} // namespace qpid::sys::ssl diff --git a/qpid/cpp/src/qpid/sys/windows/util.h b/qpid/cpp/src/qpid/sys/windows/util.h new file mode 100644 index 0000000000..2855a90955 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/util.h @@ -0,0 +1,50 @@ +#ifndef QPID_SYS_SSL_UTIL_H +#define QPID_SYS_SSL_UTIL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/CommonImportExport.h" +#include "qpid/Options.h" +#include <string> + +namespace qpid { +namespace sys { +namespace ssl { + +struct SslOptions : qpid::Options +{ + QPID_COMMON_EXTERN static SslOptions global; + + std::string certStore; + std::string certName; + std::string certPasswordFile; + std::string certFilename; + + QPID_COMMON_EXTERN SslOptions(); + QPID_COMMON_EXTERN SslOptions& operator=(const SslOptions&); +}; + +QPID_COMMON_EXTERN void initWinSsl(const SslOptions& options, bool server = false); + +}}} // namespace qpid::sys::ssl + +#endif /*!QPID_SYS_SSL_UTIL_H*/ diff --git a/qpid/cpp/src/qpid/sys/windows/uuid.cpp b/qpid/cpp/src/qpid/sys/windows/uuid.cpp new file mode 100644 index 0000000000..4ff75ca627 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/uuid.cpp @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * UUIDs and GUIDs (both RFC 4122) differ on byte positions of the + * internal representation. This matters when encoding to the wire + * and adhering to versioning info. Microsoft APIs used here operate + * on GUIDs even if the name implies UUIDs. AMQP expects the UUID 128 + * bit format which is used here unless otherwise noted. + */ + +#include <rpc.h> +#ifdef uuid_t /* Done in rpcdce.h */ +# undef uuid_t +#endif + +#include "qpid/sys/uuid.h" + +#include <string.h> + +namespace { +inline void iswap (char *p1, char *p2) { + char t = *p1; + *p1 = *p2; + *p2 = t; +} + +void toUuid (const UUID *guid, uint8_t uuid[qpid::sys::UuidSize]) { + // copy then swap bytes + memcpy ((char *) uuid, (char *) guid, qpid::sys::UuidSize); + char *p = (char *) uuid; + iswap (p, p+3); + iswap (p+1, p+2); + iswap (p+4, p+5); + iswap (p+6, p+7); +} + +} // namespace + +namespace qpid { +namespace sys { + +void uuid_generate (uint8_t out[qpid::sys::UuidSize]) { + UUID guid; + UuidCreate (&guid); + // Version 4 GUID, convert to UUID + toUuid (&guid, out); +} + +}} |