summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/windows
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/windows')
-rw-r--r--qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp713
-rw-r--r--qpid/cpp/src/qpid/sys/windows/AsynchIO.h235
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/AsynchIoResult.h204
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/Condition.h77
-rw-r--r--qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp90
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/IOHandle.cpp29
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h58
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/IocpPoller.cpp220
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/LockFile.cpp64
-rw-r--r--qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp58
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/Mutex.h188
-rw-r--r--qpid/cpp/src/qpid/sys/windows/Path.cpp65
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/PipeHandle.cpp101
-rw-r--r--qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp114
-rw-r--r--qpid/cpp/src/qpid/sys/windows/QpidDllMain.h72
-rw-r--r--qpid/cpp/src/qpid/sys/windows/Shlib.cpp54
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp346
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp735
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h192
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SslCredential.cpp279
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SslCredential.h84
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/StrError.cpp52
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/SystemInfo.cpp208
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/Thread.cpp340
-rw-r--r--qpid/cpp/src/qpid/sys/windows/Time.cpp217
-rw-r--r--qpid/cpp/src/qpid/sys/windows/Time.h36
-rw-r--r--qpid/cpp/src/qpid/sys/windows/WinSocket.cpp276
-rw-r--r--qpid/cpp/src/qpid/sys/windows/WinSocket.h118
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/check.h49
-rw-r--r--qpid/cpp/src/qpid/sys/windows/mingw32_compat.h39
-rw-r--r--qpid/cpp/src/qpid/sys/windows/util.cpp70
-rw-r--r--qpid/cpp/src/qpid/sys/windows/util.h50
-rw-r--r--qpid/cpp/src/qpid/sys/windows/uuid.cpp68
33 files changed, 5501 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
new file mode 100644
index 0000000000..d65aad1304
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -0,0 +1,713 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/windows/AsynchIoResult.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/windows/WinSocket.h"
+#include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Time.h"
+#include "qpid/log/Statement.h"
+
+#include "qpid/sys/windows/check.h"
+#include "qpid/sys/windows/mingw32_compat.h"
+
+#include <boost/thread/once.hpp>
+
+#include <queue>
+#include <winsock2.h>
+#include <mswsock.h>
+#include <windows.h>
+
+#include <boost/bind.hpp>
+#include <boost/shared_array.hpp>
+#include "qpid/sys/windows/AsynchIO.h"
+
+namespace {
+
+ typedef qpid::sys::ScopedLock<qpid::sys::Mutex> QLock;
+
+/*
+ * The function pointers for AcceptEx and ConnectEx need to be looked up
+ * at run time.
+ */
+const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::IOHandle& io) {
+ SOCKET h = io.fd;
+ GUID guidAcceptEx = WSAID_ACCEPTEX;
+ DWORD dwBytes = 0;
+ LPFN_ACCEPTEX fnAcceptEx;
+ WSAIoctl(h,
+ SIO_GET_EXTENSION_FUNCTION_POINTER,
+ &guidAcceptEx,
+ sizeof(guidAcceptEx),
+ &fnAcceptEx,
+ sizeof(fnAcceptEx),
+ &dwBytes,
+ NULL,
+ NULL);
+ if (fnAcceptEx == 0)
+ throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
+ return fnAcceptEx;
+}
+
+}
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+/*
+ * Asynch Acceptor
+ *
+ */
+AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
+ : acceptedCallback(callback),
+ socket(s),
+ wSocket(IOHandle(s).fd),
+ fnAcceptEx(lookUpAcceptEx(s)) {
+
+ s.setNonblocking();
+}
+
+AsynchAcceptor::~AsynchAcceptor()
+{
+ socket.close();
+}
+
+void AsynchAcceptor::start(Poller::shared_ptr poller) {
+ PollerHandle ph = PollerHandle(socket);
+ poller->monitorHandle(ph, Poller::INPUT);
+ restart ();
+}
+
+void AsynchAcceptor::restart(void) {
+ DWORD bytesReceived = 0; // Not used, needed for AcceptEx API
+ AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
+ this,
+ socket);
+ BOOL status;
+ status = fnAcceptEx(wSocket,
+ IOHandle(*result->newSocket).fd,
+ result->addressBuffer,
+ 0,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ &bytesReceived,
+ result->overlapped());
+ QPID_WINDOWS_CHECK_ASYNC_START(status);
+}
+
+
+Socket* createSameTypeSocket(const Socket& sock) {
+ SOCKET socket = IOHandle(sock).fd;
+ // Socket currently has no actual socket attached
+ if (socket == INVALID_SOCKET)
+ return new WinSocket;
+
+ ::sockaddr_storage sa;
+ ::socklen_t salen = sizeof(sa);
+ QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
+ SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+ if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ return new WinSocket(s);
+}
+
+AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
+ AsynchAcceptor *acceptor,
+ const Socket& lsocket)
+ : callback(cb), acceptor(acceptor),
+ listener(IOHandle(lsocket).fd),
+ newSocket(createSameTypeSocket(lsocket)) {
+}
+
+void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
+ ::setsockopt (IOHandle(*newSocket).fd,
+ SOL_SOCKET,
+ SO_UPDATE_ACCEPT_CONTEXT,
+ (char*)&listener,
+ sizeof (listener));
+ callback(*(newSocket.release()));
+ acceptor->restart ();
+ delete this;
+}
+
+void AsynchAcceptResult::failure(int /*status*/) {
+ //if (status != WSA_OPERATION_ABORTED)
+ // Can there be anything else? ;
+ delete this;
+}
+
+/*
+ * AsynchConnector does synchronous connects for now... to do asynch the
+ * IocpPoller will need some extension to register an event handle as a
+ * CONNECT-type "direction", the connect completion/result will need an
+ * event handle to associate with the connecting handle. But there's no
+ * time for that right now...
+ */
+AsynchConnector::AsynchConnector(const Socket& sock,
+ const std::string& hname,
+ const std::string& p,
+ ConnectedCallback connCb,
+ FailedCallback failCb) :
+ connCallback(connCb), failCallback(failCb), socket(sock),
+ hostname(hname), port(p)
+{
+}
+
+void AsynchConnector::start(Poller::shared_ptr)
+{
+ try {
+ socket.connect(SocketAddress(hostname, port));
+ socket.setNonblocking();
+ connCallback(socket);
+ } catch(std::exception& e) {
+ if (failCallback)
+ failCallback(socket, -1, std::string(e.what()));
+ socket.close();
+ }
+}
+
+// This can never be called in the current windows code as connect
+// is blocking and requestCallback only makes sense if connect is
+// non-blocking with the results returned via a poller callback.
+void AsynchConnector::requestCallback(RequestCallback rCb)
+{
+}
+
+} // namespace windows
+
+AsynchAcceptor* AsynchAcceptor::create(const Socket& s,
+ Callback callback)
+{
+ return new windows::AsynchAcceptor(s, callback);
+}
+
+AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
+ const std::string& hostname,
+ const std::string& port,
+ ConnectedCallback connCb,
+ FailedCallback failCb)
+{
+ return new windows::AsynchConnector(s,
+ hostname,
+ port,
+ connCb,
+ failCb);
+}
+
+
+/*
+ * Asynch reader/writer
+ */
+
+namespace windows {
+
+// This is used to encapsulate pure callbacks into a handle
+class CallbackHandle : public IOHandle {
+public:
+ CallbackHandle(AsynchIoResult::Completer completeCb,
+ AsynchIO::RequestCallback reqCb = 0) :
+ IOHandle(INVALID_SOCKET, completeCb, reqCb)
+ {}
+};
+
+AsynchIO::AsynchIO(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb,
+ BuffersEmptyCallback eCb,
+ IdleCallback iCb) :
+
+ readCallback(rCb),
+ eofCallback(eofCb),
+ disCallback(disCb),
+ closedCallback(cCb),
+ emptyCallback(eCb),
+ idleCallback(iCb),
+ socket(s),
+ bufferCount(BufferCount),
+ opsInProgress(0),
+ writeInProgress(false),
+ readInProgress(false),
+ queuedDelete(false),
+ queuedClose(false),
+ working(false) {
+}
+
+AsynchIO::~AsynchIO() {
+}
+
+void AsynchIO::queueForDeletion() {
+ {
+ ScopedLock<Mutex> l(completionLock);
+ assert(!queuedDelete);
+ queuedDelete = true;
+ if (working || opsInProgress > 0) {
+ QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
+ // AsynchIOHandler calls this then deletes itself; don't do any more
+ // callbacks.
+ readCallback = 0;
+ eofCallback = 0;
+ disCallback = 0;
+ closedCallback = 0;
+ emptyCallback = 0;
+ idleCallback = 0;
+ return;
+ }
+ }
+ delete this;
+}
+
+void AsynchIO::start(Poller::shared_ptr poller0) {
+ PollerHandle ph = PollerHandle(socket);
+ poller = poller0;
+ poller->monitorHandle(ph, Poller::INPUT);
+ if (writeQueue.size() > 0) // Already have data queued for write
+ notifyPendingWrite();
+ startReading();
+}
+
+uint32_t AsynchIO::getBufferCount(void) { return bufferCount; }
+
+void AsynchIO::setBufferCount(uint32_t count) { bufferCount = count; }
+
+
+void AsynchIO::createBuffers(uint32_t size) {
+ // Allocate all the buffer memory at once
+ bufferMemory.reset(new char[size*bufferCount]);
+
+ // Create the Buffer structs in a vector
+ // And push into the buffer queue
+ buffers.reserve(bufferCount);
+ for (uint32_t i = 0; i < bufferCount; i++) {
+ buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+ queueReadBuffer(&buffers[i]);
+ }
+}
+
+void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
+ assert(buff);
+ buff->dataStart = 0;
+ buff->dataCount = 0;
+ QLock l(bufferQueueLock);
+ bufferQueue.push_back(buff);
+}
+
+void AsynchIO::unread(AsynchIO::BufferBase* buff) {
+ assert(buff);
+ buff->squish();
+ QLock l(bufferQueueLock);
+ bufferQueue.push_front(buff);
+}
+
+void AsynchIO::queueWrite(AsynchIO::BufferBase* buff) {
+ assert(buff);
+ QLock l(bufferQueueLock);
+ writeQueue.push_back(buff);
+ if (!writeInProgress)
+ notifyPendingWrite();
+}
+
+void AsynchIO::notifyPendingWrite() {
+ // This method is generally called from a processing thread; transfer
+ // work on this to an I/O thread. Much of the upper layer code assumes
+ // that all I/O-related things happen in an I/O thread.
+ if (poller == 0) // Not really going yet...
+ return;
+
+ InterlockedIncrement(&opsInProgress);
+ PollerHandle ph(CallbackHandle(boost::bind(&AsynchIO::completion, this, _1)));
+ poller->monitorHandle(ph, Poller::OUTPUT);
+}
+
+void AsynchIO::queueWriteClose() {
+ {
+ ScopedLock<Mutex> l(completionLock);
+ queuedClose = true;
+ if (working || writeInProgress)
+ // no need to summon an IO thread
+ return;
+ }
+ notifyPendingWrite();
+}
+
+bool AsynchIO::writeQueueEmpty() {
+ QLock l(bufferQueueLock);
+ return writeQueue.size() == 0;
+}
+
+/*
+ * Initiate a read operation. AsynchIO::readComplete() will be
+ * called when the read is complete and data is available.
+ */
+void AsynchIO::startReading() {
+ if (queuedDelete || queuedClose)
+ return;
+
+ // (Try to) get a buffer; look on the front since there may be an
+ // "unread" one there with data remaining from last time.
+ AsynchIO::BufferBase *buff = 0;
+ {
+ QLock l(bufferQueueLock);
+
+ if (!bufferQueue.empty()) {
+ buff = bufferQueue.front();
+ assert(buff);
+ bufferQueue.pop_front();
+ }
+ else {
+ logNoBuffers("startReading");
+ }
+ }
+ if (buff != 0) {
+ int readCount = buff->byteCount - buff->dataCount;
+ AsynchReadResult *result =
+ new AsynchReadResult(boost::bind(&AsynchIO::completion, this, _1),
+ buff,
+ readCount);
+ DWORD bytesReceived = 0, flags = 0;
+ InterlockedIncrement(&opsInProgress);
+ readInProgress = true;
+ int status = WSARecv(IOHandle(socket).fd,
+ const_cast<LPWSABUF>(result->getWSABUF()), 1,
+ &bytesReceived,
+ &flags,
+ result->overlapped(),
+ 0);
+ if (status != 0) {
+ int error = WSAGetLastError();
+ if (error != WSA_IO_PENDING) {
+ result->failure(error);
+ result = 0; // result is invalid here
+ return;
+ }
+ }
+ // On status 0 or WSA_IO_PENDING, completion will handle the rest.
+ }
+ else {
+ notifyBuffersEmpty();
+ }
+ return;
+}
+
+// Queue the specified callback for invocation from an I/O thread.
+void AsynchIO::requestCallback(RequestCallback callback) {
+ // This method is generally called from a processing thread; transfer
+ // work on this to an I/O thread. Much of the upper layer code assumes
+ // that all I/O-related things happen in an I/O thread.
+ if (poller == 0) // Not really going yet...
+ return;
+
+ InterlockedIncrement(&opsInProgress);
+ PollerHandle ph(CallbackHandle(
+ boost::bind(&AsynchIO::completion, this, _1),
+ callback));
+ poller->monitorHandle(ph, Poller::INPUT);
+}
+
+/**
+ * Return a queued buffer if there are enough to spare.
+ */
+AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
+ QLock l(bufferQueueLock);
+ BufferBase* buff = bufferQueue.empty() ? 0 : bufferQueue.back();
+ // An "unread" buffer is reserved for future read operations (which
+ // take from the front of the queue).
+ if (!buff || (buff->dataCount && bufferQueue.size() == 1)) {
+ if (buff)
+ logNoBuffers("getQueuedBuffer with unread data");
+ else
+ logNoBuffers("getQueuedBuffer with empty queue");
+ return 0;
+ }
+ assert(buff->dataCount == 0);
+ bufferQueue.pop_back();
+ return buff;
+}
+
+void AsynchIO::notifyEof(void) {
+ if (eofCallback)
+ eofCallback(*this);
+}
+
+void AsynchIO::notifyDisconnect(void) {
+ if (disCallback) {
+ DisconnectCallback dcb = disCallback;
+ closedCallback = 0;
+ disCallback = 0;
+ dcb(*this);
+ // May have just been deleted.
+ return;
+ }
+}
+
+void AsynchIO::notifyClosed(void) {
+ if (closedCallback) {
+ ClosedCallback ccb = closedCallback;
+ closedCallback = 0;
+ disCallback = 0;
+ ccb(*this, socket);
+ // May have just been deleted.
+ return;
+ }
+}
+
+void AsynchIO::notifyBuffersEmpty(void) {
+ if (emptyCallback)
+ emptyCallback(*this);
+}
+
+void AsynchIO::notifyIdle(void) {
+ if (idleCallback)
+ idleCallback(*this);
+}
+
+/*
+ * Asynch reader/writer using overlapped I/O
+ */
+
+void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
+ writeInProgress = true;
+ InterlockedIncrement(&opsInProgress);
+ AsynchWriteResult *result =
+ new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1),
+ buff,
+ buff->dataCount);
+ DWORD bytesSent = 0;
+ int status = WSASend(IOHandle(socket).fd,
+ const_cast<LPWSABUF>(result->getWSABUF()), 1,
+ &bytesSent,
+ 0,
+ result->overlapped(),
+ 0);
+ if (status != 0) {
+ int error = WSAGetLastError();
+ if (error != WSA_IO_PENDING) {
+ result->failure(error); // Also decrements in-progress count
+ result = 0; // result is invalid here
+ return;
+ }
+ }
+ // On status 0 or WSA_IO_PENDING, completion will handle the rest.
+ return;
+}
+
+/*
+ * Close the socket and callback to say we've done it
+ */
+void AsynchIO::close(void) {
+ socket.close();
+ notifyClosed();
+}
+
+SecuritySettings AsynchIO::getSecuritySettings() {
+ SecuritySettings settings;
+ settings.ssf = socket.getKeyLen();
+ settings.authid = socket.getClientAuthId();
+ return settings;
+}
+
+void AsynchIO::readComplete(AsynchReadResult *result) {
+ int status = result->getStatus();
+ size_t bytes = result->getTransferred();
+ readInProgress = false;
+ if (status == 0 && bytes > 0) {
+ if (readCallback)
+ readCallback(*this, result->getBuff());
+ startReading();
+ }
+ else {
+ // No data read, so put the buffer back. It may be partially filled,
+ // so "unread" it back to the front of the queue.
+ unread(result->getBuff());
+ if (queuedClose) {
+ return; // Expected from cancelRead()
+ }
+ notifyEof();
+ if (status != 0)
+ {
+ notifyDisconnect();
+ }
+ }
+}
+
+/*
+ * NOTE - this completion is called for completed writes and also when
+ * a write is desired. The difference is in the buff - if a write is desired
+ * the buff is 0.
+ */
+void AsynchIO::writeComplete(AsynchWriteResult *result) {
+ int status = result->getStatus();
+ size_t bytes = result->getTransferred();
+ AsynchIO::BufferBase *buff = result->getBuff();
+ if (buff != 0) {
+ writeInProgress = false;
+ if (status == 0 && bytes > 0) {
+ if (bytes < result->getRequested()) // Still more to go; resubmit
+ startWrite(buff);
+ else
+ queueReadBuffer(buff); // All done; back to the pool
+ }
+ else {
+ // An error... if it's a connection close, ignore it - it will be
+ // noticed and handled on a read completion any moment now.
+ // What to do with real error??? Save the Buffer? TBD.
+ queueReadBuffer(buff); // All done; back to the pool
+ }
+ }
+
+ // If there are no writes outstanding, check for more writes to initiate
+ // (either queued or via idle). The opsInProgress count is handled in
+ // completion()
+ if (!writeInProgress) {
+ bool writing = false;
+ {
+ QLock l(bufferQueueLock);
+ if (writeQueue.size() > 0) {
+ buff = writeQueue.front();
+ assert(buff);
+ writeQueue.pop_front();
+ startWrite(buff);
+ writing = true;
+ }
+ }
+ if (!writing && !queuedClose) {
+ notifyIdle();
+ }
+ }
+ return;
+}
+
+void AsynchIO::completion(AsynchIoResult *result) {
+ bool closing = false;
+ bool deleting = false;
+ {
+ ScopedLock<Mutex> l(completionLock);
+ if (working) {
+ completionQueue.push(result);
+ return;
+ }
+
+ // First thread in with something to do; note we're working then keep
+ // handling completions.
+ working = true;
+ while (result != 0) {
+ // New scope to unlock temporarily.
+ {
+ ScopedUnlock<Mutex> ul(completionLock);
+ AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result);
+ if (r != 0)
+ readComplete(r);
+ else {
+ AsynchWriteResult *w =
+ dynamic_cast<AsynchWriteResult*>(result);
+ if (w != 0)
+ writeComplete(w);
+ else {
+ AsynchCallbackRequest *req =
+ dynamic_cast<AsynchCallbackRequest*>(result);
+ req->reqCallback(*this);
+ }
+ }
+ delete result;
+ result = 0;
+ InterlockedDecrement(&opsInProgress);
+ if (queuedClose && opsInProgress == 1 && readInProgress)
+ cancelRead();
+ }
+ // Lock is held again.
+ if (completionQueue.empty())
+ continue;
+ result = completionQueue.front();
+ completionQueue.pop();
+ }
+ working = false;
+ if (opsInProgress == 0) {
+ closing = queuedClose;
+ deleting = queuedDelete;
+ }
+ }
+ // Lock released; ok to close if ops are done and close requested.
+ // Layer above will call back to queueForDeletion() if it hasn't
+ // already been done. If it already has, go ahead and delete.
+ if (deleting)
+ delete this;
+ else if (closing)
+ // close() may cause a delete; don't trust 'this' on return
+ close();
+}
+
+/*
+ * NOTE - this method must be called in the same context as other completions,
+ * so that the resulting readComplete, and final AsynchIO::close() is serialized
+ * after this method returns.
+ */
+void AsynchIO::cancelRead() {
+ if (queuedDelete)
+ return; // socket already deleted
+ else {
+ ScopedLock<Mutex> l(completionLock);;
+ if (!completionQueue.empty())
+ return; // process it; come back later if necessary
+ }
+ // Cancel outstanding read and force to completion. Otherwise, on a faulty
+ // physical link, the pending read can remain uncompleted indefinitely.
+ // Draining the pending read will result in the official close (and
+ // notifyClosed). CancelIoEX() is the natural choice, but not available in
+ // XP, so we make do with closesocket().
+ socket.close();
+}
+
+/*
+ * Track down cause of unavailable buffer if it recurs: QPID-5033
+ */
+void AsynchIO::logNoBuffers(const char *context) {
+ QPID_LOG(error, "No IO buffers available: " << context <<
+ ". Debug data: " << bufferQueue.size() <<
+ ' ' << writeQueue.size() <<
+ ' ' << completionQueue.size() <<
+ ' ' << opsInProgress <<
+ ' ' << writeInProgress <<
+ ' ' << readInProgress <<
+ ' ' << working);
+}
+
+
+} // namespace windows
+
+AsynchIO* qpid::sys::AsynchIO::create(const Socket& s,
+ AsynchIO::ReadCallback rCb,
+ AsynchIO::EofCallback eofCb,
+ AsynchIO::DisconnectCallback disCb,
+ AsynchIO::ClosedCallback cCb,
+ AsynchIO::BuffersEmptyCallback eCb,
+ AsynchIO::IdleCallback iCb)
+{
+ return new qpid::sys::windows::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb);
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.h b/qpid/cpp/src/qpid/sys/windows/AsynchIO.h
new file mode 100644
index 0000000000..a50864b561
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.h
@@ -0,0 +1,235 @@
+#ifndef _sys_windows_AsynchIO
+#define _sys_windows_AsynchIO
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIoResult.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/CommonImportExport.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/shared_array.hpp>
+#include <winsock2.h>
+#include <mswsock.h>
+#include <windows.h>
+
+// security.h needs to see this to distinguish from kernel use.
+#define SECURITY_WIN32
+#include <security.h>
+#include <Schnlsp.h>
+#undef SECURITY_WIN32
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+/*
+ * Asynch Acceptor
+ */
+
+class AsynchAcceptor : public qpid::sys::AsynchAcceptor {
+
+ friend class AsynchAcceptResult;
+
+public:
+ AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback);
+ ~AsynchAcceptor();
+ void start(Poller::shared_ptr poller);
+
+private:
+ void restart(void);
+
+ AsynchAcceptor::Callback acceptedCallback;
+ const Socket& socket;
+ const SOCKET wSocket;
+ const LPFN_ACCEPTEX fnAcceptEx;
+};
+
+
+class AsynchConnector : public qpid::sys::AsynchConnector {
+private:
+ ConnectedCallback connCallback;
+ FailedCallback failCallback;
+ const Socket& socket;
+ const std::string hostname;
+ const std::string port;
+
+public:
+ AsynchConnector(const Socket& socket,
+ const std::string& hostname,
+ const std::string& port,
+ ConnectedCallback connCb,
+ FailedCallback failCb = 0);
+ void start(Poller::shared_ptr poller);
+ void requestCallback(RequestCallback rCb);
+};
+
+class AsynchIO : public qpid::sys::AsynchIO {
+
+ friend class SslAsynchIO;
+
+public:
+ AsynchIO(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0);
+ ~AsynchIO();
+
+ // Methods inherited from qpid::sys::AsynchIO
+
+ /**
+ * Notify the object is should delete itself as soon as possible.
+ */
+ virtual void queueForDeletion();
+
+ /// Take any actions needed to prepare for working with the poller.
+ virtual void start(Poller::shared_ptr poller);
+ virtual void createBuffers(uint32_t size);
+ virtual void queueReadBuffer(BufferBase* buff);
+ virtual void unread(BufferBase* buff);
+ virtual void queueWrite(BufferBase* buff);
+ virtual void notifyPendingWrite();
+ virtual void queueWriteClose();
+ virtual bool writeQueueEmpty();
+ virtual void requestCallback(RequestCallback);
+
+ /**
+ * getQueuedBuffer returns a buffer from the buffer queue, if one is
+ * available.
+ *
+ * @retval Pointer to BufferBase buffer; 0 if none is available.
+ */
+ virtual BufferBase* getQueuedBuffer();
+
+ virtual SecuritySettings getSecuritySettings(void);
+
+private:
+ ReadCallback readCallback;
+ EofCallback eofCallback;
+ DisconnectCallback disCallback;
+ ClosedCallback closedCallback;
+ BuffersEmptyCallback emptyCallback;
+ IdleCallback idleCallback;
+ const Socket& socket;
+ Poller::shared_ptr poller;
+ uint32_t bufferCount;
+
+ std::deque<BufferBase*> bufferQueue;
+ std::deque<BufferBase*> writeQueue;
+ /* The MSVC-supplied deque is not thread-safe; keep locks to serialize
+ * access to the buffer queue and write queue.
+ */
+ Mutex bufferQueueLock;
+ std::vector<BufferBase> buffers;
+ boost::shared_array<char> bufferMemory;
+
+ // Number of outstanding I/O operations.
+ volatile LONG opsInProgress;
+ // Is there a write in progress?
+ volatile bool writeInProgress;
+ // Or a read?
+ volatile bool readInProgress;
+ // Deletion requested, but there are callbacks in progress.
+ volatile bool queuedDelete;
+ // Socket close requested, but there are operations in progress.
+ volatile bool queuedClose;
+
+protected:
+ uint32_t getBufferCount(void);
+ void setBufferCount(uint32_t);
+
+private:
+ // Dispatch events that have completed.
+ void notifyEof(void);
+ void notifyDisconnect(void);
+ void notifyClosed(void);
+ void notifyBuffersEmpty(void);
+ void notifyIdle(void);
+
+ /**
+ * Initiate a write of the specified buffer. There's no callback for
+ * write completion to the AsynchIO object.
+ */
+ void startWrite(AsynchIO::BufferBase* buff);
+
+ void close(void);
+
+ /**
+ * startReading initiates reading, readComplete() is
+ * called when the read completes.
+ */
+ void startReading();
+
+ /**
+ * readComplete is called when a read request is complete.
+ *
+ * @param result Results of the operation.
+ */
+ void readComplete(AsynchReadResult *result);
+
+ /**
+ * writeComplete is called when a write request is complete.
+ *
+ * @param result Results of the operation.
+ */
+ void writeComplete(AsynchWriteResult *result);
+
+ /**
+ * Queue of completions to run. This queue enforces the requirement
+ * from upper layers that only one thread at a time is allowed to act
+ * on any given connection. Once a thread is busy processing a completion
+ * on this object, other threads that dispatch completions queue the
+ * completions here for the in-progress thread to handle when done.
+ * Thus, any threads can dispatch a completion from the IocpPoller, but
+ * this class ensures that actual processing at the connection level is
+ * only on one thread at a time.
+ */
+ std::queue<AsynchIoResult *> completionQueue;
+ volatile bool working;
+ Mutex completionLock;
+
+ /**
+ * Called when there's a completion to process.
+ */
+ void completion(AsynchIoResult *result);
+
+ /**
+ * Helper function to facilitate the close operation
+ */
+ void cancelRead();
+
+ /**
+ * Log information about buffer depletion, which should never happen.
+ * See QPID-5033.
+ */
+ void logNoBuffers(const char*);
+};
+
+}}} // namespace qpid::sys::windows
+
+#endif // _sys_windows_AsynchIO
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h b/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
new file mode 100755
index 0000000000..27e4c22138
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
@@ -0,0 +1,204 @@
+#ifndef _windows_asynchIoResult_h
+#define _windows_asynchIoResult_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Socket.h"
+#include <memory.h>
+#include <winsock2.h>
+#include <ws2tcpip.h>
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+/*
+ * AsynchIoResult defines the class that receives the result of an
+ * asynchronous I/O operation, either send/recv or accept/connect.
+ *
+ * Operation factories should set one of these up before beginning the
+ * operation. Poller knows how to dispatch completion to this class.
+ * This class must be subclassed for needed operations; this class provides
+ * an interface only and cannot be instantiated.
+ *
+ * This class is tied to Windows; it inherits from OVERLAPPED so that the
+ * IocpPoller can cast OVERLAPPED pointers back to AsynchIoResult and call
+ * the completion handler.
+ */
+class AsynchResult : private OVERLAPPED {
+public:
+ LPOVERLAPPED overlapped(void) { return this; }
+ static AsynchResult* from_overlapped(LPOVERLAPPED ol) {
+ return static_cast<AsynchResult*>(ol);
+ }
+ virtual void success (size_t bytesTransferred) {
+ bytes = bytesTransferred;
+ status = 0;
+ complete();
+ }
+ virtual void failure (int error) {
+ bytes = 0;
+ status = error;
+ complete();
+ }
+ size_t getTransferred(void) const { return bytes; }
+ int getStatus(void) const { return status; }
+
+protected:
+ AsynchResult() : bytes(0), status(0)
+ { memset(overlapped(), 0, sizeof(OVERLAPPED)); }
+ ~AsynchResult() {}
+ virtual void complete(void) = 0;
+
+ size_t bytes;
+ int status;
+};
+
+class AsynchAcceptor;
+
+class AsynchAcceptResult : public AsynchResult {
+
+ friend class AsynchAcceptor;
+
+public:
+ AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb,
+ AsynchAcceptor *acceptor,
+ const qpid::sys::Socket& listener);
+ virtual void success (size_t bytesTransferred);
+ virtual void failure (int error);
+
+private:
+ virtual void complete(void) {} // No-op for this class.
+
+ qpid::sys::AsynchAcceptor::Callback callback;
+ AsynchAcceptor *acceptor;
+ SOCKET listener;
+ std::auto_ptr<qpid::sys::Socket> newSocket;
+
+ // AcceptEx needs a place to write the local and remote addresses
+ // when accepting the connection. Place those here; get enough for
+ // IPv6 addresses, even if the socket is IPv4.
+ enum { SOCKADDRMAXLEN = sizeof(sockaddr_in6) + 16,
+ SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN };
+ char addressBuffer[SOCKADDRBUFLEN];
+};
+
+class AsynchIoResult : public AsynchResult {
+public:
+ typedef boost::function1<void, AsynchIoResult *> Completer;
+
+ virtual ~AsynchIoResult() {}
+ qpid::sys::AsynchIO::BufferBase *getBuff(void) const { return iobuff; }
+ size_t getRequested(void) const { return requested; }
+ const WSABUF *getWSABUF(void) const { return &wsabuf; }
+
+protected:
+ void setBuff (qpid::sys::AsynchIO::BufferBase *buffer) { iobuff = buffer; }
+
+protected:
+ AsynchIoResult(Completer cb,
+ qpid::sys::AsynchIO::BufferBase *buff, size_t length)
+ : completionCallback(cb), iobuff(buff), requested(length) {}
+
+ virtual void complete(void) = 0;
+ WSABUF wsabuf;
+ Completer completionCallback;
+
+private:
+ qpid::sys::AsynchIO::BufferBase *iobuff;
+ size_t requested; // Number of bytes in original I/O request
+};
+
+class AsynchReadResult : public AsynchIoResult {
+
+ // complete() updates buffer then does completion callback.
+ virtual void complete(void) {
+ getBuff()->dataCount += bytes;
+ completionCallback(this);
+ }
+
+public:
+ AsynchReadResult(AsynchIoResult::Completer cb,
+ qpid::sys::AsynchIO::BufferBase *buff,
+ size_t length)
+ : AsynchIoResult(cb, buff, length) {
+ wsabuf.buf = buff->bytes + buff->dataCount;
+ wsabuf.len = length;
+ }
+};
+
+class AsynchWriteResult : public AsynchIoResult {
+
+ // complete() updates buffer then does completion callback.
+ virtual void complete(void) {
+ qpid::sys::AsynchIO::BufferBase *b = getBuff();
+ b->dataStart += bytes;
+ b->dataCount -= bytes;
+ completionCallback(this);
+ }
+
+public:
+ AsynchWriteResult(AsynchIoResult::Completer cb,
+ qpid::sys::AsynchIO::BufferBase *buff,
+ size_t length)
+ : AsynchIoResult(cb, buff, length) {
+ wsabuf.buf = buff ? buff->bytes : 0;
+ wsabuf.len = length;
+ }
+};
+
+class AsynchWriteWanted : public AsynchWriteResult {
+
+ // complete() just does completion callback; no buffers used.
+ virtual void complete(void) {
+ completionCallback(this);
+ }
+
+public:
+ AsynchWriteWanted(AsynchIoResult::Completer cb)
+ : AsynchWriteResult(cb, 0, 0) {
+ wsabuf.buf = 0;
+ wsabuf.len = 0;
+ }
+};
+
+class AsynchCallbackRequest : public AsynchIoResult {
+ // complete() needs to simply call the completionCallback; no buffers.
+ virtual void complete(void) {
+ completionCallback(this);
+ }
+
+public:
+ AsynchCallbackRequest(AsynchIoResult::Completer cb,
+ qpid::sys::AsynchIO::RequestCallback reqCb)
+ : AsynchIoResult(cb, 0, 0), reqCallback(reqCb) {
+ wsabuf.buf = 0;
+ wsabuf.len = 0;
+ }
+
+ qpid::sys::AsynchIO::RequestCallback reqCallback;
+};
+
+}}} // qpid::sys::windows
+
+#endif /*!_windows_asynchIoResult_h*/
diff --git a/qpid/cpp/src/qpid/sys/windows/Condition.h b/qpid/cpp/src/qpid/sys/windows/Condition.h
new file mode 100755
index 0000000000..cd5aebbf09
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/Condition.h
@@ -0,0 +1,77 @@
+#ifndef _sys_windows_Condition_h
+#define _sys_windows_Condition_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
+
+#include <time.h>
+#include <boost/noncopyable.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/thread/thread_time.hpp>
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A condition variable for thread synchronization.
+ */
+class Condition : private boost::noncopyable
+{
+ public:
+ inline Condition();
+ inline ~Condition();
+ inline void wait(Mutex&);
+ inline bool wait(Mutex&, const AbsTime& absoluteTime);
+ inline void notify();
+ inline void notifyAll();
+
+ private:
+ boost::condition_variable_any condition;
+};
+
+Condition::Condition() {
+}
+
+Condition::~Condition() {
+}
+
+void Condition::wait(Mutex& mutex) {
+ condition.wait(mutex.mutex);
+}
+
+bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){
+ return condition.timed_wait(mutex.mutex, absoluteTime.timepoint);
+}
+
+void Condition::notify(){
+ condition.notify_one();
+}
+
+void Condition::notifyAll(){
+ condition.notify_all();
+}
+
+}}
+#endif /*!_sys_windows_Condition_h*/
diff --git a/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp b/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp
new file mode 100644
index 0000000000..5128f0f8d6
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp
@@ -0,0 +1,90 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/sys/FileSysDir.h"
+#include "qpid/sys/StrError.h"
+#include "qpid/Exception.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <direct.h>
+#include <errno.h>
+#include <windows.h>
+#include <strsafe.h>
+
+
+namespace qpid {
+namespace sys {
+
+bool FileSysDir::exists (void) const
+{
+ const char *cpath = dirPath.c_str ();
+ struct _stat s;
+ if (::_stat(cpath, &s)) {
+ if (errno == ENOENT) {
+ return false;
+ }
+ throw qpid::Exception (strError(errno) +
+ ": Can't check directory: " + dirPath);
+ }
+ if (s.st_mode & _S_IFDIR)
+ return true;
+ throw qpid::Exception(dirPath + " is not a directory");
+}
+
+void FileSysDir::mkdir(void)
+{
+ if (::_mkdir(dirPath.c_str()) == -1)
+ throw Exception ("Can't create directory: " + dirPath);
+}
+
+void FileSysDir::forEachFile(Callback cb) const {
+
+ WIN32_FIND_DATAA findFileData;
+ char szDir[MAX_PATH];
+ size_t dirPathLength;
+ HANDLE hFind = INVALID_HANDLE_VALUE;
+
+ // create dirPath+"\*" in szDir
+ StringCchLength (dirPath.c_str(), MAX_PATH, &dirPathLength);
+
+ if (dirPathLength > (MAX_PATH - 3)) {
+ throw Exception ("Directory path is too long: " + dirPath);
+ }
+
+ StringCchCopy(szDir, MAX_PATH, dirPath.c_str());
+ StringCchCat(szDir, MAX_PATH, TEXT("\\*"));
+
+ // Special work for first file
+ hFind = FindFirstFileA(szDir, &findFileData);
+ if (INVALID_HANDLE_VALUE == hFind) {
+ return;
+ }
+
+ // process everything that isn't a directory
+ do {
+ if (!(findFileData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY)) {
+ std::string fileName(dirPath);
+ fileName += "\\";
+ fileName += findFileData.cFileName;
+ cb(fileName);
+ }
+ } while (FindNextFile(hFind, &findFileData) != 0);
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp b/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp
new file mode 100755
index 0000000000..19a1c44875
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/windows/IoHandlePrivate.h"
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
new file mode 100755
index 0000000000..4529ad93ec
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
@@ -0,0 +1,58 @@
+#ifndef _sys_windows_IoHandlePrivate_h
+#define _sys_windows_IoHandlePrivate_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/windows/AsynchIoResult.h"
+#include "qpid/CommonImportExport.h"
+
+#include <winsock2.h>
+
+namespace qpid {
+namespace sys {
+
+// Private fd related implementation details
+// There should be either a valid socket handle or a completer callback.
+// Handle is used to associate with poller's iocp; completer is used to
+// inject a completion that will very quickly trigger a callback to the
+// completer from an I/O thread. If the callback mechanism is used, there
+// can be a RequestCallback set - this carries the callback object through
+// from AsynchIO::requestCallback() through to the I/O completion processing.
+class IOHandle {
+public:
+ IOHandle(SOCKET f = INVALID_SOCKET,
+ windows::AsynchIoResult::Completer cb = 0,
+ AsynchIO::RequestCallback reqCallback = 0) :
+ fd(f),
+ event(cb),
+ cbRequest(reqCallback)
+ {}
+
+ SOCKET fd;
+ windows::AsynchIoResult::Completer event;
+ AsynchIO::RequestCallback cbRequest;
+};
+
+}}
+
+#endif /* _sys_windows_IoHandlePrivate_h */
diff --git a/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
new file mode 100755
index 0000000000..ecb33c5517
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
@@ -0,0 +1,220 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/windows/AsynchIoResult.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
+#include "qpid/sys/windows/check.h"
+
+#include <winsock2.h>
+#include <windows.h>
+
+#include <assert.h>
+#include <vector>
+#include <exception>
+
+namespace qpid {
+namespace sys {
+
+class PollerHandlePrivate {
+ friend class Poller;
+ friend class PollerHandle;
+
+ SOCKET fd;
+ windows::AsynchIoResult::Completer cb;
+ AsynchIO::RequestCallback cbRequest;
+
+ PollerHandlePrivate(SOCKET f,
+ windows::AsynchIoResult::Completer cb0 = 0,
+ AsynchIO::RequestCallback rcb = 0)
+ : fd(f), cb(cb0), cbRequest(rcb)
+ {
+ }
+
+};
+
+PollerHandle::PollerHandle(const IOHandle& h) :
+ impl(new PollerHandlePrivate(h.fd, h.event, h.cbRequest))
+{}
+
+PollerHandle::~PollerHandle() {
+ delete impl;
+}
+
+/**
+ * Concrete implementation of Poller to use the Windows I/O Completion
+ * port (IOCP) facility.
+ */
+class PollerPrivate {
+ friend class Poller;
+
+ const HANDLE iocp;
+
+ // The number of threads running the event loop.
+ volatile LONG threadsRunning;
+
+ // Shutdown request is handled by setting isShutdown and injecting a
+ // well-formed completion event into the iocp.
+ bool isShutdown;
+
+ PollerPrivate() :
+ iocp(::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)),
+ threadsRunning(0),
+ isShutdown(false) {
+ QPID_WINDOWS_CHECK_NULL(iocp);
+ }
+
+ ~PollerPrivate() {
+ // It's probably okay to ignore any errors here as there can't be
+ // data loss
+ ::CloseHandle(iocp);
+ }
+};
+
+void Poller::shutdown() {
+ // Allow sloppy code to shut us down more than once.
+ if (impl->isShutdown)
+ return;
+ impl->isShutdown = true;
+ ULONG_PTR key = 1; // Tell wait() it's a shutdown, not I/O
+ PostQueuedCompletionStatus(impl->iocp, 0, key, 0);
+}
+
+bool Poller::hasShutdown()
+{
+ return impl->isShutdown;
+}
+
+bool Poller::interrupt(PollerHandle&) {
+ return false; // There's no concept of a registered handle.
+}
+
+void Poller::run() {
+ while (!impl->isShutdown) {
+ Poller::Event event = this->wait();
+
+ // Handle shutdown
+ switch (event.type) {
+ case Poller::SHUTDOWN:
+ return;
+ break;
+ case Poller::INVALID: // On any type of success or fail completion
+ break;
+ default:
+ // This should be impossible
+ assert(false);
+ }
+ }
+}
+
+void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
+ HANDLE h = (HANDLE)(handle.impl->fd);
+ if (h != INVALID_HANDLE_VALUE) {
+ HANDLE iocpHandle = ::CreateIoCompletionPort (h, impl->iocp, 0, 0);
+ QPID_WINDOWS_CHECK_NULL(iocpHandle);
+ }
+ else {
+ // INPUT is used to request a callback; OUTPUT to request a write
+ assert(dir == Poller::INPUT || dir == Poller::OUTPUT);
+
+ if (dir == Poller::OUTPUT) {
+ windows::AsynchWriteWanted *result =
+ new windows::AsynchWriteWanted(handle.impl->cb);
+ PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+ }
+ else {
+ windows::AsynchCallbackRequest *result =
+ new windows::AsynchCallbackRequest(handle.impl->cb,
+ handle.impl->cbRequest);
+ PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+ }
+ }
+}
+
+// All no-ops...
+void Poller::unmonitorHandle(PollerHandle& /*handle*/, Direction /*dir*/) {}
+void Poller::registerHandle(PollerHandle& /*handle*/) {}
+void Poller::unregisterHandle(PollerHandle& /*handle*/) {}
+
+Poller::Event Poller::wait(Duration timeout) {
+ DWORD timeoutMs = 0;
+ DWORD numTransferred = 0;
+ ULONG_PTR completionKey = 0;
+ OVERLAPPED *overlapped = 0;
+ windows::AsynchResult *result = 0;
+
+ // Wait for either an I/O operation to finish (thus signaling the
+ // IOCP handle) or a shutdown request to be made (thus signaling the
+ // shutdown event).
+ if (timeout == TIME_INFINITE)
+ timeoutMs = INFINITE;
+ else
+ timeoutMs = static_cast<DWORD>(timeout / TIME_MSEC);
+
+ InterlockedIncrement(&impl->threadsRunning);
+ bool goodOp = ::GetQueuedCompletionStatus (impl->iocp,
+ &numTransferred,
+ &completionKey,
+ &overlapped,
+ timeoutMs);
+ LONG remainingThreads = InterlockedDecrement(&impl->threadsRunning);
+ if (goodOp) {
+ // Dequeued a successful completion. If it's a posted packet from
+ // shutdown() the overlapped ptr is 0 and key is 1. Else downcast
+ // the OVERLAPPED pointer to an AsynchIoResult and call the
+ // completion handler.
+ if (overlapped == 0 && completionKey == 1) {
+ // If there are other threads still running this wait, re-post
+ // the completion.
+ if (remainingThreads > 0)
+ PostQueuedCompletionStatus(impl->iocp, 0, completionKey, 0);
+ return Event(0, SHUTDOWN);
+ }
+
+ result = windows::AsynchResult::from_overlapped(overlapped);
+ result->success (static_cast<size_t>(numTransferred));
+ }
+ else {
+ if (overlapped != 0) {
+ // Dequeued a completion for a failed operation. Downcast back
+ // to the result object and inform it that the operation failed.
+ DWORD status = ::GetLastError();
+ result = windows::AsynchResult::from_overlapped(overlapped);
+ result->failure (static_cast<int>(status));
+ }
+ }
+ return Event(0, INVALID); // TODO - this may need to be changed.
+
+}
+
+// Concrete constructors
+Poller::Poller() :
+ impl(new PollerPrivate())
+{}
+
+Poller::~Poller() {
+ delete impl;
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/sys/windows/LockFile.cpp b/qpid/cpp/src/qpid/sys/windows/LockFile.cpp
new file mode 100755
index 0000000000..048c2d5b18
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/LockFile.cpp
@@ -0,0 +1,64 @@
+/*
+ *
+ * Copyright (c) 2008 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/sys/LockFile.h"
+#include "qpid/sys/windows/check.h"
+
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+class LockFilePrivate {
+ friend class LockFile;
+
+ HANDLE fd;
+
+public:
+ LockFilePrivate(HANDLE f) : fd(f) {}
+};
+
+LockFile::LockFile(const std::string& path_, bool create)
+ : path(path_), created(create) {
+
+ HANDLE h = ::CreateFile(path.c_str(),
+ create ? (GENERIC_READ|GENERIC_WRITE) : GENERIC_READ,
+ FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,
+ 0, /* Default security */
+ create ? OPEN_ALWAYS : OPEN_EXISTING,
+ FILE_FLAG_DELETE_ON_CLOSE, /* Delete file when closed */
+ NULL);
+ if (h == INVALID_HANDLE_VALUE)
+ throw qpid::Exception(path + ": " + qpid::sys::strError(GetLastError()));
+
+ // Lock up to 4Gb
+ if (!::LockFile(h, 0, 0, 0xffffffff, 0))
+ throw qpid::Exception(path + ": " + qpid::sys::strError(GetLastError()));
+ impl.reset(new LockFilePrivate(h));
+}
+
+LockFile::~LockFile() {
+ if (impl) {
+ if (impl->fd != INVALID_HANDLE_VALUE) {
+ ::UnlockFile(impl->fd, 0, 0, 0xffffffff, 0);
+ ::CloseHandle(impl->fd);
+ }
+ }
+}
+
+}} /* namespace qpid::sys */
diff --git a/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp b/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp
new file mode 100644
index 0000000000..60b3df7da6
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/MemoryMappedFile.h"
+
+namespace qpid {
+namespace sys {
+class MemoryMappedFilePrivate {};
+
+MemoryMappedFile::MemoryMappedFile() : state(0) {}
+MemoryMappedFile::~MemoryMappedFile() {}
+
+void MemoryMappedFile::open(const std::string& /*name*/, const std::string& /*directory*/)
+{
+}
+void MemoryMappedFile::close()
+{
+}
+size_t MemoryMappedFile::getPageSize()
+{
+ return 0;
+}
+char* MemoryMappedFile::map(size_t /*offset*/, size_t /*size*/)
+{
+ return 0;
+}
+void MemoryMappedFile::unmap(char* /*region*/, size_t /*size*/)
+{
+}
+void MemoryMappedFile::flush(char* /*region*/, size_t /*size*/)
+{
+}
+void MemoryMappedFile::expand(size_t /*offset*/)
+{
+}
+bool MemoryMappedFile::isSupported()
+{
+ return false;
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/Mutex.h b/qpid/cpp/src/qpid/sys/windows/Mutex.h
new file mode 100755
index 0000000000..5dcc69e836
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/Mutex.h
@@ -0,0 +1,188 @@
+#ifndef _sys_windows_Mutex_h
+#define _sys_windows_Mutex_h
+
+/*
+ *
+ * Copyright (c) 2008 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/sys/windows/check.h"
+
+#include <boost/version.hpp>
+#if (BOOST_VERSION < 103500)
+#error The Windows port requires Boost version 1.35.0 or later
+#endif
+
+#include <boost/noncopyable.hpp>
+#include <boost/thread/recursive_mutex.hpp>
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/thread/thread_time.hpp>
+#include <boost/thread/tss.hpp>
+
+namespace qpid {
+namespace sys {
+
+class Condition;
+
+/**
+ * Mutex lock.
+ */
+class Mutex : private boost::noncopyable {
+ friend class Condition;
+
+public:
+ typedef ::qpid::sys::ScopedLock<Mutex> ScopedLock;
+ typedef ::qpid::sys::ScopedUnlock<Mutex> ScopedUnlock;
+
+ inline Mutex();
+ inline ~Mutex();
+ inline void lock();
+ inline void unlock();
+ inline bool trylock();
+
+
+protected:
+ boost::recursive_mutex mutex;
+};
+
+/**
+ * RW lock.
+ */
+class RWlock : private boost::noncopyable {
+ friend class Condition;
+
+public:
+ typedef ::qpid::sys::ScopedRlock<RWlock> ScopedRlock;
+ typedef ::qpid::sys::ScopedWlock<RWlock> ScopedWlock;
+
+ inline RWlock();
+ inline ~RWlock();
+ inline void wlock(); // will write-lock
+ inline void rlock(); // will read-lock
+ inline void unlock();
+ inline void trywlock(); // will write-try
+ inline void tryrlock(); // will read-try
+
+protected:
+ boost::shared_mutex rwMutex;
+ boost::thread_specific_ptr<bool> haveWrite;
+
+ inline bool &write (void);
+};
+
+
+/**
+ * PODMutex is a POD, can be static-initialized with
+ * PODMutex m = QPID_PODMUTEX_INITIALIZER
+ */
+struct PODMutex
+{
+ typedef ::qpid::sys::ScopedLock<PODMutex> ScopedLock;
+
+ inline void lock();
+ inline void unlock();
+ inline bool trylock();
+
+ // Must be public to be a POD:
+ boost::recursive_mutex mutex;
+};
+
+#define QPID_MUTEX_INITIALIZER 0
+
+void PODMutex::lock() {
+ mutex.lock();
+}
+
+void PODMutex::unlock() {
+ mutex.unlock();
+}
+
+bool PODMutex::trylock() {
+ return mutex.try_lock();
+}
+
+Mutex::Mutex() {
+}
+
+Mutex::~Mutex(){
+}
+
+void Mutex::lock() {
+ mutex.lock();
+}
+
+void Mutex::unlock() {
+ mutex.unlock();
+}
+
+bool Mutex::trylock() {
+ return mutex.try_lock();
+}
+
+
+RWlock::RWlock() {
+}
+
+RWlock::~RWlock(){
+}
+
+void RWlock::wlock() {
+ bool &writer = write();
+ rwMutex.lock();
+ writer = true; // Remember this thread has write lock held.
+}
+
+void RWlock::rlock() {
+ bool &writer = write();
+ rwMutex.lock_shared();
+ writer = false; // Remember this thread has shared lock held.
+}
+
+void RWlock::unlock() {
+ bool &writer = write();
+ if (writer)
+ rwMutex.unlock();
+ else
+ rwMutex.unlock_shared();
+}
+
+void RWlock::trywlock() {
+ bool &writer = write();
+ // shared_mutex::try_lock() seems to not be available... emulate it with
+ // a timed lock().
+ boost::system_time now = boost::get_system_time();
+ if (rwMutex.timed_lock(now))
+ writer = true;
+}
+
+void RWlock::tryrlock() {
+ bool &writer = write();
+ if (rwMutex.try_lock_shared())
+ writer = false;
+}
+
+bool & RWlock::write (void) {
+ // Accessing thread-specific and stack-local info, so no locks needed.
+ bool *writePtr = haveWrite.get();
+ if (writePtr == 0) {
+ writePtr = new bool(false);
+ haveWrite.reset(writePtr);
+ }
+ return *writePtr;
+}
+
+}}
+#endif /*!_sys_windows_Mutex_h*/
diff --git a/qpid/cpp/src/qpid/sys/windows/Path.cpp b/qpid/cpp/src/qpid/sys/windows/Path.cpp
new file mode 100644
index 0000000000..1cb4521fde
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/Path.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/sys/Path.h"
+#include "qpid/sys/StrError.h"
+#include "qpid/Exception.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <direct.h>
+#include <errno.h>
+#include <windows.h>
+#include <strsafe.h>
+
+
+namespace qpid {
+namespace sys {
+
+const std::string Path::separator("\\");
+
+namespace {
+// Return true for success, false for ENOENT, throw otherwise.
+bool getStat(const std::string& path, struct _stat& s) {
+ if (::_stat(path.c_str(), &s)) {
+ if (errno == ENOENT) return false;
+ throw qpid::Exception("cannot stat: " + path + ": " + strError(errno));
+ }
+ return true;
+}
+
+bool isFlag(const std::string& path, unsigned long flag) {
+ struct _stat s;
+ return getStat(path, s) && (s.st_mode & flag);
+}
+}
+
+bool Path::exists () const {
+ struct _stat s;
+ return getStat(path, s);
+}
+
+bool Path::isFile() const { return isFlag(path, _S_IFREG); }
+bool Path::isDirectory() const { return isFlag(path, _S_IFDIR); }
+
+bool Path::isAbsolute() const {
+ return (path.size() > 0 && (path[0] == separator[0] || path[0] == '/'))
+ || (path.size() > 1 && (isalpha(path[0]) && path[1] == ':'));
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp b/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp
new file mode 100755
index 0000000000..062458ae5f
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp
@@ -0,0 +1,101 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/sys/PipeHandle.h"
+#include "qpid/sys/windows/check.h"
+#include <winsock2.h>
+
+namespace qpid {
+namespace sys {
+
+PipeHandle::PipeHandle(bool nonBlocking) {
+
+ SOCKET listener, pair[2];
+ struct sockaddr_in addr;
+ int err;
+ int addrlen = sizeof(addr);
+ pair[0] = pair[1] = INVALID_SOCKET;
+ if ((listener = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ addr.sin_port = 0;
+
+ err = bind(listener, (const struct sockaddr*) &addr, sizeof(addr));
+ if (err == SOCKET_ERROR) {
+ err = WSAGetLastError();
+ closesocket(listener);
+ throw QPID_WINDOWS_ERROR(err);
+ }
+
+ err = getsockname(listener, (struct sockaddr*) &addr, &addrlen);
+ if (err == SOCKET_ERROR) {
+ err = WSAGetLastError();
+ closesocket(listener);
+ throw QPID_WINDOWS_ERROR(err);
+ }
+
+ try {
+ if (listen(listener, 1) == SOCKET_ERROR)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ if ((pair[0] = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ if (connect(pair[0], (const struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ if ((pair[1] = accept(listener, NULL, NULL)) == INVALID_SOCKET)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+
+ closesocket(listener);
+ writeFd = pair[0];
+ readFd = pair[1];
+ }
+ catch (...) {
+ closesocket(listener);
+ if (pair[0] != INVALID_SOCKET)
+ closesocket(pair[0]);
+ throw;
+ }
+
+ // Set the socket to non-blocking
+ if (nonBlocking) {
+ unsigned long nonblock = 1;
+ ioctlsocket(readFd, FIONBIO, &nonblock);
+ }
+}
+
+PipeHandle::~PipeHandle() {
+ closesocket(readFd);
+ closesocket(writeFd);
+}
+
+int PipeHandle::read(void* buf, size_t bufSize) {
+ return ::recv(readFd, (char *)buf, bufSize, 0);
+}
+
+int PipeHandle::write(const void* buf, size_t bufSize) {
+ return ::send(writeFd, (const char *)buf, bufSize, 0);
+}
+
+int PipeHandle::getReadHandle() {
+ return readFd;
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
new file mode 100644
index 0000000000..3e2a5fb36c
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/PollableCondition.h"
+#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/windows/AsynchIoResult.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
+
+#include <boost/bind.hpp>
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+// PollableConditionPrivate will reuse the IocpPoller's ability to queue
+// a completion to the IOCP and have it dispatched to the completer callback
+// noted in the IOHandlePrivate when the request is queued. The
+// AsynchCallbackRequest object is not really used - we already have the
+// desired callback for the user of PollableCondition.
+class PollableConditionPrivate : private IOHandle {
+ friend class PollableCondition;
+
+private:
+ PollableConditionPrivate(const sys::PollableCondition::Callback& cb,
+ sys::PollableCondition& parent,
+ const boost::shared_ptr<sys::Poller>& poller);
+ ~PollableConditionPrivate();
+
+ void poke();
+ void dispatch(windows::AsynchIoResult *result);
+
+private:
+ PollableCondition::Callback cb;
+ PollableCondition& parent;
+ boost::shared_ptr<sys::Poller> poller;
+ LONG isSet;
+ LONG isDispatching;
+};
+
+PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb,
+ sys::PollableCondition& parent,
+ const boost::shared_ptr<sys::Poller>& poller)
+ : IOHandle(INVALID_SOCKET, boost::bind(&PollableConditionPrivate::dispatch, this, _1)),
+ cb(cb), parent(parent), poller(poller), isSet(0), isDispatching(0)
+{
+}
+
+PollableConditionPrivate::~PollableConditionPrivate()
+{
+}
+
+void PollableConditionPrivate::poke()
+{
+ // monitorHandle will queue a completion for the IOCP; when it's handled, a
+ // poller thread will call back to dispatch() below.
+ PollerHandle ph(*this);
+ poller->monitorHandle(ph, Poller::INPUT);
+}
+
+void PollableConditionPrivate::dispatch(windows::AsynchIoResult *result)
+{
+ delete result; // Poller::monitorHandle() allocates this
+ // If isDispatching is already set, just return. Else, enter.
+ if (::InterlockedCompareExchange(&isDispatching, 1, 0) == 1)
+ return;
+ cb(parent);
+ LONG oops = ::InterlockedDecrement(&isDispatching); // Result must be 0
+ assert(!oops);
+ if (isSet)
+ poke();
+}
+
+ /* PollableCondition */
+
+PollableCondition::PollableCondition(const Callback& cb,
+ const boost::shared_ptr<sys::Poller>& poller)
+ : impl(new PollableConditionPrivate(cb, *this, poller))
+{
+}
+
+PollableCondition::~PollableCondition()
+{
+ delete impl;
+}
+
+void PollableCondition::set() {
+ // Add one to the set count and poke it to provoke a callback
+ ::InterlockedIncrement(&impl->isSet);
+ impl->poke();
+}
+
+void PollableCondition::clear() {
+ ::InterlockedExchange(&impl->isSet, 0);
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/QpidDllMain.h b/qpid/cpp/src/qpid/sys/windows/QpidDllMain.h
new file mode 100644
index 0000000000..74eaf0256a
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/QpidDllMain.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * Include this file once in each DLL that relies on SystemInfo.h:
+ * threadSafeShutdown(). Note that Thread.cpp has a more elaborate
+ * DllMain, that also provides this functionality separately.
+ *
+ * Teardown is in the reverse order of the DLL dependencies used
+ * during the load phase. The calls to DllMain and the static
+ * destructors are from the same thread, so no locking is necessary
+ * and there is no downside to an invocation of DllMain by multiple
+ * Qpid DLLs.
+ */
+
+#ifdef _DLL
+
+#include <qpid/ImportExport.h>
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+QPID_IMPORT bool processExiting;
+QPID_IMPORT bool libraryUnloading;
+
+}}} // namespace qpid::sys::SystemInfo
+
+
+BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) {
+ switch (reason) {
+ case DLL_PROCESS_ATTACH:
+ case DLL_THREAD_ATTACH:
+ case DLL_THREAD_DETACH:
+ break;
+
+ case DLL_PROCESS_DETACH:
+ // Remember how the process is terminating this DLL.
+ if (reserved != NULL) {
+ qpid::sys::windows::processExiting = true;
+ // Danger: all threading suspect, including indirect use of malloc or locks.
+ // Think twice before adding more functionality here.
+ return TRUE;
+ }
+ else {
+ qpid::sys::windows::libraryUnloading = true;
+ }
+ break;
+ }
+ return TRUE;
+}
+
+
+#endif
diff --git a/qpid/cpp/src/qpid/sys/windows/Shlib.cpp b/qpid/cpp/src/qpid/sys/windows/Shlib.cpp
new file mode 100644
index 0000000000..ba18747eb4
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/Shlib.cpp
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Shlib.h"
+#include "qpid/Exception.h"
+#include "qpid/sys/windows/check.h"
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+void Shlib::load(const char* name) {
+ HMODULE h = LoadLibrary(name);
+ if (h == NULL) {
+ throw QPID_WINDOWS_ERROR(GetLastError());
+ }
+ handle = static_cast<void*>(h);
+}
+
+void Shlib::unload() {
+ if (handle) {
+ if (FreeLibrary(static_cast<HMODULE>(handle)) == 0) {
+ throw QPID_WINDOWS_ERROR(GetLastError());
+ }
+ handle = 0;
+ }
+}
+
+void* Shlib::getSymbol(const char* name) {
+ // Double cast avoids warning about casting function pointer to object
+ void *sym = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(GetProcAddress(static_cast<HMODULE>(handle), name)));
+ if (sym == NULL)
+ throw QPID_WINDOWS_ERROR(GetLastError());
+ return sym;
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
new file mode 100644
index 0000000000..b0903752c6
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
@@ -0,0 +1,346 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/SocketAddress.h"
+
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
+#include "qpid/log/Logger.h"
+
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include <string.h>
+
+namespace qpid {
+namespace sys {
+
+SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) :
+ host(host0),
+ port(port0),
+ addrInfo(0),
+ currentAddrInfo(0)
+{
+}
+
+SocketAddress::SocketAddress(const SocketAddress& sa) :
+ host(sa.host),
+ port(sa.port),
+ addrInfo(0),
+ currentAddrInfo(0)
+{
+}
+
+SocketAddress& SocketAddress::operator=(const SocketAddress& sa)
+{
+ SocketAddress temp(sa);
+
+ std::swap(temp, *this);
+ return *this;
+}
+
+SocketAddress::~SocketAddress()
+{
+ if (addrInfo) {
+ ::freeaddrinfo(addrInfo);
+ }
+}
+
+std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen, bool dispNameOnly, bool hideDecoration)
+{
+ char servName[NI_MAXSERV];
+ char dispName[NI_MAXHOST];
+ if (int rc=::getnameinfo(addr, addrlen,
+ dispName, sizeof(dispName),
+ servName, sizeof(servName),
+ NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+ std::string s;
+ switch (addr->sa_family) {
+ case AF_INET: s += dispName; break;
+ case AF_INET6:
+ if (!hideDecoration) {
+ s += "["; s += dispName; s+= "]";
+ } else {
+ s += dispName;
+ }
+ break;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+ if (!dispNameOnly) {
+ s += ":";
+ s += servName;
+ }
+ return s;
+}
+
+uint16_t SocketAddress::getPort(::sockaddr const * const addr)
+{
+ switch (addr->sa_family) {
+ case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port);
+ case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port);
+ default:throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+}
+
+std::string SocketAddress::asString(bool numeric, bool dispNameOnly, bool hideDecoration) const
+{
+ if (!numeric)
+ return host + ":" + port;
+ // Canonicalise into numeric id
+ const ::addrinfo& ai = getAddrInfo(*this);
+
+ return asString(ai.ai_addr, ai.ai_addrlen, dispNameOnly, hideDecoration);
+}
+
+std::string SocketAddress::getHost() const
+{
+ return host;
+}
+
+/**
+ * Return true if this SocketAddress is IPv4 or IPv6
+ */
+bool SocketAddress::isIp() const
+{
+ const ::addrinfo& ai = getAddrInfo(*this);
+ return ai.ai_family == AF_INET || ai.ai_family == AF_INET6;
+}
+
+/**
+ * this represents the low address of an ACL address range.
+ * Given rangeHi that represents the high address,
+ * return a string showing the numeric comparisons that the
+ * inRange checks will do for address pair.
+ */
+std::string SocketAddress::comparisonDetails(const SocketAddress& rangeHi) const
+{
+ std::ostringstream os;
+ SocketAddress thisSa(*this);
+ SocketAddress rangeHiSa(rangeHi);
+ (void) getAddrInfo(thisSa);
+ (void) getAddrInfo(rangeHiSa);
+ os << "(" << thisSa.asString(true, true, false) <<
+ "," << rangeHiSa.asString(true, true, false) << ")";
+ while (thisSa.nextAddress()) {
+ if (!rangeHiSa.nextAddress()) {
+ throw(Exception(QPID_MSG("Comparison iteration fails: " + (*this).asString() +
+ rangeHi.asString())));
+ }
+ os << ",(" << thisSa.asString(true, true, false) <<
+ "," << rangeHiSa.asString(true, true, false) << ")";
+ }
+ if (rangeHiSa.nextAddress()) {
+ throw(Exception(QPID_MSG("Comparison iteration fails: " + (*this).asString() +
+ rangeHi.asString())));
+ }
+ std::string result = os.str();
+ return result;
+}
+
+/**
+ * For ACL address matching make sure that the two addresses, *this
+ * which is the low address and hiPeer which is the high address, are
+ * both numeric ip addresses of the same family and that hi > *this.
+ *
+ * Note that if the addresses resolve to more than one struct addrinfo
+ * then this and the hiPeer must be equal. This avoids having to do
+ * difficult range checks where the this and hiPeer both resolve to
+ * multiple IPv4 or IPv6 addresses.
+ *
+ * This check is run at acl file load time and not at run tme.
+ */
+bool SocketAddress::isComparable(const SocketAddress& hiPeer) const {
+ try {
+ // May only compare if this socket is IPv4 or IPv6
+ SocketAddress lo(*this);
+ const ::addrinfo& peerLoInfo = getAddrInfo(lo);
+ if (!(peerLoInfo.ai_family == AF_INET || peerLoInfo.ai_family == AF_INET6)) {
+ return false;
+ }
+ try {
+ // May only compare if peer socket is same family
+ SocketAddress hi(hiPeer);
+ const ::addrinfo& peerHiInfo = getAddrInfo(hi);
+ if (peerLoInfo.ai_family != peerHiInfo.ai_family) {
+ return false;
+ }
+ // Host names that resolve to lists are allowed if they are equal.
+ // For example: localhost, or fjord.lab.example.com
+ if ((*this).asString() == hiPeer.asString()) {
+ return true;
+ }
+ // May only compare if this and peer resolve to single address.
+ if (lo.nextAddress() || hi.nextAddress()) {
+ return false;
+ }
+ // Make sure that the lo/hi relationship is ok
+ int res;
+ if (!compareAddresses(peerLoInfo, peerHiInfo, res) || res < 0) {
+ return false;
+ }
+ return true;
+ } catch (Exception) {
+ // failed to resolve hi
+ return false;
+ }
+ } catch (Exception) {
+ // failed to resolve lo
+ return false;
+ }
+}
+
+/**
+ * *this SocketAddress was created from the numeric IP address of a
+ * connecting host.
+ * The lo and hi addresses are the limit checks from the ACL file.
+ * Return true if this address is in range of any of the address pairs
+ * in the limit check range.
+ *
+ * This check is executed on every incoming connection.
+ */
+bool SocketAddress::inRange(const SocketAddress& lo,
+ const SocketAddress& hi) const
+{
+ (*this).firstAddress();
+ lo.firstAddress();
+ hi.firstAddress();
+ const ::addrinfo& thisInfo = getAddrInfo(*this);
+ const ::addrinfo& loInfo = getAddrInfo(lo);
+ const ::addrinfo& hiInfo = getAddrInfo(hi);
+ if (inRange(thisInfo, loInfo, hiInfo)) {
+ return true;
+ }
+ while (lo.nextAddress()) {
+ if (!hi.nextAddress()) {
+ assert (false);
+ throw(Exception(QPID_MSG("Comparison iteration fails: " +
+ lo.asString() + hi.asString())));
+ }
+ const ::addrinfo& loInfo = getAddrInfo(lo);
+ const ::addrinfo& hiInfo = getAddrInfo(hi);
+ if (inRange(thisInfo, loInfo, hiInfo)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+/**
+ * *this SocketAddress was created from the numeric IP address of a
+ * connecting host.
+ * The lo and hi addresses are one binary address pair from a range
+ * given in an ACL file.
+ * Return true if this binary address is '>= lo' and '<= hi'.
+ */
+bool SocketAddress::inRange(const ::addrinfo& thisInfo,
+ const ::addrinfo& lo,
+ const ::addrinfo& hi) const
+{
+ int resLo;
+ int resHi;
+ if (!compareAddresses(lo, thisInfo, resLo)) {
+ return false;
+ }
+ if (!compareAddresses(hi, thisInfo, resHi)) {
+ return false;
+ }
+ if (resLo < 0) {
+ return false;
+ }
+ if (resHi > 0) {
+ return false;
+ }
+ return true;
+}
+
+/**
+ * Compare this address against two binary low/high addresses.
+ * return true with result holding the comparison.
+ */
+bool SocketAddress::compareAddresses(const struct addrinfo& lo,
+ const struct addrinfo& hi,
+ int& result) const
+{
+ if (lo.ai_family != hi.ai_family) {
+ return false;
+ }
+ if (lo.ai_family == AF_INET) {
+ struct sockaddr_in* sin4lo = (struct sockaddr_in*)lo.ai_addr;
+ struct sockaddr_in* sin4hi = (struct sockaddr_in*)hi.ai_addr;
+ result = memcmp(&sin4hi->sin_addr, &sin4lo->sin_addr, sizeof(in_addr));
+ } else if (lo.ai_family == AF_INET6) {
+ struct sockaddr_in6* sin6lo = (struct sockaddr_in6*)lo.ai_addr;
+ struct sockaddr_in6* sin6hi = (struct sockaddr_in6*)hi.ai_addr;
+ result = memcmp(&sin6hi->sin6_addr, &sin6lo->sin6_addr, sizeof(in6_addr));
+ } else {
+ assert (false);
+ return false;
+ }
+ return true;
+}
+
+void SocketAddress::firstAddress() const {
+ if (addrInfo) {
+ currentAddrInfo = addrInfo;
+ } else {
+ (void) getAddrInfo(*this);
+ }
+}
+
+bool SocketAddress::nextAddress() const {
+ bool r = currentAddrInfo->ai_next != 0;
+ if (r)
+ currentAddrInfo = currentAddrInfo->ai_next;
+ return r;
+}
+
+const ::addrinfo& getAddrInfo(const SocketAddress& sa)
+{
+ if (!sa.addrInfo) {
+ ::addrinfo hints;
+ ::memset(&hints, 0, sizeof(hints));
+ hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+ hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
+ hints.ai_socktype = SOCK_STREAM;
+
+ const char* node = 0;
+ if (sa.host.empty()) {
+ hints.ai_flags |= AI_PASSIVE;
+ } else {
+ node = sa.host.c_str();
+ }
+ const char* service = sa.port.empty() ? "0" : sa.port.c_str();
+
+ int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
+ if (n != 0)
+ throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+ sa.currentAddrInfo = sa.addrInfo;
+ }
+
+ return *sa.currentAddrInfo;
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
new file mode 100644
index 0000000000..29f673c156
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
@@ -0,0 +1,735 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SslAsynchIO.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Time.h"
+#include "qpid/log/Statement.h"
+
+#include "qpid/sys/windows/check.h"
+
+// security.h needs to see this to distinguish from kernel use.
+#define SECURITY_WIN32
+#include <security.h>
+#include <Schnlsp.h>
+#undef SECURITY_WIN32
+
+#include <queue>
+#include <boost/bind.hpp>
+#include "AsynchIO.h"
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+namespace {
+
+ /*
+ * To make the SSL encryption more efficient, set up a new BufferBase
+ * that leaves room for the SSL header to be prepended and the SSL
+ * trailer to be appended.
+ *
+ * This works by accepting a properly formed BufferBase, remembering it,
+ * and resetting the members of this struct to reflect the reserved
+ * header and trailer areas. It's only needed for giving buffers up to
+ * the frame layer for writing into.
+ */
+ struct SslIoBuff : public qpid::sys::AsynchIO::BufferBase {
+ qpid::sys::AsynchIO::BufferBase* aioBuff;
+
+ SslIoBuff (qpid::sys::AsynchIO::BufferBase *base,
+ const SecPkgContext_StreamSizes &sizes)
+ : qpid::sys::AsynchIO::BufferBase(&base->bytes[sizes.cbHeader],
+ std::min(base->byteCount - sizes.cbHeader - sizes.cbTrailer,
+ sizes.cbMaximumMessage)),
+ aioBuff(base)
+ {}
+
+ ~SslIoBuff() {}
+ };
+}
+
+SslAsynchIO::SslAsynchIO(const qpid::sys::Socket& s,
+ CredHandle hCred,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb,
+ BuffersEmptyCallback eCb,
+ IdleCallback iCb,
+ NegotiateDoneCallback nCb) :
+ credHandle(hCred),
+ aio(0),
+ state(Negotiating),
+ readCallback(rCb),
+ idleCallback(iCb),
+ negotiateDoneCallback(nCb),
+ queuedDelete(false),
+ queuedClose(false),
+ reapCheckPending(false),
+ started(false),
+ leftoverPlaintext(0)
+{
+ SecInvalidateHandle(&ctxtHandle);
+ peerAddress = s.getPeerAddress();
+ aio = qpid::sys::AsynchIO::create(s,
+ boost::bind(&SslAsynchIO::sslDataIn, this, _1, _2),
+ eofCb,
+ disCb,
+ cCb,
+ eCb,
+ boost::bind(&SslAsynchIO::idle, this, _1));
+}
+
+SslAsynchIO::~SslAsynchIO() {
+ leftoverPlaintext = 0;
+}
+
+void SslAsynchIO::queueForDeletion() {
+ // Called exactly once, always on the IO completion thread.
+ bool authenticated = (state != Negotiating);
+ state = ShuttingDown;
+ if (authenticated) {
+ // Tell SChannel we are done.
+ DWORD shutdown = SCHANNEL_SHUTDOWN;
+ SecBuffer shutBuff;
+ shutBuff.cbBuffer = sizeof(DWORD);
+ shutBuff.BufferType = SECBUFFER_TOKEN;
+ shutBuff.pvBuffer = &shutdown;
+ SecBufferDesc desc;
+ desc.ulVersion = SECBUFFER_VERSION;
+ desc.cBuffers = 1;
+ desc.pBuffers = &shutBuff;
+ ::ApplyControlToken(&ctxtHandle, &desc);
+ negotiateStep(0);
+ }
+
+ queueWriteClose();
+ queuedDelete = true;
+
+ // This method effectively disconnects the layer above; pass it on the
+ // AsynchIO and delete.
+ aio->queueForDeletion();
+
+ if (!reapCheckPending)
+ delete(this);
+}
+
+void SslAsynchIO::start(qpid::sys::Poller::shared_ptr poller) {
+ aio->start(poller);
+ started = true;
+ startNegotiate();
+}
+
+void SslAsynchIO::createBuffers(uint32_t size) {
+ // Reserve an extra buffer to hold unread plaintext or trailing encrypted input.
+ windows::AsynchIO *waio = dynamic_cast<windows::AsynchIO*>(aio);
+ waio->setBufferCount(waio->getBufferCount() + 1);
+ aio->createBuffers(size);
+}
+
+void SslAsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
+ aio->queueReadBuffer(buff);
+}
+
+void SslAsynchIO::unread(AsynchIO::BufferBase* buff) {
+ // This is plaintext data being given back for more. Since it's already
+ // decrypted, don't give it back to the aio layer; keep it to append
+ // any new data for the upper layer.
+ assert(buff);
+ buff->squish();
+ assert(leftoverPlaintext == 0);
+ leftoverPlaintext = buff;
+}
+
+void SslAsynchIO::queueWrite(AsynchIO::BufferBase* buff) {
+ // @@TODO: Need to delay the write if the session is renegotiating.
+
+ // Should not have gotten here without an SslIoBuff. This assert is
+ // primarily to catch any stray cases where write is called with a buffer
+ // not obtained via getQueuedBuffer.
+ SslIoBuff *sslBuff = dynamic_cast<SslIoBuff*>(buff);
+ assert(sslBuff != 0);
+
+ // Encrypt and hand off to the io layer. Remember that the upper layer's
+ // encoding was working on, and adjusting counts for, the SslIoBuff.
+ // Update the count of the original BufferBase before handing off to
+ // the I/O layer.
+ buff = sslBuff->aioBuff;
+ SecBuffer buffs[4];
+ buffs[0].cbBuffer = schSizes.cbHeader;
+ buffs[0].BufferType = SECBUFFER_STREAM_HEADER;
+ buffs[0].pvBuffer = buff->bytes; // This space was left by SslIoBuff
+ buffs[1].cbBuffer = sslBuff->dataCount;
+ buffs[1].BufferType = SECBUFFER_DATA;
+ buffs[1].pvBuffer = sslBuff->bytes;
+ buffs[2].cbBuffer = schSizes.cbTrailer;
+ buffs[2].BufferType = SECBUFFER_STREAM_TRAILER;
+ buffs[2].pvBuffer = &sslBuff->bytes[sslBuff->dataCount];
+ buffs[3].cbBuffer = 0;
+ buffs[3].BufferType = SECBUFFER_EMPTY;
+ buffs[3].pvBuffer = 0;
+ SecBufferDesc buffDesc;
+ buffDesc.ulVersion = SECBUFFER_VERSION;
+ buffDesc.cBuffers = 4;
+ buffDesc.pBuffers = buffs;
+ SECURITY_STATUS status = ::EncryptMessage(&ctxtHandle, 0, &buffDesc, 0);
+
+ // EncryptMessage encrypts the data in place. The header and trailer
+ // areas were left previously and must now be included in the updated
+ // count of bytes to write to the peer.
+ delete sslBuff;
+ buff->dataCount = buffs[0].cbBuffer + buffs[1].cbBuffer + buffs[2].cbBuffer;
+ aio->queueWrite(buff);
+}
+
+void SslAsynchIO::notifyPendingWrite() {
+ aio->notifyPendingWrite();
+}
+
+void SslAsynchIO::queueWriteClose() {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (queuedClose)
+ return;
+ queuedClose = true;
+ if (started) {
+ reapCheckPending = true;
+ // Move tear down logic to an IO thread.
+ aio->requestCallback(boost::bind(&SslAsynchIO::reapCheck, this));
+ }
+ aio->queueWriteClose();
+}
+
+void SslAsynchIO::reapCheck() {
+ // Serialized check in the IO thread whether to self-delete.
+ reapCheckPending = false;
+ if (queuedDelete)
+ delete(this);
+}
+
+bool SslAsynchIO::writeQueueEmpty() {
+ return aio->writeQueueEmpty();
+}
+
+// Queue the specified callback for invocation from an I/O thread.
+void SslAsynchIO::requestCallback(RequestCallback callback) {
+ aio->requestCallback(callback);
+}
+
+/**
+ * Return a queued buffer read to put new data in for writing.
+ * This method ALWAYS returns a SslIoBuff reflecting a BufferBase from
+ * the aio layer that has header and trailer space reserved.
+ */
+AsynchIO::BufferBase* SslAsynchIO::getQueuedBuffer() {
+ SslIoBuff *sslBuff = 0;
+ BufferBase* buff = aio->getQueuedBuffer();
+ if (buff == 0)
+ return 0;
+
+ sslBuff = new SslIoBuff(buff, schSizes);
+ return sslBuff;
+}
+
+SecuritySettings SslAsynchIO::getSecuritySettings() {
+ SecPkgContext_KeyInfo info;
+ memset(&info, 0, sizeof(info));
+ ::QueryContextAttributes(&ctxtHandle, SECPKG_ATTR_KEY_INFO, &info);
+
+ SecuritySettings settings;
+ settings.ssf = info.KeySize;
+ settings.authid = std::string();
+ return settings;
+}
+
+void SslAsynchIO::negotiationDone() {
+ switch(state) {
+ case Negotiating:
+ ::QueryContextAttributes(&ctxtHandle,
+ SECPKG_ATTR_STREAM_SIZES,
+ &schSizes);
+ state = Running;
+ if (negotiateDoneCallback)
+ negotiateDoneCallback(SEC_E_OK);
+ break;
+ case Redo:
+ state = Running;
+ break;
+ case ShuttingDown:
+ break;
+ default:
+ assert(0);
+ }
+}
+
+void SslAsynchIO::negotiationFailed(SECURITY_STATUS status) {
+ QPID_LOG(notice, "SSL negotiation failed to " << peerAddress << ": " <<
+ qpid::sys::strError(status));
+ if (negotiateDoneCallback)
+ negotiateDoneCallback(status);
+ else
+ queueWriteClose();
+}
+
+void SslAsynchIO::sslDataIn(qpid::sys::AsynchIO& a, BufferBase *buff) {
+ if (state == ShuttingDown) {
+ return;
+ }
+ if (state != Running) {
+ negotiateStep(buff);
+ return;
+ }
+
+ // Decrypt one block; if there's legit data, pass it on through.
+ // However, it's also possible that the peer hasn't supplied enough
+ // data yet, or the session needs to be renegotiated, or the session
+ // is ending.
+ SecBuffer recvBuffs[4];
+ recvBuffs[0].cbBuffer = buff->dataCount;
+ recvBuffs[0].BufferType = SECBUFFER_DATA;
+ recvBuffs[0].pvBuffer = &buff->bytes[buff->dataStart];
+ recvBuffs[1].BufferType = SECBUFFER_EMPTY;
+ recvBuffs[2].BufferType = SECBUFFER_EMPTY;
+ recvBuffs[3].BufferType = SECBUFFER_EMPTY;
+ SecBufferDesc buffDesc;
+ buffDesc.ulVersion = SECBUFFER_VERSION;
+ buffDesc.cBuffers = 4;
+ buffDesc.pBuffers = recvBuffs;
+ SECURITY_STATUS status = ::DecryptMessage(&ctxtHandle, &buffDesc, 0, NULL);
+ if (status != SEC_E_OK) {
+ if (status == SEC_E_INCOMPLETE_MESSAGE) {
+ // Give the partially filled buffer back and get more data
+ a.unread(buff);
+ }
+ else {
+ // Don't need this any more...
+ a.queueReadBuffer(buff);
+
+ if (status == SEC_I_RENEGOTIATE) {
+ state = Redo;
+ negotiateStep(0);
+ }
+ else if (status == SEC_I_CONTEXT_EXPIRED) {
+ queueWriteClose();
+ }
+ else {
+ throw QPID_WINDOWS_ERROR(status);
+ }
+ }
+ return;
+ }
+
+ // All decrypted and verified... continue with AMQP. The recvBuffs have
+ // been set up by DecryptMessage to demarcate the SSL header, data, and
+ // trailer, as well as any extra data left over. Walk through and find
+ // that info, adjusting the buff data accordingly to reflect only the
+ // actual decrypted data.
+ // If there's extra data, copy that out to a new buffer and run through
+ // this method again.
+ char *extraBytes = 0;
+ int32_t extraLength = 0;
+ BufferBase *extraBuff = 0;
+ for (int i = 0; i < 4; i++) {
+ switch (recvBuffs[i].BufferType) {
+ case SECBUFFER_STREAM_HEADER:
+ buff->dataStart += recvBuffs[i].cbBuffer;
+ // Fall through - also don't count these bytes as data
+ case SECBUFFER_STREAM_TRAILER:
+ buff->dataCount -= recvBuffs[i].cbBuffer;
+ break;
+ case SECBUFFER_EXTRA:
+ extraBytes = (char *) recvBuffs[i].pvBuffer;
+ extraLength = recvBuffs[i].cbBuffer;
+ break;
+ default:
+ break;
+ }
+ }
+
+ // Since we've already taken (possibly) all the available bytes from the
+ // aio layer, need to be sure that everything that's processable is
+ // processed before returning back to aio. It could be that any
+ // leftoverPlaintext data plus new buff data won't fit in one buffer, so
+ // need to keep going around the input processing loop until either
+ // all the bytes are gone, or there's less than a full frame remaining
+ // (so we can count on more bytes being on the way via aio).
+ do {
+ BufferBase *temp = 0;
+ // See if there was partial data left over from last time. If so, append this new
+ // data to that and release the current buff back to aio. Assume that
+ // leftoverPlaintext was squished so the data starts at 0.
+ if (leftoverPlaintext != 0) {
+ // There is leftover data; append all the new data that will fit.
+ int32_t count = buff->dataCount;
+ if (count) {
+ if (leftoverPlaintext->dataCount + count > leftoverPlaintext->byteCount)
+ count = (leftoverPlaintext->byteCount - leftoverPlaintext->dataCount);
+ ::memmove(&leftoverPlaintext->bytes[leftoverPlaintext->dataCount],
+ &buff->bytes[buff->dataStart], count);
+ leftoverPlaintext->dataCount += count;
+ buff->dataCount -= count;
+ buff->dataStart += count;
+ // Prepare to pass the buffer up. Beware that the read callback
+ // may do an unread(), so move the leftoverPlaintext pointer
+ // out of the way. It also may release the buffer back to aio,
+ // so in either event, the pointer passed to the callback is not
+ // valid on return.
+ temp = leftoverPlaintext;
+ leftoverPlaintext = 0;
+ }
+ else {
+ // All decrypted data used up, decrypt some more or get more from the aio
+ if (extraLength) {
+ buff->dataStart = extraBytes - buff->bytes;
+ buff->dataCount = extraLength;
+ sslDataIn(a, buff);
+ return;
+ }
+ else {
+ a.queueReadBuffer(buff);
+ return;
+ }
+ }
+ }
+ else {
+ // Use buff, but first offload data not yet encrypted
+ if (extraLength) {
+ // Very important to get this buffer from the downstream aio.
+ // The ones constructed from the local getQueuedBuffer() are
+ // restricted size for encrypting. However, data coming up from
+ // TCP may have a bunch of SSL segments coalesced and be much
+ // larger than the maximum single SSL segment.
+ extraBuff = a.getQueuedBuffer();
+ if (0 == extraBuff) {
+ // No leftoverPlaintext, so a spare buffer should be available
+ throw QPID_WINDOWS_ERROR(WSAENOBUFS);
+ }
+ memmove(extraBuff->bytes, extraBytes, extraLength);
+ extraBuff->dataCount = extraLength;
+ extraLength = 0;
+ }
+ temp = buff;
+ buff = 0;
+ }
+ if (readCallback) {
+ // The callback guard here is to prevent an upcall from deleting
+ // this out from under us via queueForDeletion().
+ readCallback(*this, temp);
+ }
+ else
+ a.queueReadBuffer(temp); // What else can we do with this???
+ } while (buff != 0);
+
+ // Ok, the current decrypted data is done. If there was any extra data,
+ // go back and handle that.
+ if (extraBuff != 0) {
+ sslDataIn(a, extraBuff);
+ }
+}
+
+void SslAsynchIO::idle(qpid::sys::AsynchIO&) {
+ // Don't relay idle indication to layer above until SSL session is up.
+ if (state == Running) {
+ state = Running;
+ if (idleCallback)
+ idleCallback(*this);
+ }
+}
+
+/**************************************************/
+
+namespace {
+
+bool unsafeNegotiatedTlsVersion(CtxtHandle &ctxtHandle) {
+ // See if SChannel ultimately negotiated <= SSL3, perhaps due to
+ // global registry settings.
+ SecPkgContext_ConnectionInfo info;
+ ::QueryContextAttributes(&ctxtHandle, SECPKG_ATTR_CONNECTION_INFO, &info);
+ // Ascending bit patterns denote newer SSL/TLS protocol versions
+ return (info.dwProtocol < SP_PROT_TLS1_SERVER) ? true : false;
+}
+
+} // namespace
+
+/**************************************************/
+
+ClientSslAsynchIO::ClientSslAsynchIO(const std::string& brokerHost,
+ const qpid::sys::Socket& s,
+ CredHandle hCred,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb,
+ BuffersEmptyCallback eCb,
+ IdleCallback iCb,
+ NegotiateDoneCallback nCb) :
+ SslAsynchIO(s, hCred, rCb, eofCb, disCb, cCb, eCb, iCb, nCb),
+ serverHost(brokerHost), clientCertRequested(false)
+{
+}
+
+void ClientSslAsynchIO::startNegotiate() {
+ // SEC_CHAR is non-const, so do all the typing here.
+ SEC_CHAR *host = const_cast<SEC_CHAR *>(serverHost.c_str());
+
+ // Need a buffer to receive the token to send to the server.
+ BufferBase *buff = aio->getQueuedBuffer();
+ ULONG ctxtRequested = ISC_REQ_STREAM | ISC_REQ_USE_SUPPLIED_CREDS;
+ ULONG ctxtAttrs;
+ // sendBuffs gets information to forward to the peer.
+ SecBuffer sendBuffs[2];
+ sendBuffs[0].cbBuffer = buff->byteCount;
+ sendBuffs[0].BufferType = SECBUFFER_TOKEN;
+ sendBuffs[0].pvBuffer = buff->bytes;
+ sendBuffs[1].cbBuffer = 0;
+ sendBuffs[1].BufferType = SECBUFFER_EMPTY;
+ sendBuffs[1].pvBuffer = 0;
+ SecBufferDesc sendBuffDesc;
+ sendBuffDesc.ulVersion = SECBUFFER_VERSION;
+ sendBuffDesc.cBuffers = 2;
+ sendBuffDesc.pBuffers = sendBuffs;
+ SECURITY_STATUS status = ::InitializeSecurityContext(&credHandle,
+ NULL,
+ host,
+ ctxtRequested,
+ 0,
+ 0,
+ NULL,
+ 0,
+ &ctxtHandle,
+ &sendBuffDesc,
+ &ctxtAttrs,
+ NULL);
+
+ if (status == SEC_I_CONTINUE_NEEDED) {
+ buff->dataCount = sendBuffs[0].cbBuffer;
+ aio->queueWrite(buff);
+ }
+}
+
+void ClientSslAsynchIO::negotiateStep(BufferBase* buff) {
+ // SEC_CHAR is non-const, so do all the typing here.
+ SEC_CHAR *host = const_cast<SEC_CHAR *>(serverHost.c_str());
+ ULONG ctxtRequested = ISC_REQ_STREAM | ISC_REQ_USE_SUPPLIED_CREDS;
+ ULONG ctxtAttrs;
+
+ // tokenBuffs describe the buffer that's coming in. It should have
+ // a token from the SSL server.
+ SecBuffer tokenBuffs[2];
+ tokenBuffs[0].cbBuffer = buff ? buff->dataCount : 0;
+ tokenBuffs[0].BufferType = SECBUFFER_TOKEN;
+ tokenBuffs[0].pvBuffer = buff ? buff->bytes : 0;
+ tokenBuffs[1].cbBuffer = 0;
+ tokenBuffs[1].BufferType = SECBUFFER_EMPTY;
+ tokenBuffs[1].pvBuffer = 0;
+ SecBufferDesc tokenBuffDesc;
+ tokenBuffDesc.ulVersion = SECBUFFER_VERSION;
+ tokenBuffDesc.cBuffers = 2;
+ tokenBuffDesc.pBuffers = tokenBuffs;
+
+ // Need a buffer to receive any token to send back to the server.
+ BufferBase *sendbuff = aio->getQueuedBuffer();
+ // sendBuffs gets information to forward to the peer.
+ SecBuffer sendBuffs[2];
+ sendBuffs[0].cbBuffer = sendbuff->byteCount;
+ sendBuffs[0].BufferType = SECBUFFER_TOKEN;
+ sendBuffs[0].pvBuffer = sendbuff->bytes;
+ sendBuffs[1].cbBuffer = 0;
+ sendBuffs[1].BufferType = SECBUFFER_EMPTY;
+ sendBuffs[1].pvBuffer = 0;
+ SecBufferDesc sendBuffDesc;
+ sendBuffDesc.ulVersion = SECBUFFER_VERSION;
+ sendBuffDesc.cBuffers = 2;
+ sendBuffDesc.pBuffers = sendBuffs;
+
+ SECURITY_STATUS status = ::InitializeSecurityContext(&credHandle,
+ &ctxtHandle,
+ host,
+ ctxtRequested,
+ 0,
+ 0,
+ &tokenBuffDesc,
+ 0,
+ NULL,
+ &sendBuffDesc,
+ &ctxtAttrs,
+ NULL);
+
+ if (status == SEC_E_INCOMPLETE_MESSAGE) {
+ // Not enough - get more data from the server then try again.
+ aio->unread(buff);
+ aio->queueReadBuffer(sendbuff); // Don't need this one for now...
+ return;
+ }
+ // Done with the buffer that came in...
+ if (buff)
+ aio->queueReadBuffer(buff);
+ if (status == SEC_I_CONTINUE_NEEDED) {
+ // check if server has requested a client certificate
+ if (!clientCertRequested) {
+ SecPkgContext_IssuerListInfoEx caList;
+ memset(&caList, 0, sizeof(caList));
+ ::QueryContextAttributes(&ctxtHandle, SECPKG_ATTR_ISSUER_LIST_EX, &caList);
+ if (caList.cIssuers > 0)
+ clientCertRequested = true;
+ if (caList.aIssuers)
+ ::FreeContextBuffer(caList.aIssuers);
+ }
+
+ sendbuff->dataCount = sendBuffs[0].cbBuffer;
+ aio->queueWrite(sendbuff);
+ return;
+ }
+ // Nothing to send back to the server...
+ aio->queueReadBuffer(sendbuff);
+
+ if (status == SEC_E_OK && unsafeNegotiatedTlsVersion(ctxtHandle)) {
+ // Refuse a connection that negotiates to less than TLS 1.0.
+ QPID_LOG(notice, "client SSL negotiation to unsafe protocol version.");
+ status = SEC_E_UNSUPPORTED_FUNCTION;
+ }
+
+ // SEC_I_CONTEXT_EXPIRED means session stop complete; SEC_E_OK can be
+ // either session stop or negotiation done (session up).
+ if (status == SEC_E_OK || status == SEC_I_CONTEXT_EXPIRED)
+ negotiationDone();
+ else {
+ if (clientCertRequested && status == SEC_E_CERT_UNKNOWN)
+ // ISC_REQ_USE_SUPPLIED_CREDS makes us reponsible for this case
+ // (no client cert). Map it to its counterpart:
+ status = SEC_E_INCOMPLETE_CREDENTIALS;
+ negotiationFailed(status);
+ }
+}
+
+/*************************************************/
+
+ServerSslAsynchIO::ServerSslAsynchIO(bool clientMustAuthenticate,
+ const qpid::sys::Socket& s,
+ CredHandle hCred,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb,
+ BuffersEmptyCallback eCb,
+ IdleCallback iCb,
+ NegotiateDoneCallback nCb) :
+ SslAsynchIO(s, hCred, rCb, eofCb, disCb, cCb, eCb, iCb, nCb),
+ clientAuth(clientMustAuthenticate)
+{
+}
+
+void ServerSslAsynchIO::startNegotiate() {
+ // Nothing... need the client to send a token first.
+}
+
+void ServerSslAsynchIO::negotiateStep(BufferBase* buff) {
+ ULONG ctxtRequested = ASC_REQ_STREAM;
+ if (clientAuth)
+ ctxtRequested |= ASC_REQ_MUTUAL_AUTH;
+ ULONG ctxtAttrs;
+
+ // tokenBuffs describe the buffer that's coming in. It should have
+ // a token from the SSL server except if shutting down or renegotiating.
+ SecBuffer tokenBuffs[2];
+ tokenBuffs[0].cbBuffer = buff ? buff->dataCount : 0;
+ tokenBuffs[0].BufferType = SECBUFFER_TOKEN;
+ tokenBuffs[0].pvBuffer = buff ? buff->bytes : 0;
+ tokenBuffs[1].cbBuffer = 0;
+ tokenBuffs[1].BufferType = SECBUFFER_EMPTY;
+ tokenBuffs[1].pvBuffer = 0;
+ SecBufferDesc tokenBuffDesc;
+ tokenBuffDesc.ulVersion = SECBUFFER_VERSION;
+ tokenBuffDesc.cBuffers = 2;
+ tokenBuffDesc.pBuffers = tokenBuffs;
+
+ // Need a buffer to receive any token to send back to the server.
+ BufferBase *sendbuff = aio->getQueuedBuffer();
+ // sendBuffs gets information to forward to the peer.
+ SecBuffer sendBuffs[2];
+ sendBuffs[0].cbBuffer = sendbuff->byteCount;
+ sendBuffs[0].BufferType = SECBUFFER_TOKEN;
+ sendBuffs[0].pvBuffer = sendbuff->bytes;
+ sendBuffs[1].cbBuffer = 0;
+ sendBuffs[1].BufferType = SECBUFFER_EMPTY;
+ sendBuffs[1].pvBuffer = 0;
+ SecBufferDesc sendBuffDesc;
+ sendBuffDesc.ulVersion = SECBUFFER_VERSION;
+ sendBuffDesc.cBuffers = 2;
+ sendBuffDesc.pBuffers = sendBuffs;
+ PCtxtHandle ctxtHandlePtr = (SecIsValidHandle(&ctxtHandle)) ? &ctxtHandle : 0;
+ SECURITY_STATUS status = ::AcceptSecurityContext(&credHandle,
+ ctxtHandlePtr,
+ &tokenBuffDesc,
+ ctxtRequested,
+ 0,
+ &ctxtHandle,
+ &sendBuffDesc,
+ &ctxtAttrs,
+ NULL);
+ if (status == SEC_E_INCOMPLETE_MESSAGE) {
+ // Not enough - get more data from the server then try again.
+ if (buff)
+ aio->unread(buff);
+ aio->queueReadBuffer(sendbuff); // Don't need this one for now...
+ return;
+ }
+ // Done with the buffer that came in...
+ if (buff)
+ aio->queueReadBuffer(buff);
+ if (status == SEC_I_CONTINUE_NEEDED) {
+ sendbuff->dataCount = sendBuffs[0].cbBuffer;
+ aio->queueWrite(sendbuff);
+ return;
+ }
+ // There may have been a token generated; if so, send it to the client.
+ if (sendBuffs[0].cbBuffer > 0 && state != ShuttingDown) {
+ sendbuff->dataCount = sendBuffs[0].cbBuffer;
+ aio->queueWrite(sendbuff);
+ }
+ else
+ // Nothing to send back to the server...
+ aio->queueReadBuffer(sendbuff);
+
+ if (status == SEC_E_OK && unsafeNegotiatedTlsVersion(ctxtHandle)) {
+ // Refuse a connection that negotiates to less than TLS 1.0.
+ QPID_LOG(notice, "server SSL negotiation to unsafe protocol version.");
+ status = SEC_E_UNSUPPORTED_FUNCTION;
+ }
+
+ // SEC_I_CONTEXT_EXPIRED means session stop complete; SEC_E_OK can be
+ // either session stop or negotiation done (session up).
+ if (status == SEC_E_OK || status == SEC_I_CONTEXT_EXPIRED) {
+ if (clientAuth)
+ QPID_LOG(warning, "DID WE CHECK FOR CLIENT AUTH???");
+
+ negotiationDone();
+ }
+ else {
+ negotiationFailed(status);
+ }
+}
+
+}}} // namespace qpid::sys::windows
diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h
new file mode 100644
index 0000000000..3d00e1c429
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h
@@ -0,0 +1,192 @@
+#ifndef _sys_windows_SslAsynchIO
+#define _sys_windows_SslAsynchIO
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/CommonImportExport.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include <windows.h>
+// security.h needs to see this to distinguish from kernel use.
+#define SECURITY_WIN32
+#include <security.h>
+#include <Schnlsp.h>
+#undef SECURITY_WIN32
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+/*
+ * SSL/Schannel shim between the frame-handling and AsynchIO layers.
+ * SslAsynchIO creates a regular AsynchIO object to handle I/O and this class
+ * gets involved for SSL negotiations and encrypt/decrypt. The details of
+ * how this all works are invisible to the layers on either side. The only
+ * change from normal AsynchIO usage is that there's an extra callback
+ * from SslAsynchIO to indicate that the initial session negotiation is
+ * complete.
+ *
+ * The details of session negotiation are different for client and server
+ * SSL roles. These differences are handled by deriving separate client
+ * and server role classes.
+ */
+class SslAsynchIO : public qpid::sys::AsynchIO {
+public:
+ typedef boost::function1<void, SECURITY_STATUS> NegotiateDoneCallback;
+
+ SslAsynchIO(const qpid::sys::Socket& s,
+ CredHandle hCred,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0,
+ NegotiateDoneCallback nCb = 0);
+ ~SslAsynchIO();
+
+ virtual void queueForDeletion();
+
+ virtual void start(qpid::sys::Poller::shared_ptr poller);
+ virtual void createBuffers(uint32_t size);
+ virtual void queueReadBuffer(BufferBase* buff);
+ virtual void unread(BufferBase* buff);
+ virtual void queueWrite(BufferBase* buff);
+ virtual void notifyPendingWrite();
+ virtual void queueWriteClose();
+ virtual bool writeQueueEmpty();
+ virtual void requestCallback(RequestCallback);
+ virtual BufferBase* getQueuedBuffer();
+ virtual SecuritySettings getSecuritySettings(void);
+
+protected:
+ CredHandle credHandle;
+
+ // AsynchIO layer below that's actually doing the I/O
+ qpid::sys::AsynchIO *aio;
+ Mutex lock;
+
+ // Track what the state of the SSL session is. Have to know when it's
+ // time to notify the upper layer that the session is up, and also to
+ // know when it's not legit to pass data through to either side.
+ enum { Negotiating, Running, Redo, ShuttingDown } state;
+ CtxtHandle ctxtHandle;
+ TimeStamp credExpiry;
+
+ // Client- and server-side SSL subclasses implement these to do the
+ // proper negotiation steps. negotiateStep() is called with a buffer
+ // just received from the peer.
+ virtual void startNegotiate() = 0;
+ virtual void negotiateStep(BufferBase *buff) = 0;
+
+ // The negotiating steps call one of these when it's finalized:
+ void negotiationDone();
+ void negotiationFailed(SECURITY_STATUS status);
+
+private:
+ // These are callbacks from AsynchIO to here.
+ void sslDataIn(qpid::sys::AsynchIO& a, BufferBase *buff);
+ void idle(qpid::sys::AsynchIO&);
+ void reapCheck();
+
+ // These callbacks are to the layer above.
+ ReadCallback readCallback;
+ IdleCallback idleCallback;
+ NegotiateDoneCallback negotiateDoneCallback;
+
+ volatile bool queuedDelete;
+ volatile bool queuedClose;
+ volatile bool reapCheckPending;
+ bool started;
+
+ // Address of peer, in case it's needed for logging.
+ std::string peerAddress;
+
+ // Partial buffer of decrypted plaintext given back by the layer above.
+ AsynchIO::BufferBase *leftoverPlaintext;
+
+ SecPkgContext_StreamSizes schSizes;
+};
+
+/*
+ * SSL/Schannel client-side shim between the frame-handling and AsynchIO
+ * layers.
+ */
+class ClientSslAsynchIO : public SslAsynchIO {
+public:
+ // Args same as for SslIoShim, with the addition of brokerHost which is
+ // the expected SSL name of the server.
+ QPID_COMMON_EXTERN ClientSslAsynchIO(const std::string& brokerHost,
+ const qpid::sys::Socket& s,
+ CredHandle hCred,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0,
+ NegotiateDoneCallback nCb = 0);
+
+private:
+ std::string serverHost;
+ bool clientCertRequested;
+
+ // Client- and server-side SSL subclasses implement these to do the
+ // proper negotiation steps. negotiateStep() is called with a buffer
+ // just received from the peer.
+ void startNegotiate();
+ void negotiateStep(BufferBase *buff);
+};
+/*
+ * SSL/Schannel server-side shim between the frame-handling and AsynchIO
+ * layers.
+ */
+class ServerSslAsynchIO : public SslAsynchIO {
+public:
+ QPID_COMMON_EXTERN ServerSslAsynchIO(bool clientMustAuthenticate,
+ const qpid::sys::Socket& s,
+ CredHandle hCred,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0,
+ NegotiateDoneCallback nCb = 0);
+
+private:
+ bool clientAuth;
+
+ // Client- and server-side SSL subclasses implement these to do the
+ // proper negotiation steps. negotiateStep() is called with a buffer
+ // just received from the peer.
+ void startNegotiate();
+ void negotiateStep(BufferBase *buff);
+};
+
+}}} // namespace qpid::sys::windows
+
+#endif // _sys_windows_SslAsynchIO
diff --git a/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp b/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp
new file mode 100644
index 0000000000..de8f10b0e9
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp
@@ -0,0 +1,279 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <string>
+#include <windows.h>
+#include "qpid/Msg.h"
+#include "qpid/log/Logger.h"
+#include "qpid/sys/windows/check.h"
+#include "qpid/sys/windows/util.h"
+#include "qpid/sys/windows/SslCredential.h"
+
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+
+SslCredential::SslCredential() : certStore(0), cert(0), hostnameVerification(true)
+{
+ SecInvalidateHandle(&credHandle);
+ memset(&cred, 0, sizeof(cred));
+ cred.dwVersion = SCHANNEL_CRED_VERSION;
+ cred.dwFlags = SCH_CRED_NO_DEFAULT_CREDS;
+}
+
+SslCredential::~SslCredential()
+{
+ if (SecIsValidHandle(&credHandle))
+ ::FreeCredentialsHandle(&credHandle);
+ if (cert)
+ ::CertFreeCertificateContext(cert);
+ if (certStore)
+ ::CertCloseStore(certStore, CERT_CLOSE_STORE_FORCE_FLAG);
+}
+
+bool SslCredential::load(const std::string& certName)
+{
+ cert = findCertificate(certName);
+ if (cert != NULL) {
+ // assign the certificate into the credentials
+ cred.paCred = &cert;
+ cred.cCreds = 1;
+ }
+ if (!hostnameVerification)
+ cred.dwFlags |= SCH_CRED_NO_SERVERNAME_CHECK;
+
+ SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
+ UNISP_NAME,
+ SECPKG_CRED_OUTBOUND,
+ NULL,
+ &cred,
+ NULL,
+ NULL,
+ &credHandle,
+ &credExpiry);
+ if (status != SEC_E_OK)
+ throw QPID_WINDOWS_ERROR(status);
+
+ return (cert != NULL);
+}
+
+CredHandle SslCredential::handle()
+{
+ return credHandle;
+}
+
+std::string SslCredential::error()
+{
+ // Certificate needed after all. Return main error and log additional context
+ if (!loadError.logMessage.empty())
+ QPID_LOG(warning, loadError.logMessage);
+ return loadError.error;
+}
+
+void SslCredential::ignoreHostnameVerificationFailure(){
+ hostnameVerification = false;
+}
+
+void SslCredential::loadPrivCertStore()
+{
+ // Get a handle to the system store or pkcs#12 file
+ qpid::sys::ssl::SslOptions& opts = qpid::sys::ssl::SslOptions::global;
+ if (opts.certFilename.empty()) {
+ // opening a system store, names are not case sensitive
+ std::string store = opts.certStore.empty() ? "my" : opts.certStore;
+ std::transform(store.begin(), store.end(), store.begin(), ::tolower);
+ // map confusing GUI name to actual registry store name
+ if (store == "personal")
+ store = "my";
+ certStore = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A, 0, NULL,
+ CERT_STORE_OPEN_EXISTING_FLAG | CERT_STORE_READONLY_FLAG |
+ CERT_SYSTEM_STORE_CURRENT_USER, store.c_str());
+ if (!certStore) {
+ HRESULT status = GetLastError();
+ loadError.set(Msg() << "Could not open system certificate store: " << store, status);
+ return;
+ }
+ QPID_LOG(debug, "SslConnector using certifcates from system store: " << store);
+ } else {
+ // opening the store from file and populating it with a private key
+ HANDLE certFileHandle = NULL;
+ certFileHandle = CreateFile(opts.certFilename.c_str(), GENERIC_READ, 0, NULL, OPEN_EXISTING,
+ FILE_ATTRIBUTE_NORMAL, NULL);
+ if (INVALID_HANDLE_VALUE == certFileHandle) {
+ HRESULT status = GetLastError();
+ loadError.set(Msg() << "Failed to open the file holding the private key: " << opts.certFilename, status);
+ return;
+ }
+ std::vector<BYTE> certEncoded;
+ DWORD certEncodedSize = 0L;
+ const DWORD fileSize = GetFileSize(certFileHandle, NULL);
+ if (INVALID_FILE_SIZE != fileSize) {
+ certEncoded.resize(fileSize);
+ bool result = false;
+ result = ReadFile(certFileHandle, &certEncoded[0],
+ fileSize,
+ &certEncodedSize,
+ NULL);
+ if (!result) {
+ // the read failed, return the error as an HRESULT
+ HRESULT status = GetLastError();
+ CloseHandle(certFileHandle);
+ loadError.set(Msg() << "Reading the private key from file failed " << opts.certFilename, status);
+ return;
+ }
+ }
+ else {
+ HRESULT status = GetLastError();
+ loadError.set(Msg() << "Unable to read the certificate file " << opts.certFilename, status);
+ return;
+ }
+ CloseHandle(certFileHandle);
+
+ CRYPT_DATA_BLOB blobData;
+ blobData.cbData = certEncodedSize;
+ blobData.pbData = &certEncoded[0];
+
+ // get passwd from file and convert to null terminated wchar_t (Windows UCS2)
+ std::string passwd = getPasswd(opts.certPasswordFile);
+ if (loadError.pending())
+ return;
+ int pwlen = passwd.length();
+ std::vector<wchar_t> pwUCS2(pwlen + 1, L'\0');
+ int nwc = MultiByteToWideChar(CP_UTF8, MB_ERR_INVALID_CHARS, passwd.data(), pwlen, &pwUCS2[0], pwlen);
+ if (!nwc) {
+ HRESULT status = GetLastError();
+ loadError.set("Error converting password from UTF8", status);
+ return;
+ }
+
+ certStore = PFXImportCertStore(&blobData, &pwUCS2[0], 0);
+ if (certStore == NULL) {
+ HRESULT status = GetLastError();
+ loadError.set("Failed to open the certificate store", status);
+ return;
+ }
+ QPID_LOG(debug, "SslConnector using certificate from pkcs#12 file: " << opts.certFilename);
+ }
+}
+
+
+PCCERT_CONTEXT SslCredential::findCertificate(const std::string& name)
+{
+ loadPrivCertStore();
+ if (loadError.pending())
+ return NULL;
+
+ // search for the certificate by Friendly Name
+ PCCERT_CONTEXT tmpctx = NULL;
+ while (tmpctx = CertEnumCertificatesInStore(certStore, tmpctx)) {
+ DWORD len = CertGetNameString(tmpctx, CERT_NAME_FRIENDLY_DISPLAY_TYPE,
+ 0, NULL, NULL, 0);
+ if (len == 1)
+ continue;
+ std::vector<char> ctxname(len);
+ CertGetNameString(tmpctx, CERT_NAME_FRIENDLY_DISPLAY_TYPE,
+ 0, NULL, &ctxname[0], len);
+ bool found = !name.compare(&ctxname[0]);
+ if (found)
+ break;
+ }
+
+ // verify whether some certificate has been found
+ if (tmpctx == NULL) {
+ loadError.set(Msg() << "Client SSL/TLS certificate not found in the certificate store for name " << name,
+ "client certificate not found");
+ }
+ return tmpctx;
+}
+
+
+std::string SslCredential::getPasswd(const std::string& filename)
+{
+ std::string passwd;
+ if (filename == "")
+ return passwd;
+
+ HANDLE pwfHandle = CreateFile(filename.c_str(), GENERIC_READ, 0, NULL, OPEN_EXISTING,
+ FILE_ATTRIBUTE_NORMAL, NULL);
+
+ if (INVALID_HANDLE_VALUE == pwfHandle) {
+ HRESULT status = GetLastError();
+ loadError.set(Msg() << "Failed to open the password file: " << filename, status);
+ return passwd;
+ }
+
+ const DWORD fileSize = GetFileSize(pwfHandle, NULL);
+ if (fileSize == INVALID_FILE_SIZE) {
+ CloseHandle(pwfHandle);
+ loadError.set("", "Cannot read password file");
+ return passwd;
+ }
+
+ std::vector<char> pwbuf;
+ pwbuf.resize(fileSize);
+ DWORD nbytes = 0;
+ if (!ReadFile(pwfHandle, &pwbuf[0], fileSize, &nbytes, NULL)) {
+ HRESULT status = GetLastError();
+ CloseHandle(pwfHandle);
+ loadError.set("Error reading password file", status);
+ return passwd;
+ }
+ CloseHandle(pwfHandle);
+
+ if (nbytes == 0)
+ return passwd;
+
+ while (nbytes) {
+ if ((pwbuf[nbytes-1] == 012) || (pwbuf[nbytes-1] == 015))
+ nbytes--;
+ else
+ break;
+ }
+
+ if (nbytes)
+ passwd.assign(&pwbuf[0], nbytes);
+
+ return passwd;
+}
+
+void SslCredential::SavedError::set(const std::string &lm, const std::string es) {
+ logMessage = lm;
+ error = es;
+}
+
+void SslCredential::SavedError::set(const std::string &lm, int status) {
+ logMessage = lm;
+ error = qpid::sys::strError(status);
+}
+
+void SslCredential::SavedError::clear() {
+ logMessage.clear();
+ error.clear();
+}
+
+bool SslCredential::SavedError::pending() {
+ return !logMessage.empty() || !error.empty();
+}
+
+}}}
diff --git a/qpid/cpp/src/qpid/sys/windows/SslCredential.h b/qpid/cpp/src/qpid/sys/windows/SslCredential.h
new file mode 100644
index 0000000000..25d174a2fa
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/SslCredential.h
@@ -0,0 +1,84 @@
+#ifndef _sys_SslCredential
+#define _sys_SslCredential
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/CommonImportExport.h"
+
+#include <string.h>
+// security.h needs to see this to distinguish from kernel use.
+#define SECURITY_WIN32
+#include <security.h>
+#include <Schnlsp.h>
+#undef SECURITY_WIN32
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+/*
+ * Manage certificate data structures for SChannel.
+ *
+ * Note on client certificates: The Posix/NSS implementation performs a lazy
+ * client certificate search part way through the ssl handshake if the server
+ * requests one. Here, it is not known in advance if the server will
+ * request the certificate so the certificate is pre-loaded (even if never
+ * used). To match the Linux behavior, client certificate load problems are
+ * remembered and reported later if appropriate, but do not prevent the
+ * connection attempt.
+ */
+
+class SslCredential {
+public:
+ QPID_COMMON_EXTERN SslCredential();
+ QPID_COMMON_EXTERN ~SslCredential();
+ QPID_COMMON_EXTERN bool load(const std::string& certName);
+ QPID_COMMON_EXTERN CredHandle handle();
+ QPID_COMMON_EXTERN std::string error();
+ /** Proceed with connect inspite of hostname verifcation failures*/
+ QPID_COMMON_EXTERN void ignoreHostnameVerificationFailure();
+
+private:
+ struct SavedError {
+ std::string logMessage;
+ std::string error;
+ void set(const std::string &lm, const std::string es);
+ void set(const std::string &lm, int status);
+ void clear();
+ bool pending();
+ };
+
+ HCERTSTORE certStore;
+ PCCERT_CONTEXT cert;
+ SCHANNEL_CRED cred;
+ CredHandle credHandle;
+ TimeStamp credExpiry;
+ SavedError loadError;
+ bool hostnameVerification;
+
+ PCCERT_CONTEXT findCertificate(const std::string& name);
+ void loadPrivCertStore();
+ std::string getPasswd(const std::string& filename);
+};
+
+}}}
+
+#endif // _sys_SslCredential
diff --git a/qpid/cpp/src/qpid/sys/windows/StrError.cpp b/qpid/cpp/src/qpid/sys/windows/StrError.cpp
new file mode 100755
index 0000000000..546d399d16
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/StrError.cpp
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/StrError.h"
+#include <string>
+#include <string.h>
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+std::string strError(int err) {
+ const size_t bufsize = 512;
+ char buf[bufsize];
+ buf[0] = 0;
+ if (0 == FormatMessage (FORMAT_MESSAGE_MAX_WIDTH_MASK
+ | FORMAT_MESSAGE_FROM_SYSTEM,
+ 0,
+ err,
+ 0, // Default language
+ buf,
+ bufsize,
+ 0))
+ {
+#ifdef _MSC_VER
+ strerror_s(buf, bufsize, err);
+#else
+ return std::string(strerror(err));
+#endif
+ }
+ return std::string(buf);
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp b/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp
new file mode 100755
index 0000000000..fb58d53b81
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* GetNativeSystemInfo call requires _WIN32_WINNT 0x0501 or higher */
+#ifndef _WIN32_WINNT
+# define _WIN32_WINNT 0x0501
+#endif
+
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+
+#include <assert.h>
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include <windows.h>
+#include <tlhelp32.h>
+
+#ifndef HOST_NAME_MAX
+# define HOST_NAME_MAX 256
+#endif
+
+namespace qpid {
+namespace sys {
+
+long SystemInfo::concurrency() {
+ SYSTEM_INFO sys_info;
+ ::GetSystemInfo (&sys_info);
+ long activeProcessors = 0;
+ DWORD_PTR mask = sys_info.dwActiveProcessorMask;
+ while (mask != 0) {
+ if (mask & 1)
+ ++activeProcessors;
+ mask >>= 1;
+ }
+ return activeProcessors;
+}
+
+bool SystemInfo::getLocalHostname (Address &address) {
+ char name[HOST_NAME_MAX];
+ if (::gethostname(name, sizeof(name)) != 0) {
+ errno = WSAGetLastError();
+ return false;
+ }
+ address.host = name;
+ return true;
+}
+
+static const std::string LOCALHOST("127.0.0.1");
+static const std::string TCP("tcp");
+
+// Null function which always fails to find an network interface name
+bool SystemInfo::getInterfaceAddresses(const std::string&, std::vector<std::string>&)
+{
+ return false;
+}
+
+void SystemInfo::getSystemId (std::string &osName,
+ std::string &nodeName,
+ std::string &release,
+ std::string &version,
+ std::string &machine)
+{
+ osName = "Microsoft Windows";
+
+ char node[MAX_COMPUTERNAME_LENGTH + 1];
+ DWORD nodelen = MAX_COMPUTERNAME_LENGTH + 1;
+ GetComputerName (node, &nodelen);
+ nodeName = node;
+
+ OSVERSIONINFOEX vinfo;
+ vinfo.dwOSVersionInfoSize = sizeof(vinfo);
+ GetVersionEx ((OSVERSIONINFO *)&vinfo);
+
+ SYSTEM_INFO sinfo;
+ GetNativeSystemInfo(&sinfo);
+
+ switch(vinfo.dwMajorVersion) {
+ case 5:
+ switch(vinfo.dwMinorVersion) {
+ case 0:
+ release ="2000";
+ break;
+ case 1:
+ release = "XP";
+ break;
+ case 2:
+ if (sinfo.wProcessorArchitecture == PROCESSOR_ARCHITECTURE_AMD64 ||
+ sinfo.wProcessorArchitecture == PROCESSOR_ARCHITECTURE_IA64)
+ release = "XP-64";
+ else
+ release = "Server 2003";
+ break;
+ default:
+ release = "Windows";
+ }
+ break;
+ case 6:
+ if (vinfo.wProductType == VER_NT_SERVER)
+ release = "Server 2008";
+ else
+ release = "Vista";
+ break;
+ default:
+ release = "Microsoft Windows";
+ }
+ version = vinfo.szCSDVersion;
+
+ switch(sinfo.wProcessorArchitecture) {
+ case PROCESSOR_ARCHITECTURE_AMD64:
+ machine = "x86-64";
+ break;
+ case PROCESSOR_ARCHITECTURE_IA64:
+ machine = "IA64";
+ break;
+ case PROCESSOR_ARCHITECTURE_INTEL:
+ machine = "x86";
+ break;
+ default:
+ machine = "unknown";
+ break;
+ }
+}
+
+uint32_t SystemInfo::getProcessId()
+{
+ return static_cast<uint32_t>(::GetCurrentProcessId());
+}
+
+uint32_t SystemInfo::getParentProcessId()
+{
+ // Only want info for the current process, so ask for something specific.
+ // The module info won't be used here but it keeps the snapshot limited to
+ // the current process so a search through all processes is not needed.
+ HANDLE snap = CreateToolhelp32Snapshot(TH32CS_SNAPMODULE, 0);
+ if (snap == INVALID_HANDLE_VALUE)
+ return 0;
+ PROCESSENTRY32 entry;
+ entry.dwSize = sizeof(entry);
+ if (!Process32First(snap, &entry))
+ entry.th32ParentProcessID = 0;
+ CloseHandle(snap);
+ return static_cast<uint32_t>(entry.th32ParentProcessID);
+}
+
+std::string SystemInfo::getProcessName()
+{
+ std::string name;
+
+ // Only want info for the current process, so ask for something specific.
+ // The module info won't be used here but it keeps the snapshot limited to
+ // the current process so a search through all processes is not needed.
+ HANDLE snap = CreateToolhelp32Snapshot(TH32CS_SNAPMODULE, 0);
+ if (snap == INVALID_HANDLE_VALUE)
+ return name;
+ PROCESSENTRY32 entry;
+ entry.dwSize = sizeof(entry);
+ if (!Process32First(snap, &entry))
+ entry.szExeFile[0] = '\0';
+ CloseHandle(snap);
+ name = entry.szExeFile;
+ return name;
+}
+
+
+#ifdef _DLL
+namespace windows {
+// set from one or more Qpid DLLs: i.e. in DllMain with DLL_PROCESS_DETACH
+QPID_EXPORT bool processExiting = false;
+QPID_EXPORT bool libraryUnloading = false;
+}
+#endif
+
+bool SystemInfo::threadSafeShutdown()
+{
+#ifdef _DLL
+ if (!windows::processExiting && !windows::libraryUnloading) {
+ // called before exit() or FreeLibrary(), or by a DLL without
+ // a participating DllMain.
+ QPID_LOG(warning, "invalid query for shutdown state");
+ throw qpid::Exception(QPID_MSG("Unable to determine shutdown state."));
+ }
+ return !windows::processExiting;
+#else
+ // Not a DLL: shutdown can only be by exit() or return from main().
+ return false;
+#endif
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/Thread.cpp b/qpid/cpp/src/qpid/sys/windows/Thread.cpp
new file mode 100755
index 0000000000..8034680664
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/Thread.cpp
@@ -0,0 +1,340 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Ensure definition of OpenThread in mingw
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/windows/check.h"
+#include "qpid/sys/SystemInfo.h"
+
+#include <process.h>
+#include <windows.h>
+
+/*
+ * This implementation distinguishes between two types of thread: Qpid
+ * threads (based on qpid::sys::Runnable) and the rest. It provides a
+ * join() that will not deadlock against the Windows loader lock for
+ * Qpid threads.
+ *
+ * System thread identifiers are unique per Windows thread; thread
+ * handles are not. Thread identifiers can be recycled, but keeping a
+ * handle open against the thread prevents recycling as long as
+ * shared_ptr references to a ThreadPrivate structure remain.
+ *
+ * There is a 1-1 relationship between Qpid threads and their
+ * ThreadPrivate structure. Non-Qpid threads do not need to find the
+ * qpidThreadDone handle, so there may be a 1-many relationship for
+ * them.
+ *
+ * TLS storage is used for a lockless solution for static library
+ * builds. The special case of LoadLibrary/FreeLibrary requires
+ * additional synchronization variables and resource cleanup in
+ * DllMain. _DLL marks the dynamic case.
+ */
+
+namespace qpid {
+namespace sys {
+
+class ThreadPrivate {
+public:
+ friend class Thread;
+ friend unsigned __stdcall runThreadPrivate(void*);
+ typedef boost::shared_ptr<ThreadPrivate> shared_ptr;
+ ~ThreadPrivate();
+
+private:
+ unsigned threadId;
+ HANDLE threadHandle;
+ HANDLE initCompleted;
+ HANDLE qpidThreadDone;
+ Runnable* runnable;
+ shared_ptr keepAlive;
+
+ ThreadPrivate() : threadId(GetCurrentThreadId()), initCompleted(NULL),
+ qpidThreadDone(NULL), runnable(NULL) {
+ threadHandle = OpenThread (SYNCHRONIZE, FALSE, threadId);
+ QPID_WINDOWS_CHECK_CRT_NZ(threadHandle);
+ }
+
+ ThreadPrivate(Runnable* r) : threadHandle(NULL), initCompleted(NULL),
+ qpidThreadDone(NULL), runnable(r) {}
+
+ void start(shared_ptr& p);
+ static shared_ptr createThread(Runnable* r);
+};
+
+}} // namespace qpid::sys
+
+
+namespace {
+using namespace qpid::sys;
+
+#ifdef _DLL
+class ScopedCriticalSection
+{
+ public:
+ ScopedCriticalSection(CRITICAL_SECTION& cs) : criticalSection(cs) { EnterCriticalSection(&criticalSection); }
+ ~ScopedCriticalSection() { LeaveCriticalSection(&criticalSection); }
+ private:
+ CRITICAL_SECTION& criticalSection;
+};
+
+CRITICAL_SECTION threadLock;
+long runningThreads = 0;
+HANDLE threadsDone;
+bool terminating = false;
+#endif
+
+
+DWORD volatile tlsIndex = TLS_OUT_OF_INDEXES;
+
+DWORD getTlsIndex() {
+ if (tlsIndex != TLS_OUT_OF_INDEXES)
+ return tlsIndex; // already set
+
+ DWORD trialIndex = TlsAlloc();
+ QPID_WINDOWS_CHECK_NOT(trialIndex, TLS_OUT_OF_INDEXES); // No OS resource
+
+ // only one thread gets to set the value
+ DWORD actualIndex = (DWORD) InterlockedCompareExchange((LONG volatile *) &tlsIndex, (LONG) trialIndex, (LONG) TLS_OUT_OF_INDEXES);
+ if (actualIndex == TLS_OUT_OF_INDEXES)
+ return trialIndex; // we won the race
+ else {
+ TlsFree(trialIndex);
+ return actualIndex;
+ }
+}
+
+} // namespace
+
+namespace qpid {
+namespace sys {
+
+unsigned __stdcall runThreadPrivate(void* p)
+{
+ ThreadPrivate* threadPrivate = static_cast<ThreadPrivate*>(p);
+ TlsSetValue(getTlsIndex(), threadPrivate);
+
+ WaitForSingleObject (threadPrivate->initCompleted, INFINITE);
+ CloseHandle (threadPrivate->initCompleted);
+ threadPrivate->initCompleted = NULL;
+
+ try {
+ threadPrivate->runnable->run();
+ } catch (...) {
+ // not our concern
+ }
+
+ SetEvent (threadPrivate->qpidThreadDone); // allow join()
+ threadPrivate->keepAlive.reset(); // may run ThreadPrivate destructor
+
+#ifdef _DLL
+ {
+ ScopedCriticalSection l(threadLock);
+ if (--runningThreads == 0)
+ SetEvent(threadsDone);
+ }
+#endif
+ return 0;
+}
+
+
+ThreadPrivate::shared_ptr ThreadPrivate::createThread(Runnable* runnable) {
+ ThreadPrivate::shared_ptr tp(new ThreadPrivate(runnable));
+ tp->start(tp);
+ return tp;
+}
+
+void ThreadPrivate::start(ThreadPrivate::shared_ptr& tp) {
+ getTlsIndex(); // fail here if OS problem, not in new thread
+
+ initCompleted = CreateEvent (NULL, TRUE, FALSE, NULL);
+ QPID_WINDOWS_CHECK_CRT_NZ(initCompleted);
+ qpidThreadDone = CreateEvent (NULL, TRUE, FALSE, NULL);
+ QPID_WINDOWS_CHECK_CRT_NZ(qpidThreadDone);
+
+#ifdef _DLL
+ {
+ ScopedCriticalSection l(threadLock);
+ if (terminating)
+ throw qpid::Exception(QPID_MSG("creating thread after exit/FreeLibrary"));
+ runningThreads++;
+ }
+#endif
+
+ uintptr_t h = _beginthreadex(0,
+ 0,
+ runThreadPrivate,
+ (void *)this,
+ 0,
+ &threadId);
+
+#ifdef _DLL
+ if (h == NULL) {
+ ScopedCriticalSection l(threadLock);
+ if (--runningThreads == 0)
+ SetEvent(threadsDone);
+ }
+#endif
+
+ QPID_WINDOWS_CHECK_CRT_NZ(h);
+
+ // Success
+ keepAlive = tp;
+ threadHandle = reinterpret_cast<HANDLE>(h);
+ SetEvent (initCompleted);
+}
+
+ThreadPrivate::~ThreadPrivate() {
+ if (threadHandle)
+ CloseHandle (threadHandle);
+ if (initCompleted)
+ CloseHandle (initCompleted);
+ if (qpidThreadDone)
+ CloseHandle (qpidThreadDone);
+}
+
+
+Thread::Thread() {}
+
+Thread::Thread(Runnable* runnable) : impl(ThreadPrivate::createThread(runnable)) {}
+
+Thread::Thread(Runnable& runnable) : impl(ThreadPrivate::createThread(&runnable)) {}
+
+Thread::operator bool() {
+ return !!impl;
+}
+
+bool Thread::operator==(const Thread& t) const {
+ if (!impl || !t.impl)
+ return false;
+ return impl->threadId == t.impl->threadId;
+}
+
+bool Thread::operator!=(const Thread& t) const {
+ return !(*this==t);
+}
+
+void Thread::join() {
+ if (impl) {
+ DWORD status;
+ if (impl->runnable) {
+ HANDLE handles[2] = {impl->qpidThreadDone, impl->threadHandle};
+ // wait for either. threadHandle not signalled if loader
+ // lock held (FreeLibrary). qpidThreadDone not signalled
+ // if thread terminated by exit().
+ status = WaitForMultipleObjects (2, handles, false, INFINITE);
+ }
+ else
+ status = WaitForSingleObject (impl->threadHandle, INFINITE);
+ QPID_WINDOWS_CHECK_NOT(status, WAIT_FAILED);
+ }
+}
+
+unsigned long Thread::logId() {
+ return GetCurrentThreadId();
+}
+
+/* static */
+Thread Thread::current() {
+ ThreadPrivate* tlsValue = (ThreadPrivate *) TlsGetValue(getTlsIndex());
+ Thread t;
+ if (tlsValue != NULL) {
+ // called from within Runnable->run(), so keepAlive has positive use count
+ t.impl = tlsValue->keepAlive;
+ }
+ else
+ t.impl.reset(new ThreadPrivate());
+ return t;
+}
+
+}} // namespace qpid::sys
+
+
+#ifdef _DLL
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+extern bool processExiting;
+extern bool libraryUnloading;
+
+}}} // namespace qpid::sys::SystemInfo
+
+// DllMain: called possibly many times in a process lifetime if dll
+// loaded and freed repeatedly. Be mindful of Windows loader lock
+// and other DllMain restrictions.
+
+BOOL APIENTRY DllMain(HMODULE hm, DWORD reason, LPVOID reserved) {
+ switch (reason) {
+ case DLL_PROCESS_ATTACH:
+ InitializeCriticalSection(&threadLock);
+ threadsDone = CreateEvent(NULL, TRUE, FALSE, NULL);
+ break;
+
+ case DLL_PROCESS_DETACH:
+ terminating = true;
+ if (reserved != NULL) {
+ // process exit(): threads are stopped arbitrarily and
+ // possibly in an inconsistent state. Not even threadLock
+ // can be trusted. All static destructors for this unit
+ // are pending and face the same unsafe environment.
+ // Any resources this unit knows about will be released as
+ // part of process tear down by the OS. Accordingly, skip
+ // any clean up tasks.
+ qpid::sys::windows::processExiting = true;
+ return TRUE;
+ }
+ else {
+ // FreeLibrary(): threads are still running and we are
+ // encouraged to clean up to avoid leaks. Mostly we just
+ // want any straggler threads to finish and notify
+ // threadsDone as the last thing they do.
+ qpid::sys::windows::libraryUnloading = true;
+ while (1) {
+ {
+ ScopedCriticalSection l(threadLock);
+ if (runningThreads == 0)
+ break;
+ ResetEvent(threadsDone);
+ }
+ WaitForSingleObject(threadsDone, INFINITE);
+ }
+ if (tlsIndex != TLS_OUT_OF_INDEXES)
+ TlsFree(getTlsIndex());
+ CloseHandle(threadsDone);
+ DeleteCriticalSection(&threadLock);
+ }
+ break;
+
+ case DLL_THREAD_ATTACH:
+ case DLL_THREAD_DETACH:
+ break;
+ }
+ return TRUE;
+}
+
+#endif
diff --git a/qpid/cpp/src/qpid/sys/windows/Time.cpp b/qpid/cpp/src/qpid/sys/windows/Time.cpp
new file mode 100644
index 0000000000..4169ef1f0a
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/Time.cpp
@@ -0,0 +1,217 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Time.h"
+#include <cmath>
+#include <ostream>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/thread/thread_time.hpp>
+#include <windows.h>
+#include <time.h>
+
+using namespace boost::posix_time;
+
+namespace {
+
+// High-res timing support. This will display times since program start,
+// more or less. Keep track of the start value and the conversion factor to
+// seconds.
+bool timeInitialized = false;
+LARGE_INTEGER start_hpc;
+double hpc_freq = 1.0;
+
+double start_time;
+
+/// Static constant to remove time skew between FILETIME and POSIX
+/// time. POSIX and Win32 use different epochs (Jan. 1, 1970 v.s.
+/// Jan. 1, 1601). The following constant defines the difference
+/// in 100ns ticks.
+const DWORDLONG FILETIME_to_timval_skew = 0x19db1ded53e8000;
+
+}
+
+namespace qpid {
+namespace sys {
+
+AbsTime::AbsTime(const AbsTime& t, const Duration& d) {
+ if (d == Duration::max()) {
+ timepoint = ptime(max_date_time);
+ }
+ else {
+ time_duration td = microseconds(d.nanosecs / 1000);
+ timepoint = t.timepoint + td;
+ }
+}
+
+AbsTime AbsTime::FarFuture() {
+ AbsTime ff;
+ ptime maxd(max_date_time);
+ ff.timepoint = maxd;
+ return ff;
+}
+
+AbsTime AbsTime::Zero() {
+ AbsTime time_epoch;
+ time_epoch.timepoint = boost::posix_time::from_time_t(0);
+ return time_epoch;
+}
+
+AbsTime AbsTime::epoch() {
+ AbsTime time_epoch;
+ time_epoch.timepoint = boost::posix_time::from_time_t(0);
+ return time_epoch;
+}
+
+AbsTime AbsTime::now() {
+ AbsTime time_now;
+ time_now.timepoint = boost::get_system_time();
+ return time_now;
+}
+
+Duration Duration::FromEpoch() {
+ time_duration d = boost::get_system_time() - boost::posix_time::from_time_t(0);
+ return d.total_nanoseconds();
+}
+
+Duration::Duration(const AbsTime& start, const AbsTime& finish) {
+ time_duration d = finish.timepoint - start.timepoint;
+ nanosecs = d.total_nanoseconds();
+}
+
+std::ostream& operator<<(std::ostream& o, const Duration& d) {
+ if (d >= TIME_SEC) return o << (double(d)/TIME_SEC) << "s";
+ if (d >= TIME_MSEC) return o << (double(d)/TIME_MSEC) << "ms";
+ if (d >= TIME_USEC) return o << (double(d)/TIME_USEC) << "us";
+ return o << int64_t(d) << "ns";
+}
+
+std::istream& operator>>(std::istream& i, Duration& d) {
+ // Don't throw, let the istream throw if it's configured to do so.
+ double number;
+ i >> number;
+ if (i.fail()) return i;
+
+ if (i.eof() || std::isspace(i.peek())) // No suffix
+ d = number*TIME_SEC;
+ else {
+ std::string suffix;
+ i >> suffix;
+ if (i.fail()) return i;
+ if (suffix.compare("s") == 0) d = number*TIME_SEC;
+ else if (suffix.compare("ms") == 0) d = number*TIME_MSEC;
+ else if (suffix.compare("us") == 0) d = number*TIME_USEC;
+ else if (suffix.compare("ns") == 0) d = number*TIME_NSEC;
+ else i.setstate(std::ios::failbit);
+ }
+ return i;
+}
+
+std::ostream& operator<<(std::ostream& o, const AbsTime& t) {
+ std::string time_string = to_simple_string(t.timepoint);
+ return o << time_string;
+}
+
+
+void sleep(int secs) {
+ ::Sleep(secs * 1000);
+}
+
+void usleep(uint64_t usecs) {
+ DWORD msecs = usecs / 1000;
+ if (msecs == 0)
+ msecs = 1;
+ ::Sleep(msecs);
+}
+
+void outputFormattedNow(std::ostream& o) {
+ ::time_t rawtime;
+ ::tm timeinfo;
+ char time_string[100];
+
+ ::time( &rawtime );
+#ifdef _MSC_VER
+ ::localtime_s(&timeinfo, &rawtime);
+#else
+ timeinfo = *(::localtime(&rawtime));
+#endif
+ ::strftime(time_string, 100,
+ "%Y-%m-%d %H:%M:%S",
+ &timeinfo);
+ o << time_string << " ";
+}
+
+void outputHiresNow(std::ostream& o) {
+ ::time_t tv_sec;
+ ::tm timeinfo;
+ char time_string[100];
+
+ if (!timeInitialized) {
+ // To start, get the current time from FILETIME which includes
+ // sub-second resolution. However, since FILETIME is updated a bit
+ // "bumpy" every 15 msec or so, future time displays will be the
+ // starting FILETIME plus a delta based on the high-resolution
+ // performance counter.
+ FILETIME file_time;
+ ULARGE_INTEGER start_usec;
+ ::GetSystemTimeAsFileTime(&file_time); // This is in 100ns units
+ start_usec.LowPart = file_time.dwLowDateTime;
+ start_usec.HighPart = file_time.dwHighDateTime;
+ start_usec.QuadPart -= FILETIME_to_timval_skew;
+ start_usec.QuadPart /= 10; // Convert 100ns to usec
+ tv_sec = (time_t)(start_usec.QuadPart / (1000 * 1000));
+ long tv_usec = (long)(start_usec.QuadPart % (1000 * 1000));
+ start_time = static_cast<double>(tv_sec);
+ start_time += tv_usec / 1000000.0;
+
+ start_hpc.QuadPart = 0;
+ LARGE_INTEGER iFreq;
+ iFreq.QuadPart = 1;
+ QueryPerformanceCounter(&start_hpc);
+ QueryPerformanceFrequency(&iFreq);
+ hpc_freq = static_cast<double>(iFreq.QuadPart);
+ timeInitialized = true;
+ }
+ LARGE_INTEGER hpc_now;
+ hpc_now.QuadPart = 0;
+ QueryPerformanceCounter(&hpc_now);
+ hpc_now.QuadPart -= start_hpc.QuadPart;
+ if (hpc_now.QuadPart < 0)
+ hpc_now.QuadPart = 0;
+ double now = static_cast<double>(hpc_now.QuadPart);
+ now /= hpc_freq; // now is seconds after this
+ double fnow = start_time + now;
+ double usec, sec;
+ usec = modf(fnow, &sec);
+ tv_sec = static_cast<time_t>(sec);
+#ifdef _MSC_VER
+ ::localtime_s(&timeinfo, &tv_sec);
+#else
+ timeinfo = *(::localtime(&tv_sec));
+#endif
+ ::strftime(time_string, 100,
+ "%Y-%m-%d %H:%M:%S",
+ &timeinfo);
+ // No way to set "max field width" to cleanly output the double usec so
+ // convert it back to integral number of usecs and print that.
+ unsigned long i_usec = usec * 1000 * 1000;
+ o << time_string << "." << std::setw(6) << std::setfill('0') << i_usec << " ";
+}
+}}
diff --git a/qpid/cpp/src/qpid/sys/windows/Time.h b/qpid/cpp/src/qpid/sys/windows/Time.h
new file mode 100644
index 0000000000..2987b1c8b2
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/Time.h
@@ -0,0 +1,36 @@
+#ifndef QPID_SYS_WINDOWS_TIME_H
+#define QPID_SYS_WINDOWS_TIME_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/date_time/posix_time/posix_time_types.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Class to represent an instant in time. Boost has this stuff already done
+ * so just reuse it. We can also grab this for quick use with the Condition
+ * wait operations.
+ */
+typedef boost::posix_time::ptime TimePrivate;
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_WINDOWS_TIME_H*/
diff --git a/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp b/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp
new file mode 100644
index 0000000000..5637f6a9fb
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp
@@ -0,0 +1,276 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/windows/WinSocket.h"
+
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/windows/check.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
+#include "qpid/sys/SystemInfo.h"
+
+namespace qpid {
+namespace sys {
+
+// Need to initialize WinSock. Ideally, this would be a singleton or embedded
+// in some one-time initialization function. I tried boost singleton and could
+// not get it to compile (and others located in google had the same problem).
+// So, this simple static with an interlocked increment will do for known
+// use cases at this time. Since this will only shut down winsock at process
+// termination, there may be some problems with client programs that also
+// expect to load and unload winsock, but we'll see...
+// If someone does get an easy-to-use singleton sometime, converting to it
+// may be preferable.
+
+namespace {
+
+static LONG volatile initialized = 0;
+
+class WinSockSetup {
+ // : public boost::details::pool::singleton_default<WinSockSetup> {
+
+public:
+ WinSockSetup() {
+ LONG timesEntered = InterlockedIncrement(&initialized);
+ if (timesEntered > 1)
+ return;
+ err = 0;
+ WORD wVersionRequested;
+ WSADATA wsaData;
+
+ /* Request WinSock 2.2 */
+ wVersionRequested = MAKEWORD(2, 2);
+ err = WSAStartup(wVersionRequested, &wsaData);
+ }
+
+ ~WinSockSetup() {
+ if (SystemInfo::threadSafeShutdown())
+ WSACleanup();
+ }
+
+public:
+ int error(void) const { return err; }
+
+protected:
+ DWORD err;
+};
+
+static WinSockSetup setup;
+
+std::string getName(SOCKET fd, bool local)
+{
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
+
+ if (local) {
+ QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen));
+ } else {
+ QPID_WINSOCK_CHECK(::getpeername(fd, name, &namelen));
+ }
+
+ return SocketAddress::asString(name, namelen);
+}
+
+uint16_t getLocalPort(int fd)
+{
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
+
+ QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen));
+
+ return SocketAddress::getPort(name);
+}
+} // namespace
+
+WinSocket::WinSocket() :
+ handle(new IOHandle),
+ nonblocking(false),
+ nodelay(false)
+{}
+
+Socket* createSocket()
+{
+ return new WinSocket;
+}
+
+WinSocket::WinSocket(SOCKET fd) :
+ handle(new IOHandle(fd)),
+ nonblocking(false),
+ nodelay(false)
+{}
+
+WinSocket::operator const IOHandle&() const
+{
+ return *handle;
+}
+
+void WinSocket::createSocket(const SocketAddress& sa) const
+{
+ SOCKET& socket = handle->fd;
+ if (socket != INVALID_SOCKET) WinSocket::close();
+
+ SOCKET s = ::socket (getAddrInfo(sa).ai_family,
+ getAddrInfo(sa).ai_socktype,
+ 0);
+ if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ socket = s;
+
+ try {
+ if (nonblocking) setNonblocking();
+ if (nodelay) setTcpNoDelay();
+ } catch (std::exception&) {
+ ::closesocket(s);
+ socket = INVALID_SOCKET;
+ throw;
+ }
+}
+
+void WinSocket::setNonblocking() const {
+ u_long nonblock = 1;
+ QPID_WINSOCK_CHECK(ioctlsocket(handle->fd, FIONBIO, &nonblock));
+}
+
+void
+WinSocket::connect(const SocketAddress& addr) const
+{
+ peername = addr.asString(false);
+
+ createSocket(addr);
+
+ const SOCKET& socket = handle->fd;
+ int err;
+ WSASetLastError(0);
+ if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) &&
+ ((err = ::WSAGetLastError()) != WSAEWOULDBLOCK))
+ throw qpid::Exception(QPID_MSG(strError(err) << ": " << peername));
+}
+
+void
+WinSocket::finishConnect(const SocketAddress&) const
+{
+}
+
+void
+WinSocket::close() const
+{
+ SOCKET& socket = handle->fd;
+ if (socket == INVALID_SOCKET) return;
+ QPID_WINSOCK_CHECK(closesocket(socket));
+ socket = INVALID_SOCKET;
+}
+
+
+int WinSocket::write(const void *buf, size_t count) const
+{
+ const SOCKET& socket = handle->fd;
+ int sent = ::send(socket, (const char *)buf, count, 0);
+ if (sent == SOCKET_ERROR)
+ return -1;
+ return sent;
+}
+
+int WinSocket::read(void *buf, size_t count) const
+{
+ const SOCKET& socket = handle->fd;
+ int received = ::recv(socket, (char *)buf, count, 0);
+ if (received == SOCKET_ERROR)
+ return -1;
+ return received;
+}
+
+int WinSocket::listen(const SocketAddress& addr, int backlog) const
+{
+ createSocket(addr);
+
+ const SOCKET& socket = handle->fd;
+ BOOL yes=1;
+ QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (char *)&yes, sizeof(yes)));
+
+ if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR)
+ throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError())));
+ if (::listen(socket, backlog) == SOCKET_ERROR)
+ throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError())));
+
+ return getLocalPort(socket);
+}
+
+Socket* WinSocket::accept() const
+{
+ SOCKET afd = ::accept(handle->fd, 0, 0);
+ if (afd != INVALID_SOCKET)
+ return new WinSocket(afd);
+ else if (WSAGetLastError() == EAGAIN)
+ return 0;
+ else throw QPID_WINDOWS_ERROR(WSAGetLastError());
+}
+
+std::string WinSocket::getPeerAddress() const
+{
+ if (peername.empty()) {
+ peername = getName(handle->fd, false);
+ }
+ return peername;
+}
+
+std::string WinSocket::getLocalAddress() const
+{
+ if (localname.empty()) {
+ localname = getName(handle->fd, true);
+ }
+ return localname;
+}
+
+int WinSocket::getError() const
+{
+ int result;
+ socklen_t rSize = sizeof (result);
+
+ QPID_WINSOCK_CHECK(::getsockopt(handle->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize));
+ return result;
+}
+
+void WinSocket::setTcpNoDelay() const
+{
+ SOCKET& socket = handle->fd;
+ nodelay = true;
+ if (socket != INVALID_SOCKET) {
+ int flag = 1;
+ int result = setsockopt(handle->fd,
+ IPPROTO_TCP,
+ TCP_NODELAY,
+ (char *)&flag,
+ sizeof(flag));
+ QPID_WINSOCK_CHECK(result);
+ }
+}
+
+int WinSocket::getKeyLen() const
+{
+ return 0;
+}
+
+std::string WinSocket::getClientAuthId() const
+{
+ return std::string();
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/WinSocket.h b/qpid/cpp/src/qpid/sys/windows/WinSocket.h
new file mode 100644
index 0000000000..bee6a58e7a
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/WinSocket.h
@@ -0,0 +1,118 @@
+#ifndef QPID_SYS_WINDOWS_BSDSOCKET_H
+#define QPID_SYS_WINDOWS_BSDSOCKET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/CommonImportExport.h"
+#include <string>
+
+#include <boost/scoped_ptr.hpp>
+
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+
+#include <winsock2.h>
+
+namespace qpid {
+namespace sys {
+
+namespace windows {
+Socket* createSameTypeSocket(const Socket&);
+}
+
+class Duration;
+class IOHandle;
+class SocketAddress;
+
+class QPID_COMMON_CLASS_EXTERN WinSocket : public Socket
+{
+public:
+ /** Create a socket wrapper for descriptor. */
+ QPID_COMMON_EXTERN WinSocket();
+
+ QPID_COMMON_EXTERN operator const IOHandle&() const;
+
+ /** Set socket non blocking */
+ QPID_COMMON_EXTERN virtual void setNonblocking() const;
+
+ QPID_COMMON_EXTERN virtual void setTcpNoDelay() const;
+
+ QPID_COMMON_EXTERN virtual void connect(const SocketAddress&) const;
+ QPID_COMMON_EXTERN virtual void finishConnect(const SocketAddress&) const;
+
+ QPID_COMMON_EXTERN virtual void close() const;
+
+ /** Bind to a port and start listening.
+ *@return The bound port number
+ */
+ QPID_COMMON_EXTERN virtual int listen(const SocketAddress&, int backlog = 10) const;
+
+ /**
+ * Returns an address (host and port) for the remote end of the
+ * socket
+ */
+ QPID_COMMON_EXTERN std::string getPeerAddress() const;
+ /**
+ * Returns an address (host and port) for the local end of the
+ * socket
+ */
+ QPID_COMMON_EXTERN std::string getLocalAddress() const;
+
+ /**
+ * Returns the error code stored in the socket. This may be used
+ * to determine the result of a non-blocking connect.
+ */
+ QPID_COMMON_EXTERN int getError() const;
+
+ /** Accept a connection from a socket that is already listening
+ * and has an incoming connection
+ */
+ QPID_COMMON_EXTERN virtual Socket* accept() const;
+
+ // TODO The following are raw operations, maybe they need better wrapping?
+ QPID_COMMON_EXTERN virtual int read(void *buf, size_t count) const;
+ QPID_COMMON_EXTERN virtual int write(const void *buf, size_t count) const;
+
+ QPID_COMMON_EXTERN int getKeyLen() const;
+ QPID_COMMON_EXTERN std::string getClientAuthId() const;
+
+protected:
+ /** Create socket */
+ void createSocket(const SocketAddress&) const;
+
+ mutable boost::scoped_ptr<IOHandle> handle;
+ mutable std::string localname;
+ mutable std::string peername;
+ mutable bool nonblocking;
+ mutable bool nodelay;
+
+ /** Construct socket with existing handle */
+ friend Socket* qpid::sys::windows::createSameTypeSocket(const Socket&);
+ WinSocket(SOCKET fd);
+};
+
+}}
+#endif /*!QPID_SYS_WINDOWS_BSDSOCKET_H*/
diff --git a/qpid/cpp/src/qpid/sys/windows/check.h b/qpid/cpp/src/qpid/sys/windows/check.h
new file mode 100755
index 0000000000..2a8e439bed
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/check.h
@@ -0,0 +1,49 @@
+#ifndef _windows_check_h
+#define _windows_check_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
+#include "qpid/sys/StrError.h"
+
+#define QPID_WINDOWS_ERROR(ERRVAL) qpid::Exception(QPID_MSG(qpid::sys::strError(ERRVAL)))
+#define QPID_WINDOWS_CRT_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::sys::strError(ERRNO)))
+
+/** THROW QPID_WINDOWS_ERROR(::GetLastError()) if RESULT is NULL */
+#define QPID_WINDOWS_CHECK_NULL(RESULT) \
+ if ((RESULT) == NULL) throw QPID_WINDOWS_ERROR((::GetLastError()))
+
+#define QPID_WINDOWS_CHECK_NOT(RESULT,VAL) \
+ if ((RESULT) == (VAL)) throw QPID_WINDOWS_ERROR((::GetLastError()))
+
+#define QPID_WINDOWS_CHECK_ASYNC_START(STATUS) \
+ if (!(STATUS) && ::WSAGetLastError() != ERROR_IO_PENDING) \
+ throw QPID_WINDOWS_ERROR((::WSAGetLastError()))
+
+#define QPID_WINDOWS_CHECK_CRT_NZ(VAL) \
+ if ((VAL) == 0) throw QPID_WINDOWS_CRT_ERROR(errno)
+
+#define QPID_WINSOCK_CHECK(OP) \
+ if ((OP) == SOCKET_ERROR) throw QPID_WINDOWS_ERROR((::WSAGetLastError()))
+
+#endif /*!_windows_check_h*/
diff --git a/qpid/cpp/src/qpid/sys/windows/mingw32_compat.h b/qpid/cpp/src/qpid/sys/windows/mingw32_compat.h
new file mode 100644
index 0000000000..51f613cc25
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/mingw32_compat.h
@@ -0,0 +1,39 @@
+#ifndef _sys_windows_mingw32_compat
+#define _sys_windows_mingw32_compat
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifdef WIN32
+#ifndef _MSC_VER
+
+//
+// The following definitions for extension function GUIDs and signatures are taken from
+// MswSock.h in the Windows32 SDK. These rightfully belong in the mingw32 version of
+// mswsock.h, but are not included presently.
+//
+
+#define WSAID_ACCEPTEX {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}}
+typedef BOOL (PASCAL *LPFN_ACCEPTEX)(SOCKET,SOCKET,PVOID,DWORD,DWORD,DWORD,LPDWORD,LPOVERLAPPED);
+
+#endif
+#endif
+
+#endif
diff --git a/qpid/cpp/src/qpid/sys/windows/util.cpp b/qpid/cpp/src/qpid/sys/windows/util.cpp
new file mode 100644
index 0000000000..75aef26c35
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/util.cpp
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/windows/util.h"
+#include "qpid/Exception.h"
+#include "qpid/sys/SystemInfo.h"
+
+#include <iostream>
+#include <fstream>
+
+namespace qpid {
+namespace sys {
+namespace ssl {
+
+static const std::string LOCALHOST("127.0.0.1");
+
+std::string defaultCertName()
+{
+ Address address;
+ if (SystemInfo::getLocalHostname(address)) {
+ return address.host;
+ } else {
+ return LOCALHOST;
+ }
+}
+
+SslOptions::SslOptions() : qpid::Options("SSL Settings"),
+ certName(defaultCertName())
+{
+ addOptions()
+ ("ssl-cert-password-file", optValue(certPasswordFile, "PATH"), "File containing password to use for accessing certificates")
+ ("ssl-cert-store", optValue(certStore, "NAME"), "Windows certificate store containing the certificate")
+ ("ssl-cert-Filename", optValue(certFilename, "PATH"), "Path to PKCS#12 file containing the certificate")
+ ("ssl-cert-name", optValue(certName, "NAME"), "Friendly Name of the certificate to use");
+}
+
+SslOptions& SslOptions::operator=(const SslOptions& o)
+{
+ certStore = o.certStore;
+ certName = o.certName;
+ certPasswordFile = o.certPasswordFile;
+ certFilename = o.certFilename;
+
+ return *this;
+}
+
+SslOptions SslOptions::global;
+
+void initWinSsl(const SslOptions& options, bool)
+{
+ SslOptions::global = options;
+}
+}}} // namespace qpid::sys::ssl
diff --git a/qpid/cpp/src/qpid/sys/windows/util.h b/qpid/cpp/src/qpid/sys/windows/util.h
new file mode 100644
index 0000000000..2855a90955
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/util.h
@@ -0,0 +1,50 @@
+#ifndef QPID_SYS_SSL_UTIL_H
+#define QPID_SYS_SSL_UTIL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/CommonImportExport.h"
+#include "qpid/Options.h"
+#include <string>
+
+namespace qpid {
+namespace sys {
+namespace ssl {
+
+struct SslOptions : qpid::Options
+{
+ QPID_COMMON_EXTERN static SslOptions global;
+
+ std::string certStore;
+ std::string certName;
+ std::string certPasswordFile;
+ std::string certFilename;
+
+ QPID_COMMON_EXTERN SslOptions();
+ QPID_COMMON_EXTERN SslOptions& operator=(const SslOptions&);
+};
+
+QPID_COMMON_EXTERN void initWinSsl(const SslOptions& options, bool server = false);
+
+}}} // namespace qpid::sys::ssl
+
+#endif /*!QPID_SYS_SSL_UTIL_H*/
diff --git a/qpid/cpp/src/qpid/sys/windows/uuid.cpp b/qpid/cpp/src/qpid/sys/windows/uuid.cpp
new file mode 100644
index 0000000000..4ff75ca627
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/uuid.cpp
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * UUIDs and GUIDs (both RFC 4122) differ on byte positions of the
+ * internal representation. This matters when encoding to the wire
+ * and adhering to versioning info. Microsoft APIs used here operate
+ * on GUIDs even if the name implies UUIDs. AMQP expects the UUID 128
+ * bit format which is used here unless otherwise noted.
+ */
+
+#include <rpc.h>
+#ifdef uuid_t /* Done in rpcdce.h */
+# undef uuid_t
+#endif
+
+#include "qpid/sys/uuid.h"
+
+#include <string.h>
+
+namespace {
+inline void iswap (char *p1, char *p2) {
+ char t = *p1;
+ *p1 = *p2;
+ *p2 = t;
+}
+
+void toUuid (const UUID *guid, uint8_t uuid[qpid::sys::UuidSize]) {
+ // copy then swap bytes
+ memcpy ((char *) uuid, (char *) guid, qpid::sys::UuidSize);
+ char *p = (char *) uuid;
+ iswap (p, p+3);
+ iswap (p+1, p+2);
+ iswap (p+4, p+5);
+ iswap (p+6, p+7);
+}
+
+} // namespace
+
+namespace qpid {
+namespace sys {
+
+void uuid_generate (uint8_t out[qpid::sys::UuidSize]) {
+ UUID guid;
+ UuidCreate (&guid);
+ // Version 4 GUID, convert to UUID
+ toUuid (&guid, out);
+}
+
+}}