summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/windows
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/windows')
-rw-r--r--qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp755
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/AsynchIoResult.h204
-rw-r--r--qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp53
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/IOHandle.cpp42
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h61
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/IocpPoller.cpp219
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/LockFile.cpp64
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/PipeHandle.cpp101
-rw-r--r--qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp114
-rw-r--r--qpid/cpp/src/qpid/sys/windows/Shlib.cpp54
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/Socket.cpp289
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp76
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp661
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h191
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/StrError.cpp52
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/SystemInfo.cpp203
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/Thread.cpp100
-rw-r--r--qpid/cpp/src/qpid/sys/windows/Time.cpp136
-rw-r--r--qpid/cpp/src/qpid/sys/windows/mingw32_compat.h39
-rw-r--r--qpid/cpp/src/qpid/sys/windows/uuid.cpp67
-rw-r--r--qpid/cpp/src/qpid/sys/windows/uuid.h39
21 files changed, 3520 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
new file mode 100644
index 0000000000..8d84fdb7b2
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -0,0 +1,755 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/windows/AsynchIoResult.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Time.h"
+#include "qpid/log/Statement.h"
+
+#include "qpid/sys/windows/check.h"
+#include "qpid/sys/windows/mingw32_compat.h"
+
+#include <boost/thread/once.hpp>
+
+#include <queue>
+#include <winsock2.h>
+#include <mswsock.h>
+#include <windows.h>
+
+#include <boost/bind.hpp>
+
+namespace {
+
+ typedef qpid::sys::ScopedLock<qpid::sys::Mutex> QLock;
+
+/*
+ * The function pointers for AcceptEx and ConnectEx need to be looked up
+ * at run time. Make sure this is done only once.
+ */
+boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT;
+LPFN_ACCEPTEX fnAcceptEx = 0;
+typedef void (*lookUpFunc)(const qpid::sys::Socket &);
+
+void lookUpAcceptEx() {
+ SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ GUID guidAcceptEx = WSAID_ACCEPTEX;
+ DWORD dwBytes = 0;
+ WSAIoctl(h,
+ SIO_GET_EXTENSION_FUNCTION_POINTER,
+ &guidAcceptEx,
+ sizeof(guidAcceptEx),
+ &fnAcceptEx,
+ sizeof(fnAcceptEx),
+ &dwBytes,
+ NULL,
+ NULL);
+ closesocket(h);
+ if (fnAcceptEx == 0)
+ throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
+}
+
+}
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+/*
+ * Asynch Acceptor
+ *
+ */
+class AsynchAcceptor : public qpid::sys::AsynchAcceptor {
+
+ friend class AsynchAcceptResult;
+
+public:
+ AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback);
+ ~AsynchAcceptor();
+ void start(Poller::shared_ptr poller);
+
+private:
+ void restart(void);
+
+ AsynchAcceptor::Callback acceptedCallback;
+ const Socket& socket;
+};
+
+AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
+ : acceptedCallback(callback),
+ socket(s) {
+
+ s.setNonblocking();
+#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */
+ boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx);
+#else
+ boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce);
+#endif
+}
+
+AsynchAcceptor::~AsynchAcceptor()
+{
+ socket.close();
+}
+
+void AsynchAcceptor::start(Poller::shared_ptr poller) {
+ PollerHandle ph = PollerHandle(socket);
+ poller->monitorHandle(ph, Poller::INPUT);
+ restart ();
+}
+
+void AsynchAcceptor::restart(void) {
+ DWORD bytesReceived = 0; // Not used, needed for AcceptEx API
+ AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
+ this,
+ toSocketHandle(socket));
+ BOOL status;
+ status = ::fnAcceptEx(toSocketHandle(socket),
+ toSocketHandle(*result->newSocket),
+ result->addressBuffer,
+ 0,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ &bytesReceived,
+ result->overlapped());
+ QPID_WINDOWS_CHECK_ASYNC_START(status);
+}
+
+
+AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
+ AsynchAcceptor *acceptor,
+ SOCKET listener)
+ : callback(cb), acceptor(acceptor), listener(listener) {
+ newSocket.reset (new Socket());
+}
+
+void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
+ ::setsockopt (toSocketHandle(*newSocket),
+ SOL_SOCKET,
+ SO_UPDATE_ACCEPT_CONTEXT,
+ (char*)&listener,
+ sizeof (listener));
+ callback(*(newSocket.release()));
+ acceptor->restart ();
+ delete this;
+}
+
+void AsynchAcceptResult::failure(int /*status*/) {
+ //if (status != WSA_OPERATION_ABORTED)
+ // Can there be anything else? ;
+ delete this;
+}
+
+/*
+ * AsynchConnector does synchronous connects for now... to do asynch the
+ * IocpPoller will need some extension to register an event handle as a
+ * CONNECT-type "direction", the connect completion/result will need an
+ * event handle to associate with the connecting handle. But there's no
+ * time for that right now...
+ */
+class AsynchConnector : public qpid::sys::AsynchConnector {
+private:
+ ConnectedCallback connCallback;
+ FailedCallback failCallback;
+ const Socket& socket;
+ const std::string hostname;
+ const std::string port;
+
+public:
+ AsynchConnector(const Socket& socket,
+ const std::string& hostname,
+ const std::string& port,
+ ConnectedCallback connCb,
+ FailedCallback failCb = 0);
+ void start(Poller::shared_ptr poller);
+};
+
+AsynchConnector::AsynchConnector(const Socket& sock,
+ const std::string& hname,
+ const std::string& p,
+ ConnectedCallback connCb,
+ FailedCallback failCb) :
+ connCallback(connCb), failCallback(failCb), socket(sock),
+ hostname(hname), port(p)
+{
+}
+
+void AsynchConnector::start(Poller::shared_ptr)
+{
+ try {
+ socket.connect(hostname, port);
+ socket.setNonblocking();
+ connCallback(socket);
+ } catch(std::exception& e) {
+ if (failCallback)
+ failCallback(socket, -1, std::string(e.what()));
+ socket.close();
+ }
+}
+
+} // namespace windows
+
+AsynchAcceptor* AsynchAcceptor::create(const Socket& s,
+ Callback callback)
+{
+ return new windows::AsynchAcceptor(s, callback);
+}
+
+AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
+ const std::string& hostname,
+ const std::string& port,
+ ConnectedCallback connCb,
+ FailedCallback failCb)
+{
+ return new windows::AsynchConnector(s,
+ hostname,
+ port,
+ connCb,
+ failCb);
+}
+
+
+/*
+ * Asynch reader/writer
+ */
+
+namespace windows {
+
+class AsynchIO : public qpid::sys::AsynchIO {
+public:
+ AsynchIO(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0);
+ ~AsynchIO();
+
+ // Methods inherited from qpid::sys::AsynchIO
+
+ /**
+ * Notify the object is should delete itself as soon as possible.
+ */
+ virtual void queueForDeletion();
+
+ /// Take any actions needed to prepare for working with the poller.
+ virtual void start(Poller::shared_ptr poller);
+ virtual void queueReadBuffer(BufferBase* buff);
+ virtual void unread(BufferBase* buff);
+ virtual void queueWrite(BufferBase* buff);
+ virtual void notifyPendingWrite();
+ virtual void queueWriteClose();
+ virtual bool writeQueueEmpty();
+ virtual void startReading();
+ virtual void stopReading();
+ virtual void requestCallback(RequestCallback);
+
+ /**
+ * getQueuedBuffer returns a buffer from the buffer queue, if one is
+ * available.
+ *
+ * @retval Pointer to BufferBase buffer; 0 if none is available.
+ */
+ virtual BufferBase* getQueuedBuffer();
+
+private:
+ ReadCallback readCallback;
+ EofCallback eofCallback;
+ DisconnectCallback disCallback;
+ ClosedCallback closedCallback;
+ BuffersEmptyCallback emptyCallback;
+ IdleCallback idleCallback;
+ const Socket& socket;
+ Poller::shared_ptr poller;
+
+ std::deque<BufferBase*> bufferQueue;
+ std::deque<BufferBase*> writeQueue;
+ /* The MSVC-supplied deque is not thread-safe; keep locks to serialize
+ * access to the buffer queue and write queue.
+ */
+ Mutex bufferQueueLock;
+
+ // Number of outstanding I/O operations.
+ volatile LONG opsInProgress;
+ // Is there a write in progress?
+ volatile bool writeInProgress;
+ // Deletion requested, but there are callbacks in progress.
+ volatile bool queuedDelete;
+ // Socket close requested, but there are operations in progress.
+ volatile bool queuedClose;
+
+private:
+ // Dispatch events that have completed.
+ void notifyEof(void);
+ void notifyDisconnect(void);
+ void notifyClosed(void);
+ void notifyBuffersEmpty(void);
+ void notifyIdle(void);
+
+ /**
+ * Initiate a write of the specified buffer. There's no callback for
+ * write completion to the AsynchIO object.
+ */
+ void startWrite(AsynchIO::BufferBase* buff);
+
+ void close(void);
+
+ /**
+ * readComplete is called when a read request is complete.
+ *
+ * @param result Results of the operation.
+ */
+ void readComplete(AsynchReadResult *result);
+
+ /**
+ * writeComplete is called when a write request is complete.
+ *
+ * @param result Results of the operation.
+ */
+ void writeComplete(AsynchWriteResult *result);
+
+ /**
+ * Queue of completions to run. This queue enforces the requirement
+ * from upper layers that only one thread at a time is allowed to act
+ * on any given connection. Once a thread is busy processing a completion
+ * on this object, other threads that dispatch completions queue the
+ * completions here for the in-progress thread to handle when done.
+ * Thus, any threads can dispatch a completion from the IocpPoller, but
+ * this class ensures that actual processing at the connection level is
+ * only on one thread at a time.
+ */
+ std::queue<AsynchIoResult *> completionQueue;
+ volatile bool working;
+ Mutex completionLock;
+
+ /**
+ * Called when there's a completion to process.
+ */
+ void completion(AsynchIoResult *result);
+};
+
+// This is used to encapsulate pure callbacks into a handle
+class CallbackHandle : public IOHandle {
+public:
+ CallbackHandle(AsynchIoResult::Completer completeCb,
+ AsynchIO::RequestCallback reqCb = 0) :
+ IOHandle(new IOHandlePrivate (INVALID_SOCKET, completeCb, reqCb))
+ {}
+};
+
+AsynchIO::AsynchIO(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb,
+ BuffersEmptyCallback eCb,
+ IdleCallback iCb) :
+
+ readCallback(rCb),
+ eofCallback(eofCb),
+ disCallback(disCb),
+ closedCallback(cCb),
+ emptyCallback(eCb),
+ idleCallback(iCb),
+ socket(s),
+ opsInProgress(0),
+ writeInProgress(false),
+ queuedDelete(false),
+ queuedClose(false),
+ working(false) {
+}
+
+struct deleter
+{
+ template <typename T>
+ void operator()(T *ptr){ delete ptr;}
+};
+
+AsynchIO::~AsynchIO() {
+ std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
+ std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
+}
+
+void AsynchIO::queueForDeletion() {
+ queuedDelete = true;
+ if (opsInProgress > 0) {
+ QPID_LOG(info, "Delete AsynchIO queued; ops in progress");
+ // AsynchIOHandler calls this then deletes itself; don't do any more
+ // callbacks.
+ readCallback = 0;
+ eofCallback = 0;
+ disCallback = 0;
+ closedCallback = 0;
+ emptyCallback = 0;
+ idleCallback = 0;
+ }
+ else {
+ delete this;
+ }
+}
+
+void AsynchIO::start(Poller::shared_ptr poller0) {
+ PollerHandle ph = PollerHandle(socket);
+ poller = poller0;
+ poller->monitorHandle(ph, Poller::INPUT);
+ if (writeQueue.size() > 0) // Already have data queued for write
+ notifyPendingWrite();
+ startReading();
+}
+
+void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
+ assert(buff);
+ buff->dataStart = 0;
+ buff->dataCount = 0;
+ QLock l(bufferQueueLock);
+ bufferQueue.push_back(buff);
+}
+
+void AsynchIO::unread(AsynchIO::BufferBase* buff) {
+ assert(buff);
+ buff->squish();
+ QLock l(bufferQueueLock);
+ bufferQueue.push_front(buff);
+}
+
+void AsynchIO::queueWrite(AsynchIO::BufferBase* buff) {
+ assert(buff);
+ QLock l(bufferQueueLock);
+ writeQueue.push_back(buff);
+ if (!writeInProgress)
+ notifyPendingWrite();
+}
+
+void AsynchIO::notifyPendingWrite() {
+ // This method is generally called from a processing thread; transfer
+ // work on this to an I/O thread. Much of the upper layer code assumes
+ // that all I/O-related things happen in an I/O thread.
+ if (poller == 0) // Not really going yet...
+ return;
+
+ InterlockedIncrement(&opsInProgress);
+ PollerHandle ph(CallbackHandle(boost::bind(&AsynchIO::completion, this, _1)));
+ poller->monitorHandle(ph, Poller::OUTPUT);
+}
+
+void AsynchIO::queueWriteClose() {
+ queuedClose = true;
+ if (!writeInProgress)
+ notifyPendingWrite();
+}
+
+bool AsynchIO::writeQueueEmpty() {
+ QLock l(bufferQueueLock);
+ return writeQueue.size() == 0;
+}
+
+/*
+ * Initiate a read operation. AsynchIO::readComplete() will be
+ * called when the read is complete and data is available.
+ */
+void AsynchIO::startReading() {
+ if (queuedDelete)
+ return;
+
+ // (Try to) get a buffer; look on the front since there may be an
+ // "unread" one there with data remaining from last time.
+ AsynchIO::BufferBase *buff = 0;
+ {
+ QLock l(bufferQueueLock);
+
+ if (!bufferQueue.empty()) {
+ buff = bufferQueue.front();
+ assert(buff);
+ bufferQueue.pop_front();
+ }
+ }
+ if (buff != 0) {
+ int readCount = buff->byteCount - buff->dataCount;
+ AsynchReadResult *result =
+ new AsynchReadResult(boost::bind(&AsynchIO::completion, this, _1),
+ buff,
+ readCount);
+ DWORD bytesReceived = 0, flags = 0;
+ InterlockedIncrement(&opsInProgress);
+ int status = WSARecv(toSocketHandle(socket),
+ const_cast<LPWSABUF>(result->getWSABUF()), 1,
+ &bytesReceived,
+ &flags,
+ result->overlapped(),
+ 0);
+ if (status != 0) {
+ int error = WSAGetLastError();
+ if (error != WSA_IO_PENDING) {
+ result->failure(error);
+ result = 0; // result is invalid here
+ return;
+ }
+ }
+ // On status 0 or WSA_IO_PENDING, completion will handle the rest.
+ }
+ else {
+ notifyBuffersEmpty();
+ }
+ return;
+}
+
+// stopReading was added to prevent a race condition with read-credit on Linux.
+// It may or may not be required on windows.
+//
+// AsynchIOHandler::readbuff() calls stopReading() inside the same
+// critical section that protects startReading() in
+// AsynchIOHandler::giveReadCredit().
+//
+void AsynchIO::stopReading() {}
+
+// Queue the specified callback for invocation from an I/O thread.
+void AsynchIO::requestCallback(RequestCallback callback) {
+ // This method is generally called from a processing thread; transfer
+ // work on this to an I/O thread. Much of the upper layer code assumes
+ // that all I/O-related things happen in an I/O thread.
+ if (poller == 0) // Not really going yet...
+ return;
+
+ InterlockedIncrement(&opsInProgress);
+ PollerHandle ph(CallbackHandle(
+ boost::bind(&AsynchIO::completion, this, _1),
+ callback));
+ poller->monitorHandle(ph, Poller::INPUT);
+}
+
+/**
+ * Return a queued buffer if there are enough to spare.
+ */
+AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
+ QLock l(bufferQueueLock);
+ // Always keep at least one buffer (it might have data that was
+ // "unread" in it).
+ if (bufferQueue.size() <= 1)
+ return 0;
+ BufferBase* buff = bufferQueue.back();
+ assert(buff);
+ bufferQueue.pop_back();
+ return buff;
+}
+
+void AsynchIO::notifyEof(void) {
+ if (eofCallback)
+ eofCallback(*this);
+}
+
+void AsynchIO::notifyDisconnect(void) {
+ if (disCallback)
+ disCallback(*this);
+}
+
+void AsynchIO::notifyClosed(void) {
+ if (closedCallback)
+ closedCallback(*this, socket);
+}
+
+void AsynchIO::notifyBuffersEmpty(void) {
+ if (emptyCallback)
+ emptyCallback(*this);
+}
+
+void AsynchIO::notifyIdle(void) {
+ if (idleCallback)
+ idleCallback(*this);
+}
+
+/*
+ * Asynch reader/writer using overlapped I/O
+ */
+
+void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
+ writeInProgress = true;
+ InterlockedIncrement(&opsInProgress);
+ AsynchWriteResult *result =
+ new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1),
+ buff,
+ buff->dataCount);
+ DWORD bytesSent = 0;
+ int status = WSASend(toSocketHandle(socket),
+ const_cast<LPWSABUF>(result->getWSABUF()), 1,
+ &bytesSent,
+ 0,
+ result->overlapped(),
+ 0);
+ if (status != 0) {
+ int error = WSAGetLastError();
+ if (error != WSA_IO_PENDING) {
+ result->failure(error); // Also decrements in-progress count
+ result = 0; // result is invalid here
+ return;
+ }
+ }
+ // On status 0 or WSA_IO_PENDING, completion will handle the rest.
+ return;
+}
+
+/*
+ * Close the socket and callback to say we've done it
+ */
+void AsynchIO::close(void) {
+ socket.close();
+ notifyClosed();
+}
+
+void AsynchIO::readComplete(AsynchReadResult *result) {
+ int status = result->getStatus();
+ size_t bytes = result->getTransferred();
+ if (status == 0 && bytes > 0) {
+ bool restartRead = true; // May not if receiver doesn't want more
+ if (readCallback)
+ readCallback(*this, result->getBuff());
+ if (restartRead)
+ startReading();
+ }
+ else {
+ // No data read, so put the buffer back. It may be partially filled,
+ // so "unread" it back to the front of the queue.
+ unread(result->getBuff());
+ notifyEof();
+ if (status != 0)
+ {
+ notifyDisconnect();
+ }
+ }
+}
+
+/*
+ * NOTE - this completion is called for completed writes and also when
+ * a write is desired. The difference is in the buff - if a write is desired
+ * the buff is 0.
+ */
+void AsynchIO::writeComplete(AsynchWriteResult *result) {
+ int status = result->getStatus();
+ size_t bytes = result->getTransferred();
+ AsynchIO::BufferBase *buff = result->getBuff();
+ if (buff != 0) {
+ writeInProgress = false;
+ if (status == 0 && bytes > 0) {
+ if (bytes < result->getRequested()) // Still more to go; resubmit
+ startWrite(buff);
+ else
+ queueReadBuffer(buff); // All done; back to the pool
+ }
+ else {
+ // An error... if it's a connection close, ignore it - it will be
+ // noticed and handled on a read completion any moment now.
+ // What to do with real error??? Save the Buffer?
+ }
+ }
+
+ // If there are no writes outstanding, check for more writes to initiate
+ // (either queued or via idle). The opsInProgress count is handled in
+ // completion()
+ if (!writeInProgress) {
+ bool writing = false;
+ {
+ QLock l(bufferQueueLock);
+ if (writeQueue.size() > 0) {
+ buff = writeQueue.front();
+ assert(buff);
+ writeQueue.pop_front();
+ startWrite(buff);
+ writing = true;
+ }
+ }
+ if (!writing && !queuedClose) {
+ notifyIdle();
+ }
+ }
+ return;
+}
+
+void AsynchIO::completion(AsynchIoResult *result) {
+ {
+ ScopedLock<Mutex> l(completionLock);
+ if (working) {
+ completionQueue.push(result);
+ return;
+ }
+
+ // First thread in with something to do; note we're working then keep
+ // handling completions.
+ working = true;
+ while (result != 0) {
+ // New scope to unlock temporarily.
+ {
+ ScopedUnlock<Mutex> ul(completionLock);
+ AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result);
+ if (r != 0)
+ readComplete(r);
+ else {
+ AsynchWriteResult *w =
+ dynamic_cast<AsynchWriteResult*>(result);
+ if (w != 0)
+ writeComplete(w);
+ else {
+ AsynchCallbackRequest *req =
+ dynamic_cast<AsynchCallbackRequest*>(result);
+ req->reqCallback(*this);
+ }
+ }
+ delete result;
+ result = 0;
+ InterlockedDecrement(&opsInProgress);
+ }
+ // Lock is held again.
+ if (completionQueue.empty())
+ continue;
+ result = completionQueue.front();
+ completionQueue.pop();
+ }
+ working = false;
+ }
+ // Lock released; ok to close if ops are done and close requested.
+ // Layer above will call back to queueForDeletion() if it hasn't
+ // already been done. If it already has, go ahead and delete.
+ if (opsInProgress == 0) {
+ if (queuedClose)
+ // close() may cause a delete; don't trust 'this' on return
+ close();
+ else if (queuedDelete)
+ delete this;
+ }
+}
+
+} // namespace windows
+
+AsynchIO* qpid::sys::AsynchIO::create(const Socket& s,
+ AsynchIO::ReadCallback rCb,
+ AsynchIO::EofCallback eofCb,
+ AsynchIO::DisconnectCallback disCb,
+ AsynchIO::ClosedCallback cCb,
+ AsynchIO::BuffersEmptyCallback eCb,
+ AsynchIO::IdleCallback iCb)
+{
+ return new qpid::sys::windows::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb);
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h b/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
new file mode 100755
index 0000000000..b11324918b
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
@@ -0,0 +1,204 @@
+#ifndef _windows_asynchIoResult_h
+#define _windows_asynchIoResult_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Socket.h"
+#include <memory.h>
+#include <winsock2.h>
+#include <ws2tcpip.h>
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+/*
+ * AsynchIoResult defines the class that receives the result of an
+ * asynchronous I/O operation, either send/recv or accept/connect.
+ *
+ * Operation factories should set one of these up before beginning the
+ * operation. Poller knows how to dispatch completion to this class.
+ * This class must be subclassed for needed operations; this class provides
+ * an interface only and cannot be instantiated.
+ *
+ * This class is tied to Windows; it inherits from OVERLAPPED so that the
+ * IocpPoller can cast OVERLAPPED pointers back to AsynchIoResult and call
+ * the completion handler.
+ */
+class AsynchResult : private OVERLAPPED {
+public:
+ LPOVERLAPPED overlapped(void) { return this; }
+ static AsynchResult* from_overlapped(LPOVERLAPPED ol) {
+ return static_cast<AsynchResult*>(ol);
+ }
+ virtual void success (size_t bytesTransferred) {
+ bytes = bytesTransferred;
+ status = 0;
+ complete();
+ }
+ virtual void failure (int error) {
+ bytes = 0;
+ status = error;
+ complete();
+ }
+ size_t getTransferred(void) const { return bytes; }
+ int getStatus(void) const { return status; }
+
+protected:
+ AsynchResult() : bytes(0), status(0)
+ { memset(overlapped(), 0, sizeof(OVERLAPPED)); }
+ ~AsynchResult() {}
+ virtual void complete(void) = 0;
+
+ size_t bytes;
+ int status;
+};
+
+class AsynchAcceptor;
+
+class AsynchAcceptResult : public AsynchResult {
+
+ friend class AsynchAcceptor;
+
+public:
+ AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb,
+ AsynchAcceptor *acceptor,
+ SOCKET listener);
+ virtual void success (size_t bytesTransferred);
+ virtual void failure (int error);
+
+private:
+ virtual void complete(void) {} // No-op for this class.
+
+ std::auto_ptr<qpid::sys::Socket> newSocket;
+ qpid::sys::AsynchAcceptor::Callback callback;
+ AsynchAcceptor *acceptor;
+ SOCKET listener;
+
+ // AcceptEx needs a place to write the local and remote addresses
+ // when accepting the connection. Place those here; get enough for
+ // IPv6 addresses, even if the socket is IPv4.
+ enum { SOCKADDRMAXLEN = sizeof(sockaddr_in6) + 16,
+ SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN };
+ char addressBuffer[SOCKADDRBUFLEN];
+};
+
+class AsynchIoResult : public AsynchResult {
+public:
+ typedef boost::function1<void, AsynchIoResult *> Completer;
+
+ virtual ~AsynchIoResult() {}
+ qpid::sys::AsynchIO::BufferBase *getBuff(void) const { return iobuff; }
+ size_t getRequested(void) const { return requested; }
+ const WSABUF *getWSABUF(void) const { return &wsabuf; }
+
+protected:
+ void setBuff (qpid::sys::AsynchIO::BufferBase *buffer) { iobuff = buffer; }
+
+protected:
+ AsynchIoResult(Completer cb,
+ qpid::sys::AsynchIO::BufferBase *buff, size_t length)
+ : completionCallback(cb), iobuff(buff), requested(length) {}
+
+ virtual void complete(void) = 0;
+ WSABUF wsabuf;
+ Completer completionCallback;
+
+private:
+ qpid::sys::AsynchIO::BufferBase *iobuff;
+ size_t requested; // Number of bytes in original I/O request
+};
+
+class AsynchReadResult : public AsynchIoResult {
+
+ // complete() updates buffer then does completion callback.
+ virtual void complete(void) {
+ getBuff()->dataCount += bytes;
+ completionCallback(this);
+ }
+
+public:
+ AsynchReadResult(AsynchIoResult::Completer cb,
+ qpid::sys::AsynchIO::BufferBase *buff,
+ size_t length)
+ : AsynchIoResult(cb, buff, length) {
+ wsabuf.buf = buff->bytes + buff->dataCount;
+ wsabuf.len = length;
+ }
+};
+
+class AsynchWriteResult : public AsynchIoResult {
+
+ // complete() updates buffer then does completion callback.
+ virtual void complete(void) {
+ qpid::sys::AsynchIO::BufferBase *b = getBuff();
+ b->dataStart += bytes;
+ b->dataCount -= bytes;
+ completionCallback(this);
+ }
+
+public:
+ AsynchWriteResult(AsynchIoResult::Completer cb,
+ qpid::sys::AsynchIO::BufferBase *buff,
+ size_t length)
+ : AsynchIoResult(cb, buff, length) {
+ wsabuf.buf = buff ? buff->bytes : 0;
+ wsabuf.len = length;
+ }
+};
+
+class AsynchWriteWanted : public AsynchWriteResult {
+
+ // complete() just does completion callback; no buffers used.
+ virtual void complete(void) {
+ completionCallback(this);
+ }
+
+public:
+ AsynchWriteWanted(AsynchIoResult::Completer cb)
+ : AsynchWriteResult(cb, 0, 0) {
+ wsabuf.buf = 0;
+ wsabuf.len = 0;
+ }
+};
+
+class AsynchCallbackRequest : public AsynchIoResult {
+ // complete() needs to simply call the completionCallback; no buffers.
+ virtual void complete(void) {
+ completionCallback(this);
+ }
+
+public:
+ AsynchCallbackRequest(AsynchIoResult::Completer cb,
+ qpid::sys::AsynchIO::RequestCallback reqCb)
+ : AsynchIoResult(cb, 0, 0), reqCallback(reqCb) {
+ wsabuf.buf = 0;
+ wsabuf.len = 0;
+ }
+
+ qpid::sys::AsynchIO::RequestCallback reqCallback;
+};
+
+}}} // qpid::sys::windows
+
+#endif /*!_windows_asynchIoResult_h*/
diff --git a/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp b/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp
new file mode 100644
index 0000000000..88f1637d48
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/FileSysDir.cpp
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/sys/FileSysDir.h"
+#include "qpid/sys/StrError.h"
+#include "qpid/Exception.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <direct.h>
+#include <errno.h>
+
+namespace qpid {
+namespace sys {
+
+bool FileSysDir::exists (void) const
+{
+ const char *cpath = dirPath.c_str ();
+ struct _stat s;
+ if (::_stat(cpath, &s)) {
+ if (errno == ENOENT) {
+ return false;
+ }
+ throw qpid::Exception (strError(errno) +
+ ": Can't check directory: " + dirPath);
+ }
+ if (s.st_mode & _S_IFDIR)
+ return true;
+ throw qpid::Exception(dirPath + " is not a directory");
+}
+
+void FileSysDir::mkdir(void)
+{
+ if (::_mkdir(dirPath.c_str()) == -1)
+ throw Exception ("Can't create directory: " + dirPath);
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp b/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp
new file mode 100755
index 0000000000..250737cb99
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/IOHandle.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+SOCKET toFd(const IOHandlePrivate* h)
+{
+ return h->fd;
+}
+
+IOHandle::IOHandle(IOHandlePrivate* h) :
+ impl(h)
+{}
+
+IOHandle::~IOHandle() {
+ delete impl;
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
new file mode 100755
index 0000000000..5943db5cc7
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
@@ -0,0 +1,61 @@
+#ifndef _sys_windows_IoHandlePrivate_h
+#define _sys_windows_IoHandlePrivate_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/windows/AsynchIoResult.h"
+#include "qpid/CommonImportExport.h"
+
+#include <winsock2.h>
+
+namespace qpid {
+namespace sys {
+
+// Private fd related implementation details
+// There should be either a valid socket handle or a completer callback.
+// Handle is used to associate with poller's iocp; completer is used to
+// inject a completion that will very quickly trigger a callback to the
+// completer from an I/O thread. If the callback mechanism is used, there
+// can be a RequestCallback set - this carries the callback object through
+// from AsynchIO::requestCallback() through to the I/O completion processing.
+class IOHandlePrivate {
+ friend QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s);
+ static IOHandlePrivate* getImpl(const IOHandle& h);
+
+public:
+ IOHandlePrivate(SOCKET f = INVALID_SOCKET,
+ windows::AsynchIoResult::Completer cb = 0,
+ AsynchIO::RequestCallback reqCallback = 0) :
+ fd(f), event(cb), cbRequest(reqCallback)
+ {}
+
+ SOCKET fd;
+ windows::AsynchIoResult::Completer event;
+ AsynchIO::RequestCallback cbRequest;
+};
+
+QPID_COMMON_EXTERN SOCKET toSocketHandle(const Socket& s);
+
+}}
+
+#endif /* _sys_windows_IoHandlePrivate_h */
diff --git a/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
new file mode 100755
index 0000000000..1805dd2cd8
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
@@ -0,0 +1,219 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Dispatcher.h"
+
+#include "qpid/sys/windows/AsynchIoResult.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
+#include "qpid/sys/windows/check.h"
+
+#include <winsock2.h>
+#include <windows.h>
+
+#include <assert.h>
+#include <vector>
+#include <exception>
+
+namespace qpid {
+namespace sys {
+
+class PollerHandlePrivate {
+ friend class Poller;
+ friend class PollerHandle;
+
+ SOCKET fd;
+ windows::AsynchIoResult::Completer cb;
+ AsynchIO::RequestCallback cbRequest;
+
+ PollerHandlePrivate(SOCKET f,
+ windows::AsynchIoResult::Completer cb0 = 0,
+ AsynchIO::RequestCallback rcb = 0)
+ : fd(f), cb(cb0), cbRequest(rcb)
+ {
+ }
+
+};
+
+PollerHandle::PollerHandle(const IOHandle& h) :
+ impl(new PollerHandlePrivate(toSocketHandle(static_cast<const Socket&>(h)), h.impl->event, h.impl->cbRequest))
+{}
+
+PollerHandle::~PollerHandle() {
+ delete impl;
+}
+
+/**
+ * Concrete implementation of Poller to use the Windows I/O Completion
+ * port (IOCP) facility.
+ */
+class PollerPrivate {
+ friend class Poller;
+
+ const HANDLE iocp;
+
+ // The number of threads running the event loop.
+ volatile LONG threadsRunning;
+
+ // Shutdown request is handled by setting isShutdown and injecting a
+ // well-formed completion event into the iocp.
+ bool isShutdown;
+
+ PollerPrivate() :
+ iocp(::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)),
+ threadsRunning(0),
+ isShutdown(false) {
+ QPID_WINDOWS_CHECK_NULL(iocp);
+ }
+
+ ~PollerPrivate() {
+ // It's probably okay to ignore any errors here as there can't be
+ // data loss
+ ::CloseHandle(iocp);
+ }
+};
+
+void Poller::shutdown() {
+ // Allow sloppy code to shut us down more than once.
+ if (impl->isShutdown)
+ return;
+ ULONG_PTR key = 1; // Tell wait() it's a shutdown, not I/O
+ PostQueuedCompletionStatus(impl->iocp, 0, key, 0);
+}
+
+bool Poller::hasShutdown()
+{
+ return impl->isShutdown;
+}
+
+bool Poller::interrupt(PollerHandle&) {
+ return false; // There's no concept of a registered handle.
+}
+
+void Poller::run() {
+ do {
+ Poller::Event event = this->wait();
+
+ // Handle shutdown
+ switch (event.type) {
+ case Poller::SHUTDOWN:
+ return;
+ break;
+ case Poller::INVALID: // On any type of success or fail completion
+ break;
+ default:
+ // This should be impossible
+ assert(false);
+ }
+ } while (true);
+}
+
+void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
+ HANDLE h = (HANDLE)(handle.impl->fd);
+ if (h != INVALID_HANDLE_VALUE) {
+ HANDLE iocpHandle = ::CreateIoCompletionPort (h, impl->iocp, 0, 0);
+ QPID_WINDOWS_CHECK_NULL(iocpHandle);
+ }
+ else {
+ // INPUT is used to request a callback; OUTPUT to request a write
+ assert(dir == Poller::INPUT || dir == Poller::OUTPUT);
+
+ if (dir == Poller::OUTPUT) {
+ windows::AsynchWriteWanted *result =
+ new windows::AsynchWriteWanted(handle.impl->cb);
+ PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+ }
+ else {
+ windows::AsynchCallbackRequest *result =
+ new windows::AsynchCallbackRequest(handle.impl->cb,
+ handle.impl->cbRequest);
+ PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
+ }
+ }
+}
+
+// All no-ops...
+void Poller::unmonitorHandle(PollerHandle& /*handle*/, Direction /*dir*/) {}
+void Poller::registerHandle(PollerHandle& /*handle*/) {}
+void Poller::unregisterHandle(PollerHandle& /*handle*/) {}
+
+Poller::Event Poller::wait(Duration timeout) {
+ DWORD timeoutMs = 0;
+ DWORD numTransferred = 0;
+ ULONG_PTR completionKey = 0;
+ OVERLAPPED *overlapped = 0;
+ windows::AsynchResult *result = 0;
+
+ // Wait for either an I/O operation to finish (thus signaling the
+ // IOCP handle) or a shutdown request to be made (thus signaling the
+ // shutdown event).
+ if (timeout == TIME_INFINITE)
+ timeoutMs = INFINITE;
+ else
+ timeoutMs = static_cast<DWORD>(timeout / TIME_MSEC);
+
+ InterlockedIncrement(&impl->threadsRunning);
+ bool goodOp = ::GetQueuedCompletionStatus (impl->iocp,
+ &numTransferred,
+ &completionKey,
+ &overlapped,
+ timeoutMs);
+ LONG remainingThreads = InterlockedDecrement(&impl->threadsRunning);
+ if (goodOp) {
+ // Dequeued a successful completion. If it's a posted packet from
+ // shutdown() the overlapped ptr is 0 and key is 1. Else downcast
+ // the OVERLAPPED pointer to an AsynchIoResult and call the
+ // completion handler.
+ if (overlapped == 0 && completionKey == 1) {
+ // If there are other threads still running this wait, re-post
+ // the completion.
+ if (remainingThreads > 0)
+ PostQueuedCompletionStatus(impl->iocp, 0, completionKey, 0);
+ return Event(0, SHUTDOWN);
+ }
+
+ result = windows::AsynchResult::from_overlapped(overlapped);
+ result->success (static_cast<size_t>(numTransferred));
+ }
+ else {
+ if (overlapped != 0) {
+ // Dequeued a completion for a failed operation. Downcast back
+ // to the result object and inform it that the operation failed.
+ DWORD status = ::GetLastError();
+ result = windows::AsynchResult::from_overlapped(overlapped);
+ result->failure (static_cast<int>(status));
+ }
+ }
+ return Event(0, INVALID); // TODO - this may need to be changed.
+
+}
+
+// Concrete constructors
+Poller::Poller() :
+ impl(new PollerPrivate())
+{}
+
+Poller::~Poller() {
+ delete impl;
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/sys/windows/LockFile.cpp b/qpid/cpp/src/qpid/sys/windows/LockFile.cpp
new file mode 100755
index 0000000000..048c2d5b18
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/LockFile.cpp
@@ -0,0 +1,64 @@
+/*
+ *
+ * Copyright (c) 2008 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/sys/LockFile.h"
+#include "qpid/sys/windows/check.h"
+
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+class LockFilePrivate {
+ friend class LockFile;
+
+ HANDLE fd;
+
+public:
+ LockFilePrivate(HANDLE f) : fd(f) {}
+};
+
+LockFile::LockFile(const std::string& path_, bool create)
+ : path(path_), created(create) {
+
+ HANDLE h = ::CreateFile(path.c_str(),
+ create ? (GENERIC_READ|GENERIC_WRITE) : GENERIC_READ,
+ FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,
+ 0, /* Default security */
+ create ? OPEN_ALWAYS : OPEN_EXISTING,
+ FILE_FLAG_DELETE_ON_CLOSE, /* Delete file when closed */
+ NULL);
+ if (h == INVALID_HANDLE_VALUE)
+ throw qpid::Exception(path + ": " + qpid::sys::strError(GetLastError()));
+
+ // Lock up to 4Gb
+ if (!::LockFile(h, 0, 0, 0xffffffff, 0))
+ throw qpid::Exception(path + ": " + qpid::sys::strError(GetLastError()));
+ impl.reset(new LockFilePrivate(h));
+}
+
+LockFile::~LockFile() {
+ if (impl) {
+ if (impl->fd != INVALID_HANDLE_VALUE) {
+ ::UnlockFile(impl->fd, 0, 0, 0xffffffff, 0);
+ ::CloseHandle(impl->fd);
+ }
+ }
+}
+
+}} /* namespace qpid::sys */
diff --git a/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp b/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp
new file mode 100755
index 0000000000..062458ae5f
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/PipeHandle.cpp
@@ -0,0 +1,101 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/sys/PipeHandle.h"
+#include "qpid/sys/windows/check.h"
+#include <winsock2.h>
+
+namespace qpid {
+namespace sys {
+
+PipeHandle::PipeHandle(bool nonBlocking) {
+
+ SOCKET listener, pair[2];
+ struct sockaddr_in addr;
+ int err;
+ int addrlen = sizeof(addr);
+ pair[0] = pair[1] = INVALID_SOCKET;
+ if ((listener = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+ addr.sin_port = 0;
+
+ err = bind(listener, (const struct sockaddr*) &addr, sizeof(addr));
+ if (err == SOCKET_ERROR) {
+ err = WSAGetLastError();
+ closesocket(listener);
+ throw QPID_WINDOWS_ERROR(err);
+ }
+
+ err = getsockname(listener, (struct sockaddr*) &addr, &addrlen);
+ if (err == SOCKET_ERROR) {
+ err = WSAGetLastError();
+ closesocket(listener);
+ throw QPID_WINDOWS_ERROR(err);
+ }
+
+ try {
+ if (listen(listener, 1) == SOCKET_ERROR)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ if ((pair[0] = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ if (connect(pair[0], (const struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ if ((pair[1] = accept(listener, NULL, NULL)) == INVALID_SOCKET)
+ throw QPID_WINDOWS_ERROR(WSAGetLastError());
+
+ closesocket(listener);
+ writeFd = pair[0];
+ readFd = pair[1];
+ }
+ catch (...) {
+ closesocket(listener);
+ if (pair[0] != INVALID_SOCKET)
+ closesocket(pair[0]);
+ throw;
+ }
+
+ // Set the socket to non-blocking
+ if (nonBlocking) {
+ unsigned long nonblock = 1;
+ ioctlsocket(readFd, FIONBIO, &nonblock);
+ }
+}
+
+PipeHandle::~PipeHandle() {
+ closesocket(readFd);
+ closesocket(writeFd);
+}
+
+int PipeHandle::read(void* buf, size_t bufSize) {
+ return ::recv(readFd, (char *)buf, bufSize, 0);
+}
+
+int PipeHandle::write(const void* buf, size_t bufSize) {
+ return ::send(writeFd, (const char *)buf, bufSize, 0);
+}
+
+int PipeHandle::getReadHandle() {
+ return readFd;
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
new file mode 100644
index 0000000000..6a1d9045b4
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
@@ -0,0 +1,114 @@
+#ifndef QPID_SYS_WINDOWS_POLLABLECONDITION_CPP
+#define QPID_SYS_WINDOWS_POLLABLECONDITION_CPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/PollableCondition.h"
+#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/windows/AsynchIoResult.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
+
+#include <boost/bind.hpp>
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+// PollableConditionPrivate will reuse the IocpPoller's ability to queue
+// a completion to the IOCP and have it dispatched to the completer callback
+// noted in the IOHandlePrivate when the request is queued. The
+// AsynchCallbackRequest object is not really used - we already have the
+// desired callback for the user of PollableCondition.
+class PollableConditionPrivate : private IOHandle {
+ friend class PollableCondition;
+
+private:
+ PollableConditionPrivate(const sys::PollableCondition::Callback& cb,
+ sys::PollableCondition& parent,
+ const boost::shared_ptr<sys::Poller>& poller);
+ ~PollableConditionPrivate();
+
+ void poke();
+ void dispatch(windows::AsynchIoResult *result);
+
+private:
+ PollableCondition::Callback cb;
+ PollableCondition& parent;
+ boost::shared_ptr<sys::Poller> poller;
+ LONG isSet;
+};
+
+PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb,
+ sys::PollableCondition& parent,
+ const boost::shared_ptr<sys::Poller>& poller)
+ : IOHandle(new sys::IOHandlePrivate(INVALID_SOCKET,
+ boost::bind(&PollableConditionPrivate::dispatch, this, _1))),
+ cb(cb), parent(parent), poller(poller), isSet(0)
+{
+}
+
+PollableConditionPrivate::~PollableConditionPrivate()
+{
+}
+
+void PollableConditionPrivate::poke()
+{
+ // monitorHandle will queue a completion for the IOCP; when it's handled, a
+ // poller thread will call back to dispatch() below.
+ PollerHandle ph(*this);
+ poller->monitorHandle(ph, Poller::INPUT);
+}
+
+void PollableConditionPrivate::dispatch(windows::AsynchIoResult *result)
+{
+ delete result; // Poller::monitorHandle() allocates this
+ cb(parent);
+ if (isSet)
+ poke();
+}
+
+ /* PollableCondition */
+
+PollableCondition::PollableCondition(const Callback& cb,
+ const boost::shared_ptr<sys::Poller>& poller)
+ : impl(new PollableConditionPrivate(cb, *this, poller))
+{
+}
+
+PollableCondition::~PollableCondition()
+{
+ delete impl;
+}
+
+void PollableCondition::set() {
+ // Add one to the set count and poke it to provoke a callback
+ ::InterlockedIncrement(&impl->isSet);
+ impl->poke();
+}
+
+void PollableCondition::clear() {
+ ::InterlockedExchange(&impl->isSet, 0);
+}
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_WINDOWS_POLLABLECONDITION_CPP*/
diff --git a/qpid/cpp/src/qpid/sys/windows/Shlib.cpp b/qpid/cpp/src/qpid/sys/windows/Shlib.cpp
new file mode 100644
index 0000000000..ba18747eb4
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/Shlib.cpp
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Shlib.h"
+#include "qpid/Exception.h"
+#include "qpid/sys/windows/check.h"
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+void Shlib::load(const char* name) {
+ HMODULE h = LoadLibrary(name);
+ if (h == NULL) {
+ throw QPID_WINDOWS_ERROR(GetLastError());
+ }
+ handle = static_cast<void*>(h);
+}
+
+void Shlib::unload() {
+ if (handle) {
+ if (FreeLibrary(static_cast<HMODULE>(handle)) == 0) {
+ throw QPID_WINDOWS_ERROR(GetLastError());
+ }
+ handle = 0;
+ }
+}
+
+void* Shlib::getSymbol(const char* name) {
+ // Double cast avoids warning about casting function pointer to object
+ void *sym = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(GetProcAddress(static_cast<HMODULE>(handle), name)));
+ if (sym == NULL)
+ throw QPID_WINDOWS_ERROR(GetLastError());
+ return sym;
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
new file mode 100755
index 0000000000..baa80f04e0
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
@@ -0,0 +1,289 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
+#include "qpid/sys/windows/check.h"
+#include "qpid/sys/Time.h"
+
+#include <cstdlib>
+#include <string.h>
+
+#include <winsock2.h>
+
+#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
+
+// Need to initialize WinSock. Ideally, this would be a singleton or embedded
+// in some one-time initialization function. I tried boost singleton and could
+// not get it to compile (and others located in google had the same problem).
+// So, this simple static with an interlocked increment will do for known
+// use cases at this time. Since this will only shut down winsock at process
+// termination, there may be some problems with client programs that also
+// expect to load and unload winsock, but we'll see...
+// If someone does get an easy-to-use singleton sometime, converting to it
+// may be preferable.
+
+namespace {
+
+static LONG volatile initialized = 0;
+
+class WinSockSetup {
+ // : public boost::details::pool::singleton_default<WinSockSetup> {
+
+public:
+ WinSockSetup() {
+ LONG timesEntered = InterlockedIncrement(&initialized);
+ if (timesEntered > 1)
+ return;
+ err = 0;
+ WORD wVersionRequested;
+ WSADATA wsaData;
+
+ /* Request WinSock 2.2 */
+ wVersionRequested = MAKEWORD(2, 2);
+ err = WSAStartup(wVersionRequested, &wsaData);
+ }
+
+ ~WinSockSetup() {
+ WSACleanup();
+ }
+
+public:
+ int error(void) const { return err; }
+
+protected:
+ DWORD err;
+};
+
+static WinSockSetup setup;
+
+} /* namespace */
+
+namespace qpid {
+namespace sys {
+
+namespace {
+
+std::string getName(SOCKET fd, bool local)
+{
+ sockaddr_in name; // big enough for any socket address
+ socklen_t namelen = sizeof(name);
+ if (local) {
+ QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen));
+ } else {
+ QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen));
+ }
+
+ char servName[NI_MAXSERV];
+ char dispName[NI_MAXHOST];
+ if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
+ dispName, sizeof(dispName),
+ servName, sizeof(servName),
+ NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+ return std::string(dispName) + ":" + std::string(servName);
+}
+} // namespace
+
+Socket::Socket() :
+ IOHandle(new IOHandlePrivate),
+ nonblocking(false),
+ nodelay(false)
+{
+ SOCKET& socket = impl->fd;
+ if (socket != INVALID_SOCKET) Socket::close();
+ SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
+ if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ socket = s;
+}
+
+Socket::Socket(IOHandlePrivate* h) :
+ IOHandle(h),
+ nonblocking(false),
+ nodelay(false)
+{}
+
+void
+Socket::createSocket(const SocketAddress& sa) const
+{
+ SOCKET& socket = impl->fd;
+ if (socket != INVALID_SOCKET) Socket::close();
+
+ SOCKET s = ::socket (getAddrInfo(sa).ai_family,
+ getAddrInfo(sa).ai_socktype,
+ 0);
+ if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ socket = s;
+
+ try {
+ if (nonblocking) setNonblocking();
+ if (nodelay) setTcpNoDelay();
+ } catch (std::exception&) {
+ closesocket(s);
+ socket = INVALID_SOCKET;
+ throw;
+ }
+}
+
+void Socket::setNonblocking() const {
+ u_long nonblock = 1;
+ QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock));
+}
+
+void Socket::connect(const std::string& host, const std::string& port) const
+{
+ SocketAddress sa(host, port);
+ connect(sa);
+}
+
+void
+Socket::connect(const SocketAddress& addr) const
+{
+ peername = addr.asString(false);
+
+ const SOCKET& socket = impl->fd;
+ const addrinfo *addrs = &(getAddrInfo(addr));
+ int error = 0;
+ WSASetLastError(0);
+ while (addrs != 0) {
+ if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) ||
+ (WSAGetLastError() == WSAEWOULDBLOCK))
+ break;
+ // Error... save this error code and see if there are other address
+ // to try before throwing the exception.
+ error = WSAGetLastError();
+ addrs = addrs->ai_next;
+ }
+ if (error)
+ throw qpid::Exception(QPID_MSG(strError(error) << ": " << peername));
+}
+
+void
+Socket::close() const
+{
+ SOCKET& socket = impl->fd;
+ if (socket == INVALID_SOCKET) return;
+ QPID_WINSOCK_CHECK(closesocket(socket));
+ socket = INVALID_SOCKET;
+}
+
+
+int Socket::write(const void *buf, size_t count) const
+{
+ const SOCKET& socket = impl->fd;
+ int sent = ::send(socket, (const char *)buf, count, 0);
+ if (sent == SOCKET_ERROR)
+ return -1;
+ return sent;
+}
+
+int Socket::read(void *buf, size_t count) const
+{
+ const SOCKET& socket = impl->fd;
+ int received = ::recv(socket, (char *)buf, count, 0);
+ if (received == SOCKET_ERROR)
+ return -1;
+ return received;
+}
+
+int Socket::listen(const std::string&, const std::string& port, int backlog) const
+{
+ const SOCKET& socket = impl->fd;
+ BOOL yes=1;
+ QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes)));
+ struct sockaddr_in name;
+ memset(&name, 0, sizeof(name));
+ name.sin_family = AF_INET;
+ name.sin_port = htons(boost::lexical_cast<uint16_t>(port));
+ name.sin_addr.s_addr = 0;
+ if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR)
+ throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError())));
+ if (::listen(socket, backlog) == SOCKET_ERROR)
+ throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError())));
+
+ socklen_t namelen = sizeof(name);
+ QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen));
+ return ntohs(name.sin_port);
+}
+
+Socket* Socket::accept() const
+{
+ SOCKET afd = ::accept(impl->fd, 0, 0);
+ if (afd != INVALID_SOCKET)
+ return new Socket(new IOHandlePrivate(afd));
+ else if (WSAGetLastError() == EAGAIN)
+ return 0;
+ else throw QPID_WINDOWS_ERROR(WSAGetLastError());
+}
+
+std::string Socket::getPeerAddress() const
+{
+ if (peername.empty())
+ peername = getName(impl->fd, false);
+ return peername;
+}
+
+std::string Socket::getLocalAddress() const
+{
+ if (localname.empty())
+ localname = getName(impl->fd, true);
+ return localname;
+}
+
+int Socket::getError() const
+{
+ int result;
+ socklen_t rSize = sizeof (result);
+
+ QPID_WINSOCK_CHECK(::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, (char *)&result, &rSize));
+ return result;
+}
+
+void Socket::setTcpNoDelay() const
+{
+ int flag = 1;
+ int result = setsockopt(impl->fd,
+ IPPROTO_TCP,
+ TCP_NODELAY,
+ (char *)&flag,
+ sizeof(flag));
+ QPID_WINSOCK_CHECK(result);
+ nodelay = true;
+}
+
+inline IOHandlePrivate* IOHandlePrivate::getImpl(const qpid::sys::IOHandle &h)
+{
+ return h.impl;
+}
+
+SOCKET toSocketHandle(const Socket& s)
+{
+ return IOHandlePrivate::getImpl(s)->fd;
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
new file mode 100644
index 0000000000..ac43cd2d23
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+
+#include "qpid/sys/SocketAddress.h"
+
+#include "qpid/sys/windows/check.h"
+
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include <string.h>
+
+namespace qpid {
+namespace sys {
+
+SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) :
+ host(host0),
+ port(port0),
+ addrInfo(0)
+{
+ ::addrinfo hints;
+ ::memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well
+ hints.ai_socktype = SOCK_STREAM;
+
+ const char* node = 0;
+ if (host.empty()) {
+ hints.ai_flags |= AI_PASSIVE;
+ } else {
+ node = host.c_str();
+ }
+ const char* service = port.empty() ? "0" : port.c_str();
+
+ int n = ::getaddrinfo(node, service, &hints, &addrInfo);
+ if (n != 0)
+ throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+}
+
+SocketAddress::~SocketAddress()
+{
+ ::freeaddrinfo(addrInfo);
+}
+
+std::string SocketAddress::asString(bool) const
+{
+ return host + ":" + port;
+}
+
+const ::addrinfo& getAddrInfo(const SocketAddress& sa)
+{
+ return *sa.addrInfo;
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
new file mode 100644
index 0000000000..11a3389e45
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
@@ -0,0 +1,661 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SslAsynchIO.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Time.h"
+#include "qpid/log/Statement.h"
+
+#include "qpid/sys/windows/check.h"
+
+// security.h needs to see this to distinguish from kernel use.
+#define SECURITY_WIN32
+#include <security.h>
+#include <Schnlsp.h>
+#undef SECURITY_WIN32
+
+#include <queue>
+#include <boost/bind.hpp>
+
+namespace {
+
+ /*
+ * To make the SSL encryption more efficient, set up a new BufferBase
+ * that leaves room for the SSL header to be prepended and the SSL
+ * trailer to be appended.
+ *
+ * This works by accepting a properly formed BufferBase, remembering it,
+ * and resetting the members of this struct to reflect the reserved
+ * header and trailer areas. It's only needed for giving buffers up to
+ * the frame layer for writing into.
+ */
+ struct SslIoBuff : public qpid::sys::AsynchIO::BufferBase {
+ std::auto_ptr<qpid::sys::AsynchIO::BufferBase> aioBuff;
+
+ SslIoBuff (qpid::sys::AsynchIO::BufferBase *base,
+ const SecPkgContext_StreamSizes &sizes)
+ : qpid::sys::AsynchIO::BufferBase(&base->bytes[sizes.cbHeader],
+ std::min(base->byteCount - sizes.cbHeader - sizes.cbTrailer,
+ sizes.cbMaximumMessage)),
+ aioBuff(base)
+ {}
+
+ ~SslIoBuff() {}
+ qpid::sys::AsynchIO::BufferBase* release() { return aioBuff.release(); }
+ };
+}
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+SslAsynchIO::SslAsynchIO(const qpid::sys::Socket& s,
+ CredHandle hCred,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb,
+ BuffersEmptyCallback eCb,
+ IdleCallback iCb,
+ NegotiateDoneCallback nCb) :
+ credHandle(hCred),
+ aio(0),
+ state(Negotiating),
+ readCallback(rCb),
+ idleCallback(iCb),
+ negotiateDoneCallback(nCb),
+ callbacksInProgress(0),
+ queuedDelete(false),
+ leftoverPlaintext(0)
+{
+ SecInvalidateHandle(&ctxtHandle);
+ peerAddress = s.getPeerAddress();
+ aio = qpid::sys::AsynchIO::create(s,
+ boost::bind(&SslAsynchIO::sslDataIn, this, _1, _2),
+ eofCb,
+ disCb,
+ cCb,
+ eCb,
+ boost::bind(&SslAsynchIO::idle, this, _1));
+}
+
+SslAsynchIO::~SslAsynchIO() {
+ if (leftoverPlaintext) {
+ delete leftoverPlaintext;
+ leftoverPlaintext = 0;
+ }
+}
+
+void SslAsynchIO::queueForDeletion() {
+ // This method effectively disconnects the layer above; pass it on the
+ // AsynchIO and delete.
+ aio->queueForDeletion();
+ queuedDelete = true;
+ if (!callbacksInProgress)
+ delete this;
+}
+
+void SslAsynchIO::start(qpid::sys::Poller::shared_ptr poller) {
+ aio->start(poller);
+ startNegotiate();
+}
+
+void SslAsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
+ aio->queueReadBuffer(buff);
+}
+
+void SslAsynchIO::unread(AsynchIO::BufferBase* buff) {
+ // This is plaintext data being given back for more. Since it's already
+ // decrypted, don't give it back to the aio layer; keep it to append
+ // any new data for the upper layer.
+ assert(buff);
+ buff->squish();
+ assert(leftoverPlaintext == 0);
+ leftoverPlaintext = buff;
+}
+
+void SslAsynchIO::queueWrite(AsynchIO::BufferBase* buff) {
+ // @@TODO: Need to delay the write if the session is renegotiating.
+
+ // Should not have gotten here without an SslIoBuff. This assert is
+ // primarily to catch any stray cases where write is called with a buffer
+ // not obtained via getQueuedBuffer.
+ SslIoBuff *sslBuff = dynamic_cast<SslIoBuff*>(buff);
+ assert(sslBuff != 0);
+
+ // Encrypt and hand off to the io layer. Remember that the upper layer's
+ // encoding was working on, and adjusting counts for, the SslIoBuff.
+ // Update the count of the original BufferBase before handing off to
+ // the I/O layer.
+ buff = sslBuff->release();
+ SecBuffer buffs[4];
+ buffs[0].cbBuffer = schSizes.cbHeader;
+ buffs[0].BufferType = SECBUFFER_STREAM_HEADER;
+ buffs[0].pvBuffer = buff->bytes; // This space was left by SslIoBuff
+ buffs[1].cbBuffer = sslBuff->dataCount;
+ buffs[1].BufferType = SECBUFFER_DATA;
+ buffs[1].pvBuffer = sslBuff->bytes;
+ buffs[2].cbBuffer = schSizes.cbTrailer;
+ buffs[2].BufferType = SECBUFFER_STREAM_TRAILER;
+ buffs[2].pvBuffer = &sslBuff->bytes[sslBuff->dataCount];
+ buffs[3].cbBuffer = 0;
+ buffs[3].BufferType = SECBUFFER_EMPTY;
+ buffs[3].pvBuffer = 0;
+ SecBufferDesc buffDesc;
+ buffDesc.ulVersion = SECBUFFER_VERSION;
+ buffDesc.cBuffers = 4;
+ buffDesc.pBuffers = buffs;
+ SECURITY_STATUS status = ::EncryptMessage(&ctxtHandle, 0, &buffDesc, 0);
+
+ // EncryptMessage encrypts the data in place. The header and trailer
+ // areas were left previously and must now be included in the updated
+ // count of bytes to write to the peer.
+ delete sslBuff;
+ buff->dataCount = buffs[0].cbBuffer + buffs[1].cbBuffer + buffs[2].cbBuffer;
+ aio->queueWrite(buff);
+}
+
+void SslAsynchIO::notifyPendingWrite() {
+ aio->notifyPendingWrite();
+}
+
+void SslAsynchIO::queueWriteClose() {
+ if (state == Negotiating) {
+ // Never got going, so don't bother trying to close SSL down orderly.
+ state = ShuttingDown;
+ aio->queueWriteClose();
+ return;
+ }
+
+ state = ShuttingDown;
+
+ DWORD shutdown = SCHANNEL_SHUTDOWN;
+ SecBuffer shutBuff;
+ shutBuff.cbBuffer = sizeof(DWORD);
+ shutBuff.BufferType = SECBUFFER_TOKEN;
+ shutBuff.pvBuffer = &shutdown;
+ SecBufferDesc desc;
+ desc.ulVersion = SECBUFFER_VERSION;
+ desc.cBuffers = 1;
+ desc.pBuffers = &shutBuff;
+ ::ApplyControlToken(&ctxtHandle, &desc);
+ negotiateStep(0);
+ // When the shutdown sequence is done, negotiateDone() will handle
+ // shutting down aio.
+}
+
+bool SslAsynchIO::writeQueueEmpty() {
+ return aio->writeQueueEmpty();
+}
+
+/*
+ * Initiate a read operation. AsynchIO::readComplete() will be
+ * called when the read is complete and data is available.
+ */
+void SslAsynchIO::startReading() {
+ aio->startReading();
+}
+
+void SslAsynchIO::stopReading() {
+ aio->stopReading();
+}
+
+// Queue the specified callback for invocation from an I/O thread.
+void SslAsynchIO::requestCallback(RequestCallback callback) {
+ aio->requestCallback(callback);
+}
+
+/**
+ * Return a queued buffer read to put new data in for writing.
+ * This method ALWAYS returns a SslIoBuff reflecting a BufferBase from
+ * the aio layer that has header and trailer space reserved.
+ */
+AsynchIO::BufferBase* SslAsynchIO::getQueuedBuffer() {
+ SslIoBuff *sslBuff = 0;
+ BufferBase* buff = aio->getQueuedBuffer();
+ if (buff == 0)
+ return 0;
+
+ sslBuff = new SslIoBuff(buff, schSizes);
+ return sslBuff;
+}
+
+unsigned int SslAsynchIO::getSslKeySize() {
+ SecPkgContext_KeyInfo info;
+ memset(&info, 0, sizeof(info));
+ ::QueryContextAttributes(&ctxtHandle, SECPKG_ATTR_KEY_INFO, &info);
+ return info.KeySize;
+}
+
+void SslAsynchIO::negotiationDone() {
+ switch(state) {
+ case Negotiating:
+ ::QueryContextAttributes(&ctxtHandle,
+ SECPKG_ATTR_STREAM_SIZES,
+ &schSizes);
+ state = Running;
+ if (negotiateDoneCallback)
+ negotiateDoneCallback(SEC_E_OK);
+ break;
+ case Redo:
+ state = Running;
+ break;
+ case ShuttingDown:
+ aio->queueWriteClose();
+ break;
+ default:
+ assert(0);
+ }
+}
+
+void SslAsynchIO::negotiationFailed(SECURITY_STATUS status) {
+ QPID_LOG(notice, "SSL negotiation failed to " << peerAddress << ": " <<
+ qpid::sys::strError(status));
+ if (negotiateDoneCallback)
+ negotiateDoneCallback(status);
+ else
+ queueWriteClose();
+}
+
+void SslAsynchIO::sslDataIn(qpid::sys::AsynchIO& a, BufferBase *buff) {
+ if (state != Running) {
+ negotiateStep(buff);
+ return;
+ }
+
+ // Decrypt the buffer; if there's legit data, pass it on through.
+ // However, it's also possible that the peer hasn't supplied enough
+ // data yet, or the session needs to be renegotiated, or the session
+ // is ending.
+ SecBuffer recvBuffs[4];
+ recvBuffs[0].cbBuffer = buff->dataCount;
+ recvBuffs[0].BufferType = SECBUFFER_DATA;
+ recvBuffs[0].pvBuffer = &buff->bytes[buff->dataStart];
+ recvBuffs[1].BufferType = SECBUFFER_EMPTY;
+ recvBuffs[2].BufferType = SECBUFFER_EMPTY;
+ recvBuffs[3].BufferType = SECBUFFER_EMPTY;
+ SecBufferDesc buffDesc;
+ buffDesc.ulVersion = SECBUFFER_VERSION;
+ buffDesc.cBuffers = 4;
+ buffDesc.pBuffers = recvBuffs;
+ SECURITY_STATUS status = ::DecryptMessage(&ctxtHandle, &buffDesc, 0, NULL);
+ if (status != SEC_E_OK) {
+ if (status == SEC_E_INCOMPLETE_MESSAGE) {
+ // Give the partially filled buffer back and get more data
+ a.unread(buff);
+ }
+ else {
+ // Don't need this any more...
+ a.queueReadBuffer(buff);
+
+ if (status == SEC_I_RENEGOTIATE) {
+ state = Redo;
+ negotiateStep(0);
+ }
+ else if (status == SEC_I_CONTEXT_EXPIRED) {
+ queueWriteClose();
+ }
+ else {
+ throw QPID_WINDOWS_ERROR(status);
+ }
+ }
+ return;
+ }
+
+ // All decrypted and verified... continue with AMQP. The recvBuffs have
+ // been set up by DecryptMessage to demarcate the SSL header, data, and
+ // trailer, as well as any extra data left over. Walk through and find
+ // that info, adjusting the buff data accordingly to reflect only the
+ // actual decrypted data.
+ // If there's extra data, copy that out to a new buffer and run through
+ // this method again.
+ BufferBase *extraBuff = 0;
+ for (int i = 0; i < 4; i++) {
+ switch (recvBuffs[i].BufferType) {
+ case SECBUFFER_STREAM_HEADER:
+ buff->dataStart += recvBuffs[i].cbBuffer;
+ // Fall through - also don't count these bytes as data
+ case SECBUFFER_STREAM_TRAILER:
+ buff->dataCount -= recvBuffs[i].cbBuffer;
+ break;
+ case SECBUFFER_EXTRA:
+ // Very important to get this buffer from the downstream aio.
+ // The ones constructed from the local getQueuedBuffer() are
+ // restricted size for encrypting. However, data coming up from
+ // TCP may have a bunch of SSL segments coalesced and be much
+ // larger than the maximum single SSL segment.
+ extraBuff = a.getQueuedBuffer();
+ if (0 == extraBuff)
+ throw QPID_WINDOWS_ERROR(WSAENOBUFS);
+ memmove(extraBuff->bytes,
+ recvBuffs[i].pvBuffer,
+ recvBuffs[i].cbBuffer);
+ extraBuff->dataCount = recvBuffs[i].cbBuffer;
+ break;
+ default:
+ break;
+ }
+ }
+
+ // Since we've already taken (possibly) all the available bytes from the
+ // aio layer, need to be sure that everything that's processable is
+ // processed before returning back to aio. It could be that any
+ // leftoverPlaintext data plus new buff data won't fit in one buffer, so
+ // need to keep going around the input processing loop until either
+ // all the bytes are gone, or there's less than a full frame remaining
+ // (so we can count on more bytes being on the way via aio).
+ do {
+ BufferBase *temp = 0;
+ // Now that buff reflects only decrypted data, see if there was any
+ // partial data left over from last time. If so, append this new
+ // data to that and release the current buff back to aio. Assume that
+ // leftoverPlaintext was squished so the data starts at 0.
+ if (leftoverPlaintext != 0) {
+ // There is leftover data; append all the new data that will fit.
+ int32_t count = buff->dataCount;
+ if (leftoverPlaintext->dataCount + count > leftoverPlaintext->byteCount)
+ count = (leftoverPlaintext->byteCount - leftoverPlaintext->dataCount);
+ ::memmove(&leftoverPlaintext->bytes[leftoverPlaintext->dataCount],
+ &buff->bytes[buff->dataStart],
+ count);
+ leftoverPlaintext->dataCount += count;
+ buff->dataCount -= count;
+ buff->dataStart += count;
+ if (buff->dataCount == 0) {
+ a.queueReadBuffer(buff);
+ buff = 0;
+ }
+ // Prepare to pass the buffer up. Beware that the read callback
+ // may do an unread(), so move the leftoverPlaintext pointer
+ // out of the way. It also may release the buffer back to aio,
+ // so in either event, the pointer passed to the callback is not
+ // valid on return.
+ temp = leftoverPlaintext;
+ leftoverPlaintext = 0;
+ }
+ else {
+ temp = buff;
+ buff = 0;
+ }
+ if (readCallback) {
+ // The callback guard here is to prevent an upcall from deleting
+ // this out from under us via queueForDeletion().
+ ++callbacksInProgress;
+ readCallback(*this, temp);
+ --callbacksInProgress;
+ }
+ else
+ a.queueReadBuffer(temp); // What else can we do with this???
+ } while (buff != 0);
+
+ // Ok, the current decrypted data is done. If there was any extra data,
+ // go back and handle that.
+ if (extraBuff != 0)
+ sslDataIn(a, extraBuff);
+
+ // If the upper layer queued for delete, do that now that all the
+ // callbacks are done.
+ if (queuedDelete && callbacksInProgress == 0)
+ delete this;
+}
+
+void SslAsynchIO::idle(qpid::sys::AsynchIO&) {
+ // Don't relay idle indication to layer above until SSL session is up.
+ if (state == Running) {
+ state = Running;
+ if (idleCallback)
+ idleCallback(*this);
+ }
+}
+
+ /**************************************************/
+
+ClientSslAsynchIO::ClientSslAsynchIO(const std::string& brokerHost,
+ const qpid::sys::Socket& s,
+ CredHandle hCred,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb,
+ BuffersEmptyCallback eCb,
+ IdleCallback iCb,
+ NegotiateDoneCallback nCb) :
+ SslAsynchIO(s, hCred, rCb, eofCb, disCb, cCb, eCb, iCb, nCb),
+ serverHost(brokerHost)
+{
+}
+
+void ClientSslAsynchIO::startNegotiate() {
+ // SEC_CHAR is non-const, so do all the typing here.
+ SEC_CHAR *host = const_cast<SEC_CHAR *>(serverHost.c_str());
+
+ // Need a buffer to receive the token to send to the server.
+ BufferBase *buff = aio->getQueuedBuffer();
+ ULONG ctxtRequested = ISC_REQ_STREAM;
+ ULONG ctxtAttrs;
+ // sendBuffs gets information to forward to the peer.
+ SecBuffer sendBuffs[2];
+ sendBuffs[0].cbBuffer = buff->byteCount;
+ sendBuffs[0].BufferType = SECBUFFER_TOKEN;
+ sendBuffs[0].pvBuffer = buff->bytes;
+ sendBuffs[1].cbBuffer = 0;
+ sendBuffs[1].BufferType = SECBUFFER_EMPTY;
+ sendBuffs[1].pvBuffer = 0;
+ SecBufferDesc sendBuffDesc;
+ sendBuffDesc.ulVersion = SECBUFFER_VERSION;
+ sendBuffDesc.cBuffers = 2;
+ sendBuffDesc.pBuffers = sendBuffs;
+ SECURITY_STATUS status = ::InitializeSecurityContext(&credHandle,
+ NULL,
+ host,
+ ctxtRequested,
+ 0,
+ 0,
+ NULL,
+ 0,
+ &ctxtHandle,
+ &sendBuffDesc,
+ &ctxtAttrs,
+ NULL);
+ if (status == SEC_I_CONTINUE_NEEDED) {
+ buff->dataCount = sendBuffs[0].cbBuffer;
+ aio->queueWrite(buff);
+ }
+}
+
+void ClientSslAsynchIO::negotiateStep(BufferBase* buff) {
+ // SEC_CHAR is non-const, so do all the typing here.
+ SEC_CHAR *host = const_cast<SEC_CHAR *>(serverHost.c_str());
+ ULONG ctxtRequested = ISC_REQ_STREAM;
+ ULONG ctxtAttrs;
+
+ // tokenBuffs describe the buffer that's coming in. It should have
+ // a token from the SSL server.
+ SecBuffer tokenBuffs[2];
+ tokenBuffs[0].cbBuffer = buff ? buff->dataCount : 0;
+ tokenBuffs[0].BufferType = SECBUFFER_TOKEN;
+ tokenBuffs[0].pvBuffer = buff ? buff->bytes : 0;
+ tokenBuffs[1].cbBuffer = 0;
+ tokenBuffs[1].BufferType = SECBUFFER_EMPTY;
+ tokenBuffs[1].pvBuffer = 0;
+ SecBufferDesc tokenBuffDesc;
+ tokenBuffDesc.ulVersion = SECBUFFER_VERSION;
+ tokenBuffDesc.cBuffers = 2;
+ tokenBuffDesc.pBuffers = tokenBuffs;
+
+ // Need a buffer to receive any token to send back to the server.
+ BufferBase *sendbuff = aio->getQueuedBuffer();
+ // sendBuffs gets information to forward to the peer.
+ SecBuffer sendBuffs[2];
+ sendBuffs[0].cbBuffer = sendbuff->byteCount;
+ sendBuffs[0].BufferType = SECBUFFER_TOKEN;
+ sendBuffs[0].pvBuffer = sendbuff->bytes;
+ sendBuffs[1].cbBuffer = 0;
+ sendBuffs[1].BufferType = SECBUFFER_EMPTY;
+ sendBuffs[1].pvBuffer = 0;
+ SecBufferDesc sendBuffDesc;
+ sendBuffDesc.ulVersion = SECBUFFER_VERSION;
+ sendBuffDesc.cBuffers = 2;
+ sendBuffDesc.pBuffers = sendBuffs;
+
+ SECURITY_STATUS status = ::InitializeSecurityContext(&credHandle,
+ &ctxtHandle,
+ host,
+ ctxtRequested,
+ 0,
+ 0,
+ &tokenBuffDesc,
+ 0,
+ NULL,
+ &sendBuffDesc,
+ &ctxtAttrs,
+ NULL);
+
+ if (status == SEC_E_INCOMPLETE_MESSAGE) {
+ // Not enough - get more data from the server then try again.
+ aio->unread(buff);
+ aio->queueReadBuffer(sendbuff); // Don't need this one for now...
+ return;
+ }
+ // Done with the buffer that came in...
+ if (buff)
+ aio->queueReadBuffer(buff);
+ if (status == SEC_I_CONTINUE_NEEDED) {
+ sendbuff->dataCount = sendBuffs[0].cbBuffer;
+ aio->queueWrite(sendbuff);
+ return;
+ }
+ // Nothing to send back to the server...
+ aio->queueReadBuffer(sendbuff);
+ // SEC_I_CONTEXT_EXPIRED means session stop complete; SEC_E_OK can be
+ // either session stop or negotiation done (session up).
+ if (status == SEC_E_OK || status == SEC_I_CONTEXT_EXPIRED)
+ negotiationDone();
+ else
+ negotiationFailed(status);
+}
+
+/*************************************************/
+
+ServerSslAsynchIO::ServerSslAsynchIO(bool clientMustAuthenticate,
+ const qpid::sys::Socket& s,
+ CredHandle hCred,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb,
+ BuffersEmptyCallback eCb,
+ IdleCallback iCb,
+ NegotiateDoneCallback nCb) :
+ SslAsynchIO(s, hCred, rCb, eofCb, disCb, cCb, eCb, iCb, nCb),
+ clientAuth(clientMustAuthenticate)
+{
+}
+
+void ServerSslAsynchIO::startNegotiate() {
+ // Nothing... need the client to send a token first.
+}
+
+void ServerSslAsynchIO::negotiateStep(BufferBase* buff) {
+ ULONG ctxtRequested = ASC_REQ_STREAM;
+ if (clientAuth)
+ ctxtRequested |= ASC_REQ_MUTUAL_AUTH;
+ ULONG ctxtAttrs;
+
+ // tokenBuffs describe the buffer that's coming in. It should have
+ // a token from the SSL server except if shutting down or renegotiating.
+ SecBuffer tokenBuffs[2];
+ tokenBuffs[0].cbBuffer = buff ? buff->dataCount : 0;
+ tokenBuffs[0].BufferType = SECBUFFER_TOKEN;
+ tokenBuffs[0].pvBuffer = buff ? buff->bytes : 0;
+ tokenBuffs[1].cbBuffer = 0;
+ tokenBuffs[1].BufferType = SECBUFFER_EMPTY;
+ tokenBuffs[1].pvBuffer = 0;
+ SecBufferDesc tokenBuffDesc;
+ tokenBuffDesc.ulVersion = SECBUFFER_VERSION;
+ tokenBuffDesc.cBuffers = 2;
+ tokenBuffDesc.pBuffers = tokenBuffs;
+
+ // Need a buffer to receive any token to send back to the server.
+ BufferBase *sendbuff = aio->getQueuedBuffer();
+ // sendBuffs gets information to forward to the peer.
+ SecBuffer sendBuffs[2];
+ sendBuffs[0].cbBuffer = sendbuff->byteCount;
+ sendBuffs[0].BufferType = SECBUFFER_TOKEN;
+ sendBuffs[0].pvBuffer = sendbuff->bytes;
+ sendBuffs[1].cbBuffer = 0;
+ sendBuffs[1].BufferType = SECBUFFER_EMPTY;
+ sendBuffs[1].pvBuffer = 0;
+ SecBufferDesc sendBuffDesc;
+ sendBuffDesc.ulVersion = SECBUFFER_VERSION;
+ sendBuffDesc.cBuffers = 2;
+ sendBuffDesc.pBuffers = sendBuffs;
+ PCtxtHandle ctxtHandlePtr = (SecIsValidHandle(&ctxtHandle)) ? &ctxtHandle : 0;
+ SECURITY_STATUS status = ::AcceptSecurityContext(&credHandle,
+ ctxtHandlePtr,
+ &tokenBuffDesc,
+ ctxtRequested,
+ 0,
+ &ctxtHandle,
+ &sendBuffDesc,
+ &ctxtAttrs,
+ NULL);
+ if (status == SEC_E_INCOMPLETE_MESSAGE) {
+ // Not enough - get more data from the server then try again.
+ if (buff)
+ aio->unread(buff);
+ aio->queueReadBuffer(sendbuff); // Don't need this one for now...
+ return;
+ }
+ // Done with the buffer that came in...
+ if (buff)
+ aio->queueReadBuffer(buff);
+ if (status == SEC_I_CONTINUE_NEEDED) {
+ sendbuff->dataCount = sendBuffs[0].cbBuffer;
+ aio->queueWrite(sendbuff);
+ return;
+ }
+ // There may have been a token generated; if so, send it to the client.
+ if (sendBuffs[0].cbBuffer > 0) {
+ sendbuff->dataCount = sendBuffs[0].cbBuffer;
+ aio->queueWrite(sendbuff);
+ }
+ else
+ // Nothing to send back to the server...
+ aio->queueReadBuffer(sendbuff);
+
+ // SEC_I_CONTEXT_EXPIRED means session stop complete; SEC_E_OK can be
+ // either session stop or negotiation done (session up).
+ if (status == SEC_E_OK || status == SEC_I_CONTEXT_EXPIRED) {
+ if (clientAuth)
+ QPID_LOG(warning, "DID WE CHECK FOR CLIENT AUTH???");
+
+ negotiationDone();
+ }
+ else {
+ negotiationFailed(status);
+ }
+}
+
+}}} // namespace qpid::sys::windows
diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h
new file mode 100644
index 0000000000..3cdf2c8f08
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h
@@ -0,0 +1,191 @@
+#ifndef _sys_windows_SslAsynchIO
+#define _sys_windows_SslAsynchIO
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/CommonImportExport.h"
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include <windows.h>
+// security.h needs to see this to distinguish from kernel use.
+#define SECURITY_WIN32
+#include <security.h>
+#include <Schnlsp.h>
+#undef SECURITY_WIN32
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+class Socket;
+class Poller;
+
+/*
+ * SSL/Schannel shim between the frame-handling and AsynchIO layers.
+ * SslAsynchIO creates a regular AsynchIO object to handle I/O and this class
+ * gets involved for SSL negotiations and encrypt/decrypt. The details of
+ * how this all works are invisible to the layers on either side. The only
+ * change from normal AsynchIO usage is that there's an extra callback
+ * from SslAsynchIO to indicate that the initial session negotiation is
+ * complete.
+ *
+ * The details of session negotiation are different for client and server
+ * SSL roles. These differences are handled by deriving separate client
+ * and server role classes.
+ */
+class SslAsynchIO : public qpid::sys::AsynchIO {
+public:
+ typedef boost::function1<void, SECURITY_STATUS> NegotiateDoneCallback;
+
+ SslAsynchIO(const qpid::sys::Socket& s,
+ CredHandle hCred,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0,
+ NegotiateDoneCallback nCb = 0);
+ ~SslAsynchIO();
+
+ virtual void queueForDeletion();
+
+ virtual void start(qpid::sys::Poller::shared_ptr poller);
+ virtual void queueReadBuffer(BufferBase* buff);
+ virtual void unread(BufferBase* buff);
+ virtual void queueWrite(BufferBase* buff);
+ virtual void notifyPendingWrite();
+ virtual void queueWriteClose();
+ virtual bool writeQueueEmpty();
+ virtual void startReading();
+ virtual void stopReading();
+ virtual void requestCallback(RequestCallback);
+ virtual BufferBase* getQueuedBuffer();
+
+ QPID_COMMON_EXTERN unsigned int getSslKeySize();
+
+protected:
+ CredHandle credHandle;
+
+ // AsynchIO layer below that's actually doing the I/O
+ qpid::sys::AsynchIO *aio;
+
+ // Track what the state of the SSL session is. Have to know when it's
+ // time to notify the upper layer that the session is up, and also to
+ // know when it's not legit to pass data through to either side.
+ enum { Negotiating, Running, Redo, ShuttingDown } state;
+ bool sessionUp;
+ CtxtHandle ctxtHandle;
+ TimeStamp credExpiry;
+
+ // Client- and server-side SSL subclasses implement these to do the
+ // proper negotiation steps. negotiateStep() is called with a buffer
+ // just received from the peer.
+ virtual void startNegotiate() = 0;
+ virtual void negotiateStep(BufferBase *buff) = 0;
+
+ // The negotiating steps call one of these when it's finalized:
+ void negotiationDone();
+ void negotiationFailed(SECURITY_STATUS status);
+
+private:
+ // These are callbacks from AsynchIO to here.
+ void sslDataIn(qpid::sys::AsynchIO& a, BufferBase *buff);
+ void idle(qpid::sys::AsynchIO&);
+
+ // These callbacks are to the layer above.
+ ReadCallback readCallback;
+ IdleCallback idleCallback;
+ NegotiateDoneCallback negotiateDoneCallback;
+ volatile unsigned int callbacksInProgress; // >0 if w/in callbacks
+ volatile bool queuedDelete;
+
+ // Address of peer, in case it's needed for logging.
+ std::string peerAddress;
+
+ // Partial buffer of decrypted plaintext given back by the layer above.
+ AsynchIO::BufferBase *leftoverPlaintext;
+
+ SecPkgContext_StreamSizes schSizes;
+};
+
+/*
+ * SSL/Schannel client-side shim between the frame-handling and AsynchIO
+ * layers.
+ */
+class ClientSslAsynchIO : public SslAsynchIO {
+public:
+ // Args same as for SslIoShim, with the addition of brokerHost which is
+ // the expected SSL name of the server.
+ QPID_COMMON_EXTERN ClientSslAsynchIO(const std::string& brokerHost,
+ const qpid::sys::Socket& s,
+ CredHandle hCred,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0,
+ NegotiateDoneCallback nCb = 0);
+
+private:
+ std::string serverHost;
+
+ // Client- and server-side SSL subclasses implement these to do the
+ // proper negotiation steps. negotiateStep() is called with a buffer
+ // just received from the peer.
+ void startNegotiate();
+ void negotiateStep(BufferBase *buff);
+};
+/*
+ * SSL/Schannel server-side shim between the frame-handling and AsynchIO
+ * layers.
+ */
+class ServerSslAsynchIO : public SslAsynchIO {
+public:
+ QPID_COMMON_EXTERN ServerSslAsynchIO(bool clientMustAuthenticate,
+ const qpid::sys::Socket& s,
+ CredHandle hCred,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0,
+ NegotiateDoneCallback nCb = 0);
+
+private:
+ bool clientAuth;
+
+ // Client- and server-side SSL subclasses implement these to do the
+ // proper negotiation steps. negotiateStep() is called with a buffer
+ // just received from the peer.
+ void startNegotiate();
+ void negotiateStep(BufferBase *buff);
+};
+
+}}} // namespace qpid::sys::windows
+
+#endif // _sys_windows_SslAsynchIO
diff --git a/qpid/cpp/src/qpid/sys/windows/StrError.cpp b/qpid/cpp/src/qpid/sys/windows/StrError.cpp
new file mode 100755
index 0000000000..546d399d16
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/StrError.cpp
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/StrError.h"
+#include <string>
+#include <string.h>
+#include <windows.h>
+
+namespace qpid {
+namespace sys {
+
+std::string strError(int err) {
+ const size_t bufsize = 512;
+ char buf[bufsize];
+ buf[0] = 0;
+ if (0 == FormatMessage (FORMAT_MESSAGE_MAX_WIDTH_MASK
+ | FORMAT_MESSAGE_FROM_SYSTEM,
+ 0,
+ err,
+ 0, // Default language
+ buf,
+ bufsize,
+ 0))
+ {
+#ifdef _MSC_VER
+ strerror_s(buf, bufsize, err);
+#else
+ return std::string(strerror(err));
+#endif
+ }
+ return std::string(buf);
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp b/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp
new file mode 100755
index 0000000000..4da440bdd4
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/SystemInfo.cpp
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* GetNativeSystemInfo call requires _WIN32_WINNT 0x0501 or higher */
+#ifndef _WIN32_WINNT
+# define _WIN32_WINNT 0x0501
+#endif
+
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/SystemInfo.h"
+
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#include <windows.h>
+#include <tlhelp32.h>
+
+#ifndef HOST_NAME_MAX
+# define HOST_NAME_MAX 256
+#endif
+
+namespace qpid {
+namespace sys {
+
+long SystemInfo::concurrency() {
+ SYSTEM_INFO sys_info;
+ ::GetSystemInfo (&sys_info);
+ long activeProcessors = 0;
+ DWORD_PTR mask = sys_info.dwActiveProcessorMask;
+ while (mask != 0) {
+ if (mask & 1)
+ ++activeProcessors;
+ mask >>= 1;
+ }
+ return activeProcessors;
+}
+
+bool SystemInfo::getLocalHostname (Address &address) {
+ char name[HOST_NAME_MAX];
+ if (::gethostname(name, sizeof(name)) != 0) {
+ errno = WSAGetLastError();
+ return false;
+ }
+ address.host = name;
+ return true;
+}
+
+static const std::string LOCALHOST("127.0.0.1");
+static const std::string TCP("tcp");
+
+void SystemInfo::getLocalIpAddresses (uint16_t port,
+ std::vector<Address> &addrList) {
+ enum { MAX_URL_INTERFACES = 100 };
+
+ SOCKET s = socket (PF_INET, SOCK_STREAM, 0);
+ if (s != INVALID_SOCKET) {
+ INTERFACE_INFO interfaces[MAX_URL_INTERFACES];
+ DWORD filledBytes = 0;
+ WSAIoctl (s,
+ SIO_GET_INTERFACE_LIST,
+ 0,
+ 0,
+ interfaces,
+ sizeof (interfaces),
+ &filledBytes,
+ 0,
+ 0);
+ unsigned int interfaceCount = filledBytes / sizeof (INTERFACE_INFO);
+ for (unsigned int i = 0; i < interfaceCount; ++i) {
+ if (interfaces[i].iiFlags & IFF_UP) {
+ std::string addr(inet_ntoa(interfaces[i].iiAddress.AddressIn.sin_addr));
+ if (addr != LOCALHOST)
+ addrList.push_back(Address(TCP, addr, port));
+ }
+ }
+ closesocket (s);
+ }
+}
+
+void SystemInfo::getSystemId (std::string &osName,
+ std::string &nodeName,
+ std::string &release,
+ std::string &version,
+ std::string &machine)
+{
+ osName = "Microsoft Windows";
+
+ char node[MAX_COMPUTERNAME_LENGTH + 1];
+ DWORD nodelen = MAX_COMPUTERNAME_LENGTH + 1;
+ GetComputerName (node, &nodelen);
+ nodeName = node;
+
+ OSVERSIONINFOEX vinfo;
+ vinfo.dwOSVersionInfoSize = sizeof(vinfo);
+ GetVersionEx ((OSVERSIONINFO *)&vinfo);
+
+ SYSTEM_INFO sinfo;
+ GetNativeSystemInfo(&sinfo);
+
+ switch(vinfo.dwMajorVersion) {
+ case 5:
+ switch(vinfo.dwMinorVersion) {
+ case 0:
+ release ="2000";
+ break;
+ case 1:
+ release = "XP";
+ break;
+ case 2:
+ if (sinfo.wProcessorArchitecture == PROCESSOR_ARCHITECTURE_AMD64 ||
+ sinfo.wProcessorArchitecture == PROCESSOR_ARCHITECTURE_IA64)
+ release = "XP-64";
+ else
+ release = "Server 2003";
+ break;
+ default:
+ release = "Windows";
+ }
+ break;
+ case 6:
+ if (vinfo.wProductType == VER_NT_SERVER)
+ release = "Server 2008";
+ else
+ release = "Vista";
+ break;
+ default:
+ release = "Microsoft Windows";
+ }
+ version = vinfo.szCSDVersion;
+
+ switch(sinfo.wProcessorArchitecture) {
+ case PROCESSOR_ARCHITECTURE_AMD64:
+ machine = "x86-64";
+ break;
+ case PROCESSOR_ARCHITECTURE_IA64:
+ machine = "IA64";
+ break;
+ case PROCESSOR_ARCHITECTURE_INTEL:
+ machine = "x86";
+ break;
+ default:
+ machine = "unknown";
+ break;
+ }
+}
+
+uint32_t SystemInfo::getProcessId()
+{
+ return static_cast<uint32_t>(::GetCurrentProcessId());
+}
+
+uint32_t SystemInfo::getParentProcessId()
+{
+ // Only want info for the current process, so ask for something specific.
+ // The module info won't be used here but it keeps the snapshot limited to
+ // the current process so a search through all processes is not needed.
+ HANDLE snap = CreateToolhelp32Snapshot(TH32CS_SNAPMODULE, 0);
+ if (snap == INVALID_HANDLE_VALUE)
+ return 0;
+ PROCESSENTRY32 entry;
+ entry.dwSize = sizeof(entry);
+ if (!Process32First(snap, &entry))
+ entry.th32ParentProcessID = 0;
+ CloseHandle(snap);
+ return static_cast<uint32_t>(entry.th32ParentProcessID);
+}
+
+std::string SystemInfo::getProcessName()
+{
+ std::string name;
+
+ // Only want info for the current process, so ask for something specific.
+ // The module info won't be used here but it keeps the snapshot limited to
+ // the current process so a search through all processes is not needed.
+ HANDLE snap = CreateToolhelp32Snapshot(TH32CS_SNAPMODULE, 0);
+ if (snap == INVALID_HANDLE_VALUE)
+ return name;
+ PROCESSENTRY32 entry;
+ entry.dwSize = sizeof(entry);
+ if (!Process32First(snap, &entry))
+ entry.szExeFile[0] = '\0';
+ CloseHandle(snap);
+ name = entry.szExeFile;
+ return name;
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/windows/Thread.cpp b/qpid/cpp/src/qpid/sys/windows/Thread.cpp
new file mode 100755
index 0000000000..583a9613a3
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/Thread.cpp
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/windows/check.h"
+
+#include <process.h>
+#include <windows.h>
+
+namespace {
+unsigned __stdcall runRunnable(void* p)
+{
+ static_cast<qpid::sys::Runnable*>(p)->run();
+ _endthreadex(0);
+ return 0;
+}
+}
+
+namespace qpid {
+namespace sys {
+
+class ThreadPrivate {
+ friend class Thread;
+
+ HANDLE threadHandle;
+ unsigned threadId;
+
+ ThreadPrivate(Runnable* runnable) {
+ uintptr_t h = _beginthreadex(0,
+ 0,
+ runRunnable,
+ runnable,
+ 0,
+ &threadId);
+ QPID_WINDOWS_CHECK_CRT_NZ(h);
+ threadHandle = reinterpret_cast<HANDLE>(h);
+ }
+
+ ThreadPrivate()
+ : threadHandle(GetCurrentThread()), threadId(GetCurrentThreadId()) {}
+};
+
+Thread::Thread() {}
+
+Thread::Thread(Runnable* runnable) : impl(new ThreadPrivate(runnable)) {}
+
+Thread::Thread(Runnable& runnable) : impl(new ThreadPrivate(&runnable)) {}
+
+Thread::operator bool() {
+ return impl;
+}
+
+bool Thread::operator==(const Thread& t) const {
+ return impl->threadId == t.impl->threadId;
+}
+
+bool Thread::operator!=(const Thread& t) const {
+ return !(*this==t);
+}
+
+void Thread::join() {
+ if (impl) {
+ DWORD status = WaitForSingleObject (impl->threadHandle, INFINITE);
+ QPID_WINDOWS_CHECK_NOT(status, WAIT_FAILED);
+ CloseHandle (impl->threadHandle);
+ impl->threadHandle = 0;
+ }
+}
+
+unsigned long Thread::logId() {
+ return GetCurrentThreadId();
+}
+
+/* static */
+Thread Thread::current() {
+ Thread t;
+ t.impl.reset(new ThreadPrivate());
+ return t;
+}
+
+}} /* qpid::sys */
diff --git a/qpid/cpp/src/qpid/sys/windows/Time.cpp b/qpid/cpp/src/qpid/sys/windows/Time.cpp
new file mode 100644
index 0000000000..25c50819cd
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/Time.cpp
@@ -0,0 +1,136 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Time.h"
+#include <ostream>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/thread/thread_time.hpp>
+#include <windows.h>
+
+using namespace boost::posix_time;
+
+namespace {
+
+// High-res timing support. This will display times since program start,
+// more or less. Keep track of the start value and the conversion factor to
+// seconds.
+bool timeInitialized = false;
+LARGE_INTEGER start;
+double freq = 1.0;
+
+}
+
+namespace qpid {
+namespace sys {
+
+AbsTime::AbsTime(const AbsTime& t, const Duration& d) {
+ if (d == Duration::max()) {
+ timepoint = ptime(max_date_time);
+ }
+ else {
+ time_duration td = microseconds(d.nanosecs / 1000);
+ timepoint = t.timepoint + td;
+ }
+}
+
+AbsTime AbsTime::FarFuture() {
+ AbsTime ff;
+ ptime maxd(max_date_time);
+ ff.timepoint = maxd;
+ return ff;
+}
+
+AbsTime AbsTime::Epoch() {
+ AbsTime time_epoch;
+ time_epoch.timepoint = boost::posix_time::from_time_t(0);
+ return time_epoch;
+}
+
+AbsTime AbsTime::now() {
+ AbsTime time_now;
+ time_now.timepoint = boost::get_system_time();
+ return time_now;
+}
+
+Duration::Duration(const AbsTime& start, const AbsTime& finish) {
+ time_duration d = finish.timepoint - start.timepoint;
+ nanosecs = d.total_nanoseconds();
+}
+
+std::ostream& operator<<(std::ostream& o, const Duration& d) {
+ return o << int64_t(d) << "ns";
+}
+
+std::ostream& operator<<(std::ostream& o, const AbsTime& t) {
+ std::string time_string = to_simple_string(t.timepoint);
+ return o << time_string;
+}
+
+
+void sleep(int secs) {
+ ::Sleep(secs * 1000);
+}
+
+void usleep(uint64_t usecs) {
+ DWORD msecs = usecs / 1000;
+ if (msecs == 0)
+ msecs = 1;
+ ::Sleep(msecs);
+}
+
+void outputFormattedNow(std::ostream& o) {
+ ::time_t rawtime;
+ ::tm timeinfo;
+ char time_string[100];
+
+ ::time( &rawtime );
+#ifdef _MSC_VER
+ ::localtime_s(&timeinfo, &rawtime);
+#else
+ timeinfo = *(::localtime(&rawtime));
+#endif
+ ::strftime(time_string, 100,
+ "%Y-%m-%d %H:%M:%S",
+ &timeinfo);
+ o << time_string << " ";
+}
+
+void outputHiresNow(std::ostream& o) {
+ if (!timeInitialized) {
+ start.QuadPart = 0;
+ LARGE_INTEGER iFreq;
+ iFreq.QuadPart = 1;
+ QueryPerformanceCounter(&start);
+ QueryPerformanceFrequency(&iFreq);
+ freq = static_cast<double>(iFreq.QuadPart);
+ timeInitialized = true;
+ }
+ LARGE_INTEGER iNow;
+ iNow.QuadPart = 0;
+ QueryPerformanceCounter(&iNow);
+ iNow.QuadPart -= start.QuadPart;
+ if (iNow.QuadPart < 0)
+ iNow.QuadPart = 0;
+ double now = static_cast<double>(iNow.QuadPart);
+ now /= freq; // now is seconds after this
+ o << std::fixed << std::setprecision(8) << std::setw(16) << std::setfill('0') << now << "s ";
+}
+}}
diff --git a/qpid/cpp/src/qpid/sys/windows/mingw32_compat.h b/qpid/cpp/src/qpid/sys/windows/mingw32_compat.h
new file mode 100644
index 0000000000..51f613cc25
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/mingw32_compat.h
@@ -0,0 +1,39 @@
+#ifndef _sys_windows_mingw32_compat
+#define _sys_windows_mingw32_compat
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifdef WIN32
+#ifndef _MSC_VER
+
+//
+// The following definitions for extension function GUIDs and signatures are taken from
+// MswSock.h in the Windows32 SDK. These rightfully belong in the mingw32 version of
+// mswsock.h, but are not included presently.
+//
+
+#define WSAID_ACCEPTEX {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}}
+typedef BOOL (PASCAL *LPFN_ACCEPTEX)(SOCKET,SOCKET,PVOID,DWORD,DWORD,DWORD,LPDWORD,LPOVERLAPPED);
+
+#endif
+#endif
+
+#endif
diff --git a/qpid/cpp/src/qpid/sys/windows/uuid.cpp b/qpid/cpp/src/qpid/sys/windows/uuid.cpp
new file mode 100644
index 0000000000..3316ecbc00
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/uuid.cpp
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <rpc.h>
+#ifdef uuid_t /* Done in rpcdce.h */
+# undef uuid_t
+#endif
+
+#include "qpid/sys/windows/uuid.h"
+
+#include <string.h>
+
+void uuid_clear (uuid_t uu) {
+ UuidCreateNil (reinterpret_cast<UUID*>(uu));
+}
+
+void uuid_copy (uuid_t dst, const uuid_t src) {
+ memcpy (dst, src, qpid::sys::UuidSize);
+}
+
+void uuid_generate (uuid_t out) {
+ UuidCreate (reinterpret_cast<UUID*>(out));
+}
+
+int uuid_is_null (const uuid_t uu) {
+ RPC_STATUS unused;
+ return UuidIsNil ((UUID*)uu, &unused);
+}
+
+int uuid_parse (const char *in, uuid_t uu) {
+ return UuidFromString ((unsigned char*)in, (UUID*)uu) == RPC_S_OK ? 0 : -1;
+}
+
+void uuid_unparse (const uuid_t uu, char *out) {
+ unsigned char *formatted;
+ if (UuidToString((UUID*)uu, &formatted) == RPC_S_OK) {
+#ifdef _MSC_VER
+ strncpy_s (out, 36+1, (char*)formatted, _TRUNCATE);
+#else
+ strncpy (out, (char*)formatted, 36+1);
+#endif
+ RpcStringFree(&formatted);
+ }
+}
+
+int uuid_compare (const uuid_t a, const uuid_t b) {
+ RPC_STATUS unused;
+ return !UuidEqual((UUID*)a, (UUID*)b, &unused);
+}
diff --git a/qpid/cpp/src/qpid/sys/windows/uuid.h b/qpid/cpp/src/qpid/sys/windows/uuid.h
new file mode 100644
index 0000000000..8ab132e9ce
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/uuid.h
@@ -0,0 +1,39 @@
+#ifndef _sys_windows_uuid_h
+#define _sys_windows_uuid_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/types/ImportExport.h"
+#include <qpid/sys/IntegerTypes.h>
+
+namespace qpid { namespace sys { const size_t UuidSize = 16; }}
+typedef uint8_t uuid_t[qpid::sys::UuidSize];
+
+QPID_TYPES_EXTERN void uuid_clear (uuid_t uu);
+QPID_TYPES_EXTERN void uuid_copy (uuid_t dst, const uuid_t src);
+QPID_TYPES_EXTERN void uuid_generate (uuid_t out);
+QPID_TYPES_EXTERN int uuid_is_null (const uuid_t uu); // Returns 1 if null, else 0
+QPID_TYPES_EXTERN int uuid_parse (const char *in, uuid_t uu); // Returns 0 on success, else -1
+QPID_TYPES_EXTERN void uuid_unparse (const uuid_t uu, char *out);
+QPID_TYPES_EXTERN int uuid_compare (const uuid_t a, const uuid_t b);
+
+#endif /*!_sys_windows_uuid_h*/