diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-27 15:40:33 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-27 15:40:33 +0000 |
commit | 868ce7469262d6fd2fe3f2e7f04cfe7af654d59f (patch) | |
tree | 63e6b5e62554609beb21e8c8d0610569f36d2743 /cpp/src/qpid/sys | |
parent | 2e5ff8f1b328831043e6d7e323249d62187234c6 (diff) | |
download | qpid-python-868ce7469262d6fd2fe3f2e7f04cfe7af654d59f.tar.gz |
QPID-3858: Updated code to include recent refactoring by Gordon (gsim) - see QPID-4178.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1377715 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r-- | cpp/src/qpid/sys/AsynchIO.h | 16 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.cpp | 44 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/SslPlugin.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/epoll/EpollPoller.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 25 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/posix/SystemInfo.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ssl/SslHandler.cpp | 27 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ssl/SslHandler.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ssl/SslIo.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ssl/SslIo.h | 26 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/AsynchIO.cpp | 25 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/SslAsynchIO.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/SslAsynchIO.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/windows/Time.cpp | 74 |
16 files changed, 182 insertions, 111 deletions
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h index 41f74f7ed0..b2eaaac9de 100644 --- a/cpp/src/qpid/sys/AsynchIO.h +++ b/cpp/src/qpid/sys/AsynchIO.h @@ -76,8 +76,8 @@ protected: }; struct AsynchIOBufferBase { - char* const bytes; - const int32_t byteCount; + char* bytes; + int32_t byteCount; int32_t dataStart; int32_t dataCount; @@ -134,9 +134,21 @@ public: BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); public: + /* + * Size of IO buffers - this is the maximum possible frame size + 1 + */ + const static uint32_t MaxBufferSize = 65536; + + /* + * Number of IO buffers allocated - I think the code can only use 2 - + * 1 for reading and 1 for writing, allocate 4 for safety + */ + const static uint32_t BufferCount = 4; + virtual void queueForDeletion() = 0; virtual void start(boost::shared_ptr<Poller> poller) = 0; + virtual void createBuffers(uint32_t size = MaxBufferSize) = 0; virtual void queueReadBuffer(BufferBase* buff) = 0; virtual void unread(BufferBase* buff) = 0; virtual void queueWrite(BufferBase* buff) = 0; diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index 8a485db72d..2e117a3fb7 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -33,15 +33,6 @@ namespace qpid { namespace sys { -// Buffer definition -struct Buff : public AsynchIO::BufferBase { - Buff() : - AsynchIO::BufferBase(new char[65536], 65536) - {} - ~Buff() - { delete [] bytes;} -}; - struct ProtocolTimeoutTask : public sys::TimerTask { AsynchIOHandler& handler; std::string id; @@ -79,7 +70,7 @@ AsynchIOHandler::~AsynchIOHandler() { delete codec; } -void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) { +void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime) { aio = a; // Start timer for this connection @@ -87,17 +78,14 @@ void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint timer.add(timeoutTimerTask); // Give connection some buffers to use - for (int i = 0; i < numBuffs; i++) { - aio->queueReadBuffer(new Buff); - } + aio->createBuffers(); } void AsynchIOHandler::write(const framing::ProtocolInitiation& data) { QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")"); AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; + assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.encodedSize(); @@ -244,24 +232,24 @@ void AsynchIOHandler::idle(AsynchIO&){ return; } if (codec == 0) return; - try { - if (codec->canEncode()) { - // Try and get a queued buffer if not then construct new one - AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) buff = new Buff; + if (!codec->canEncode()) { + return; + } + AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); + if (buff) { + try { size_t encoded=codec->encode(buff->bytes, buff->byteCount); buff->dataCount = encoded; aio->queueWrite(buff); + if (!codec->isClosed()) { + return; + } + } catch (const std::exception& e) { + QPID_LOG(error, e.what()); } - if (codec->isClosed()) { - readError = true; - aio->queueWriteClose(); - } - } catch (const std::exception& e) { - QPID_LOG(error, e.what()); - readError = true; - aio->queueWriteClose(); } + readError = true; + aio->queueWriteClose(); } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h index 307aad5b85..fd0bc140e5 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/cpp/src/qpid/sys/AsynchIOHandler.h @@ -61,7 +61,7 @@ class AsynchIOHandler : public OutputControl { public: QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f ); QPID_COMMON_EXTERN ~AsynchIOHandler(); - QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime, int numBuffs); + QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime); QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; } diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp index 3b50527c0a..069e97758e 100644 --- a/cpp/src/qpid/sys/SslPlugin.cpp +++ b/cpp/src/qpid/sys/SslPlugin.cpp @@ -191,7 +191,7 @@ void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s, boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1), boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1)); - async->init(aio,timer, maxTime, 4); + async->init(aio,timer, maxTime); aio->start(poller); } @@ -247,7 +247,7 @@ void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, brokerTimer, maxNegotiateTime, 4); + async->init(aio, brokerTimer, maxNegotiateTime); aio->start(poller); } diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index 551440f954..ed7cc3748d 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -166,7 +166,7 @@ void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socke boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, brokerTimer, maxNegotiateTime, 4); + async->init(aio, brokerTimer, maxNegotiateTime); aio->start(poller); } diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp index dcc9d9181c..c23403c66d 100644 --- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -221,8 +221,8 @@ class PollerPrivate { } }; - static ReadablePipe alwaysReadable; - static int alwaysReadableFd; + ReadablePipe alwaysReadable; + int alwaysReadableFd; class InterruptHandle: public PollerHandle { std::queue<PollerHandle*> handles; @@ -290,6 +290,7 @@ class PollerPrivate { } PollerPrivate() : + alwaysReadableFd(alwaysReadable.getFD()), epollFd(::epoll_create(DefaultFds)), isShutdown(false) { QPID_POSIX_CHECK(epollFd); @@ -328,9 +329,6 @@ class PollerPrivate { } }; -PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable; -int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD(); - void Poller::registerHandle(PollerHandle& handle) { PollerHandlePrivate& eh = *handle.impl; ScopedLock<Mutex> l(eh.lock); diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 01ff8b6bfa..31355627cd 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -40,6 +40,7 @@ #include <boost/bind.hpp> #include <boost/lexical_cast.hpp> +#include <boost/shared_array.hpp> namespace qpid { namespace sys { @@ -239,6 +240,7 @@ public: virtual void queueForDeletion(); virtual void start(Poller::shared_ptr poller); + virtual void createBuffers(uint32_t size); virtual void queueReadBuffer(BufferBase* buff); virtual void unread(BufferBase* buff); virtual void queueWrite(BufferBase* buff); @@ -270,6 +272,8 @@ private: const Socket& socket; std::deque<BufferBase*> bufferQueue; std::deque<BufferBase*> writeQueue; + std::vector<BufferBase> buffers; + boost::shared_array<char> bufferMemory; bool queuedClose; /** * This flag is used to detect and handle concurrency between @@ -309,15 +313,7 @@ AsynchIO::AsynchIO(const Socket& s, s.setNonblocking(); } -struct deleter -{ - template <typename T> - void operator()(T *ptr){ delete ptr;} -}; - AsynchIO::~AsynchIO() { - std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); - std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); } void AsynchIO::queueForDeletion() { @@ -328,6 +324,19 @@ void AsynchIO::start(Poller::shared_ptr poller) { DispatchHandle::startWatch(poller); } +void AsynchIO::createBuffers(uint32_t size) { + // Allocate all the buffer memory at once + bufferMemory.reset(new char[size*BufferCount]); + + // Create the Buffer structs in a vector + // And push into the buffer queue + buffers.reserve(BufferCount); + for (uint32_t i = 0; i < BufferCount; i++) { + buffers.push_back(BufferBase(&bufferMemory[i*size], size)); + queueReadBuffer(&buffers[i]); + } +} + void AsynchIO::queueReadBuffer(BufferBase* buff) { assert(buff); buff->dataStart = 0; diff --git a/cpp/src/qpid/sys/posix/SystemInfo.cpp b/cpp/src/qpid/sys/posix/SystemInfo.cpp index 2b1bbb97df..cfd2c64aee 100755 --- a/cpp/src/qpid/sys/posix/SystemInfo.cpp +++ b/cpp/src/qpid/sys/posix/SystemInfo.cpp @@ -91,7 +91,7 @@ void SystemInfo::getLocalIpAddresses (uint16_t port, // * The scope id is illegal in URL syntax // * Clients won't be able to use a link local address // without adding their own (potentially different) scope id - sockaddr_in6* sa6 = (sockaddr_in6*)(ifap->ifa_addr); + sockaddr_in6* sa6 = (sockaddr_in6*)((void*)ifap->ifa_addr); if (IN6_IS_ADDR_LINKLOCAL(&sa6->sin6_addr)) break; // Fallthrough } diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp index 8613059f28..eeb8c26a76 100644 --- a/cpp/src/qpid/sys/ssl/SslHandler.cpp +++ b/cpp/src/qpid/sys/ssl/SslHandler.cpp @@ -33,15 +33,6 @@ namespace sys { namespace ssl { -// Buffer definition -struct Buff : public SslIO::BufferBase { - Buff() : - SslIO::BufferBase(new char[65536], 65536) - {} - ~Buff() - { delete [] bytes;} -}; - struct ProtocolTimeoutTask : public sys::TimerTask { SslHandler& handler; std::string id; @@ -78,7 +69,7 @@ SslHandler::~SslHandler() { delete codec; } -void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) { +void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime) { aio = a; // Start timer for this connection @@ -86,17 +77,14 @@ void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) { timer.add(timeoutTimerTask); // Give connection some buffers to use - for (int i = 0; i < numBuffs; i++) { - aio->queueReadBuffer(new Buff); - } + aio->createBuffers(); } void SslHandler::write(const framing::ProtocolInitiation& data) { QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")"); SslIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; + assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.encodedSize(); @@ -205,10 +193,11 @@ void SslHandler::idle(SslIO&){ return; } if (codec == 0) return; - if (codec->canEncode()) { - // Try and get a queued buffer if not then construct new one - SslIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) buff = new Buff; + if (!codec->canEncode()) { + return; + } + SslIO::BufferBase* buff = aio->getQueuedBuffer(); + if (buff) { size_t encoded=codec->encode(buff->bytes, buff->byteCount); buff->dataCount = encoded; aio->queueWrite(buff); diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h index 74df2b7fb0..14814b0281 100644 --- a/cpp/src/qpid/sys/ssl/SslHandler.h +++ b/cpp/src/qpid/sys/ssl/SslHandler.h @@ -60,7 +60,7 @@ class SslHandler : public OutputControl { public: SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict); ~SslHandler(); - void init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs); + void init(SslIO* a, Timer& timer, uint32_t maxTime); void setClient() { isClient = true; } diff --git a/cpp/src/qpid/sys/ssl/SslIo.cpp b/cpp/src/qpid/sys/ssl/SslIo.cpp index 789c205ead..bbfb703170 100644 --- a/cpp/src/qpid/sys/ssl/SslIo.cpp +++ b/cpp/src/qpid/sys/ssl/SslIo.cpp @@ -197,15 +197,7 @@ SslIO::SslIO(const SslSocket& s, s.setNonblocking(); } -struct deleter -{ - template <typename T> - void operator()(T *ptr){ delete ptr;} -}; - SslIO::~SslIO() { - std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); - std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); } void SslIO::queueForDeletion() { @@ -216,6 +208,19 @@ void SslIO::start(Poller::shared_ptr poller) { DispatchHandle::startWatch(poller); } +void SslIO::createBuffers(uint32_t size) { + // Allocate all the buffer memory at once + bufferMemory.reset(new char[size*BufferCount]); + + // Create the Buffer structs in a vector + // And push into the buffer queue + buffers.reserve(BufferCount); + for (uint32_t i = 0; i < BufferCount; i++) { + buffers.push_back(BufferBase(&bufferMemory[i*size], size)); + queueReadBuffer(&buffers[i]); + } +} + void SslIO::queueReadBuffer(BufferBase* buff) { assert(buff); buff->dataStart = 0; diff --git a/cpp/src/qpid/sys/ssl/SslIo.h b/cpp/src/qpid/sys/ssl/SslIo.h index b795594cd9..f3112bfa65 100644 --- a/cpp/src/qpid/sys/ssl/SslIo.h +++ b/cpp/src/qpid/sys/ssl/SslIo.h @@ -25,6 +25,7 @@ #include "qpid/sys/SecuritySettings.h" #include <boost/function.hpp> +#include <boost/shared_array.hpp> #include <deque> namespace qpid { @@ -87,8 +88,8 @@ private: }; struct SslIOBufferBase { - char* const bytes; - const int32_t byteCount; + char* bytes; + int32_t byteCount; int32_t dataStart; int32_t dataCount; @@ -127,7 +128,9 @@ public: typedef boost::function1<void, SslIO&> IdleCallback; typedef boost::function1<void, SslIO&> RequestCallback; - + SslIO(const SslSocket& s, + ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, + ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); private: ReadCallback readCallback; EofCallback eofCallback; @@ -138,6 +141,8 @@ private: const SslSocket& socket; std::deque<BufferBase*> bufferQueue; std::deque<BufferBase*> writeQueue; + std::vector<BufferBase> buffers; + boost::shared_array<char> bufferMemory; bool queuedClose; /** * This flag is used to detect and handle concurrency between @@ -148,12 +153,21 @@ private: volatile bool writePending; public: - SslIO(const SslSocket& s, - ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); + /* + * Size of IO buffers - this is the maximum possible frame size + 1 + */ + const static uint32_t MaxBufferSize = 65536; + + /* + * Number of IO buffers allocated - I think the code can only use 2 - + * 1 for reading and 1 for writing, allocate 4 for safety + */ + const static uint32_t BufferCount = 4; + void queueForDeletion(); void start(qpid::sys::Poller::shared_ptr poller); + void createBuffers(uint32_t size = MaxBufferSize); void queueReadBuffer(BufferBase* buff); void unread(BufferBase* buff); void queueWrite(BufferBase* buff); diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp index ae53414e52..355acbe0e6 100644 --- a/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -40,6 +40,7 @@ #include <windows.h> #include <boost/bind.hpp> +#include <boost/shared_array.hpp> namespace { @@ -252,6 +253,7 @@ public: /// Take any actions needed to prepare for working with the poller. virtual void start(Poller::shared_ptr poller); + virtual void createBuffers(uint32_t size); virtual void queueReadBuffer(BufferBase* buff); virtual void unread(BufferBase* buff); virtual void queueWrite(BufferBase* buff); @@ -286,6 +288,8 @@ private: * access to the buffer queue and write queue. */ Mutex bufferQueueLock; + std::vector<BufferBase> buffers; + boost::shared_array<char> bufferMemory; // Number of outstanding I/O operations. volatile LONG opsInProgress; @@ -385,15 +389,7 @@ AsynchIO::AsynchIO(const Socket& s, working(false) { } -struct deleter -{ - template <typename T> - void operator()(T *ptr){ delete ptr;} -}; - AsynchIO::~AsynchIO() { - std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); - std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); } void AsynchIO::queueForDeletion() { @@ -426,6 +422,19 @@ void AsynchIO::start(Poller::shared_ptr poller0) { startReading(); } +void AsynchIO::createBuffers(uint32_t size) { + // Allocate all the buffer memory at once + bufferMemory.reset(new char[size*BufferCount]); + + // Create the Buffer structs in a vector + // And push into the buffer queue + buffers.reserve(BufferCount); + for (uint32_t i = 0; i < BufferCount; i++) { + buffers.push_back(BufferBase(&bufferMemory[i*size], size)); + queueReadBuffer(&buffers[i]); + } +} + void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) { assert(buff); buff->dataStart = 0; diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/cpp/src/qpid/sys/windows/SslAsynchIO.cpp index 25cc94b290..d263f00ab3 100644 --- a/cpp/src/qpid/sys/windows/SslAsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/SslAsynchIO.cpp @@ -55,7 +55,7 @@ namespace { * the frame layer for writing into. */ struct SslIoBuff : public qpid::sys::AsynchIO::BufferBase { - std::auto_ptr<qpid::sys::AsynchIO::BufferBase> aioBuff; + qpid::sys::AsynchIO::BufferBase* aioBuff; SslIoBuff (qpid::sys::AsynchIO::BufferBase *base, const SecPkgContext_StreamSizes &sizes) @@ -66,7 +66,6 @@ namespace { {} ~SslIoBuff() {} - qpid::sys::AsynchIO::BufferBase* release() { return aioBuff.release(); } }; } @@ -101,10 +100,7 @@ SslAsynchIO::SslAsynchIO(const qpid::sys::Socket& s, } SslAsynchIO::~SslAsynchIO() { - if (leftoverPlaintext) { - delete leftoverPlaintext; - leftoverPlaintext = 0; - } + leftoverPlaintext = 0; } void SslAsynchIO::queueForDeletion() { @@ -121,6 +117,10 @@ void SslAsynchIO::start(qpid::sys::Poller::shared_ptr poller) { startNegotiate(); } +void SslAsynchIO::createBuffers(uint32_t size) { + aio->createBuffers(size); +} + void SslAsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) { aio->queueReadBuffer(buff); } @@ -148,7 +148,7 @@ void SslAsynchIO::queueWrite(AsynchIO::BufferBase* buff) { // encoding was working on, and adjusting counts for, the SslIoBuff. // Update the count of the original BufferBase before handing off to // the I/O layer. - buff = sslBuff->release(); + buff = sslBuff->aioBuff; SecBuffer buffs[4]; buffs[0].cbBuffer = schSizes.cbHeader; buffs[0].BufferType = SECBUFFER_STREAM_HEADER; diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.h b/cpp/src/qpid/sys/windows/SslAsynchIO.h index edec081ced..e9d9e8d629 100644 --- a/cpp/src/qpid/sys/windows/SslAsynchIO.h +++ b/cpp/src/qpid/sys/windows/SslAsynchIO.h @@ -70,6 +70,7 @@ public: virtual void queueForDeletion(); virtual void start(qpid::sys::Poller::shared_ptr poller); + virtual void createBuffers(uint32_t size); virtual void queueReadBuffer(BufferBase* buff); virtual void unread(BufferBase* buff); virtual void queueWrite(BufferBase* buff); diff --git a/cpp/src/qpid/sys/windows/Time.cpp b/cpp/src/qpid/sys/windows/Time.cpp index 25c50819cd..700a25391f 100644 --- a/cpp/src/qpid/sys/windows/Time.cpp +++ b/cpp/src/qpid/sys/windows/Time.cpp @@ -20,10 +20,12 @@ */ #include "qpid/sys/Time.h" +#include <cmath> #include <ostream> #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/thread/thread_time.hpp> #include <windows.h> +#include <time.h> using namespace boost::posix_time; @@ -33,8 +35,16 @@ namespace { // more or less. Keep track of the start value and the conversion factor to // seconds. bool timeInitialized = false; -LARGE_INTEGER start; -double freq = 1.0; +LARGE_INTEGER start_hpc; +double hpc_freq = 1.0; + +double start_time; + +/// Static constant to remove time skew between FILETIME and POSIX +/// time. POSIX and Win32 use different epochs (Jan. 1, 1970 v.s. +/// Jan. 1, 1601). The following constant defines the difference +/// in 100ns ticks. +const DWORDLONG FILETIME_to_timval_skew = 0x19db1ded53e8000; } @@ -114,23 +124,59 @@ void outputFormattedNow(std::ostream& o) { } void outputHiresNow(std::ostream& o) { + ::time_t tv_sec; + ::tm timeinfo; + char time_string[100]; + if (!timeInitialized) { - start.QuadPart = 0; + // To start, get the current time from FILETIME which includes + // sub-second resolution. However, since FILETIME is updated a bit + // "bumpy" every 15 msec or so, future time displays will be the + // starting FILETIME plus a delta based on the high-resolution + // performance counter. + FILETIME file_time; + ULARGE_INTEGER start_usec; + ::GetSystemTimeAsFileTime(&file_time); // This is in 100ns units + start_usec.LowPart = file_time.dwLowDateTime; + start_usec.HighPart = file_time.dwHighDateTime; + start_usec.QuadPart -= FILETIME_to_timval_skew; + start_usec.QuadPart /= 10; // Convert 100ns to usec + tv_sec = (time_t)(start_usec.QuadPart / (1000 * 1000)); + long tv_usec = (long)(start_usec.QuadPart % (1000 * 1000)); + start_time = static_cast<double>(tv_sec); + start_time += tv_usec / 1000000.0; + + start_hpc.QuadPart = 0; LARGE_INTEGER iFreq; iFreq.QuadPart = 1; - QueryPerformanceCounter(&start); + QueryPerformanceCounter(&start_hpc); QueryPerformanceFrequency(&iFreq); - freq = static_cast<double>(iFreq.QuadPart); + hpc_freq = static_cast<double>(iFreq.QuadPart); timeInitialized = true; } - LARGE_INTEGER iNow; - iNow.QuadPart = 0; - QueryPerformanceCounter(&iNow); - iNow.QuadPart -= start.QuadPart; - if (iNow.QuadPart < 0) - iNow.QuadPart = 0; - double now = static_cast<double>(iNow.QuadPart); - now /= freq; // now is seconds after this - o << std::fixed << std::setprecision(8) << std::setw(16) << std::setfill('0') << now << "s "; + LARGE_INTEGER hpc_now; + hpc_now.QuadPart = 0; + QueryPerformanceCounter(&hpc_now); + hpc_now.QuadPart -= start_hpc.QuadPart; + if (hpc_now.QuadPart < 0) + hpc_now.QuadPart = 0; + double now = static_cast<double>(hpc_now.QuadPart); + now /= hpc_freq; // now is seconds after this + double fnow = start_time + now; + double usec, sec; + usec = modf(fnow, &sec); + tv_sec = static_cast<time_t>(sec); +#ifdef _MSC_VER + ::localtime_s(&timeinfo, &tv_sec); +#else + timeinfo = *(::localtime(&tv_sec)); +#endif + ::strftime(time_string, 100, + "%Y-%m-%d %H:%M:%S", + &timeinfo); + // No way to set "max field width" to cleanly output the double usec so + // convert it back to integral number of usecs and print that. + unsigned long i_usec = usec * 1000 * 1000; + o << time_string << "." << std::setw(6) << std::setfill('0') << i_usec << " "; } }} |