summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-27 15:40:33 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-27 15:40:33 +0000
commit868ce7469262d6fd2fe3f2e7f04cfe7af654d59f (patch)
tree63e6b5e62554609beb21e8c8d0610569f36d2743 /cpp/src/qpid/sys
parent2e5ff8f1b328831043e6d7e323249d62187234c6 (diff)
downloadqpid-python-868ce7469262d6fd2fe3f2e7f04cfe7af654d59f.tar.gz
QPID-3858: Updated code to include recent refactoring by Gordon (gsim) - see QPID-4178.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1377715 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/AsynchIO.h16
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp44
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.h2
-rw-r--r--cpp/src/qpid/sys/SslPlugin.cpp4
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp2
-rw-r--r--cpp/src/qpid/sys/epoll/EpollPoller.cpp8
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp25
-rwxr-xr-xcpp/src/qpid/sys/posix/SystemInfo.cpp2
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.cpp27
-rw-r--r--cpp/src/qpid/sys/ssl/SslHandler.h2
-rw-r--r--cpp/src/qpid/sys/ssl/SslIo.cpp21
-rw-r--r--cpp/src/qpid/sys/ssl/SslIo.h26
-rw-r--r--cpp/src/qpid/sys/windows/AsynchIO.cpp25
-rw-r--r--cpp/src/qpid/sys/windows/SslAsynchIO.cpp14
-rw-r--r--cpp/src/qpid/sys/windows/SslAsynchIO.h1
-rw-r--r--cpp/src/qpid/sys/windows/Time.cpp74
16 files changed, 182 insertions, 111 deletions
diff --git a/cpp/src/qpid/sys/AsynchIO.h b/cpp/src/qpid/sys/AsynchIO.h
index 41f74f7ed0..b2eaaac9de 100644
--- a/cpp/src/qpid/sys/AsynchIO.h
+++ b/cpp/src/qpid/sys/AsynchIO.h
@@ -76,8 +76,8 @@ protected:
};
struct AsynchIOBufferBase {
- char* const bytes;
- const int32_t byteCount;
+ char* bytes;
+ int32_t byteCount;
int32_t dataStart;
int32_t dataCount;
@@ -134,9 +134,21 @@ public:
BuffersEmptyCallback eCb = 0,
IdleCallback iCb = 0);
public:
+ /*
+ * Size of IO buffers - this is the maximum possible frame size + 1
+ */
+ const static uint32_t MaxBufferSize = 65536;
+
+ /*
+ * Number of IO buffers allocated - I think the code can only use 2 -
+ * 1 for reading and 1 for writing, allocate 4 for safety
+ */
+ const static uint32_t BufferCount = 4;
+
virtual void queueForDeletion() = 0;
virtual void start(boost::shared_ptr<Poller> poller) = 0;
+ virtual void createBuffers(uint32_t size = MaxBufferSize) = 0;
virtual void queueReadBuffer(BufferBase* buff) = 0;
virtual void unread(BufferBase* buff) = 0;
virtual void queueWrite(BufferBase* buff) = 0;
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 8a485db72d..2e117a3fb7 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -33,15 +33,6 @@
namespace qpid {
namespace sys {
-// Buffer definition
-struct Buff : public AsynchIO::BufferBase {
- Buff() :
- AsynchIO::BufferBase(new char[65536], 65536)
- {}
- ~Buff()
- { delete [] bytes;}
-};
-
struct ProtocolTimeoutTask : public sys::TimerTask {
AsynchIOHandler& handler;
std::string id;
@@ -79,7 +70,7 @@ AsynchIOHandler::~AsynchIOHandler() {
delete codec;
}
-void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) {
+void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime) {
aio = a;
// Start timer for this connection
@@ -87,17 +78,14 @@ void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint
timer.add(timeoutTimerTask);
// Give connection some buffers to use
- for (int i = 0; i < numBuffs; i++) {
- aio->queueReadBuffer(new Buff);
- }
+ aio->createBuffers();
}
void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
{
QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")");
AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff)
- buff = new Buff;
+ assert(buff);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.encodedSize();
@@ -244,24 +232,24 @@ void AsynchIOHandler::idle(AsynchIO&){
return;
}
if (codec == 0) return;
- try {
- if (codec->canEncode()) {
- // Try and get a queued buffer if not then construct new one
- AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff) buff = new Buff;
+ if (!codec->canEncode()) {
+ return;
+ }
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ if (buff) {
+ try {
size_t encoded=codec->encode(buff->bytes, buff->byteCount);
buff->dataCount = encoded;
aio->queueWrite(buff);
+ if (!codec->isClosed()) {
+ return;
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, e.what());
}
- if (codec->isClosed()) {
- readError = true;
- aio->queueWriteClose();
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, e.what());
- readError = true;
- aio->queueWriteClose();
}
+ readError = true;
+ aio->queueWriteClose();
}
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h
index 307aad5b85..fd0bc140e5 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -61,7 +61,7 @@ class AsynchIOHandler : public OutputControl {
public:
QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f );
QPID_COMMON_EXTERN ~AsynchIOHandler();
- QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
+ QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime);
QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
diff --git a/cpp/src/qpid/sys/SslPlugin.cpp b/cpp/src/qpid/sys/SslPlugin.cpp
index 3b50527c0a..069e97758e 100644
--- a/cpp/src/qpid/sys/SslPlugin.cpp
+++ b/cpp/src/qpid/sys/SslPlugin.cpp
@@ -191,7 +191,7 @@ void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1),
boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1));
- async->init(aio,timer, maxTime, 4);
+ async->init(aio,timer, maxTime);
aio->start(poller);
}
@@ -247,7 +247,7 @@ void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket&
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, brokerTimer, maxNegotiateTime, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime);
aio->start(poller);
}
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index 551440f954..ed7cc3748d 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -166,7 +166,7 @@ void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socke
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, brokerTimer, maxNegotiateTime, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime);
aio->start(poller);
}
diff --git a/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index dcc9d9181c..c23403c66d 100644
--- a/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -221,8 +221,8 @@ class PollerPrivate {
}
};
- static ReadablePipe alwaysReadable;
- static int alwaysReadableFd;
+ ReadablePipe alwaysReadable;
+ int alwaysReadableFd;
class InterruptHandle: public PollerHandle {
std::queue<PollerHandle*> handles;
@@ -290,6 +290,7 @@ class PollerPrivate {
}
PollerPrivate() :
+ alwaysReadableFd(alwaysReadable.getFD()),
epollFd(::epoll_create(DefaultFds)),
isShutdown(false) {
QPID_POSIX_CHECK(epollFd);
@@ -328,9 +329,6 @@ class PollerPrivate {
}
};
-PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
-int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD();
-
void Poller::registerHandle(PollerHandle& handle) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 01ff8b6bfa..31355627cd 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -40,6 +40,7 @@
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
+#include <boost/shared_array.hpp>
namespace qpid {
namespace sys {
@@ -239,6 +240,7 @@ public:
virtual void queueForDeletion();
virtual void start(Poller::shared_ptr poller);
+ virtual void createBuffers(uint32_t size);
virtual void queueReadBuffer(BufferBase* buff);
virtual void unread(BufferBase* buff);
virtual void queueWrite(BufferBase* buff);
@@ -270,6 +272,8 @@ private:
const Socket& socket;
std::deque<BufferBase*> bufferQueue;
std::deque<BufferBase*> writeQueue;
+ std::vector<BufferBase> buffers;
+ boost::shared_array<char> bufferMemory;
bool queuedClose;
/**
* This flag is used to detect and handle concurrency between
@@ -309,15 +313,7 @@ AsynchIO::AsynchIO(const Socket& s,
s.setNonblocking();
}
-struct deleter
-{
- template <typename T>
- void operator()(T *ptr){ delete ptr;}
-};
-
AsynchIO::~AsynchIO() {
- std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
- std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
}
void AsynchIO::queueForDeletion() {
@@ -328,6 +324,19 @@ void AsynchIO::start(Poller::shared_ptr poller) {
DispatchHandle::startWatch(poller);
}
+void AsynchIO::createBuffers(uint32_t size) {
+ // Allocate all the buffer memory at once
+ bufferMemory.reset(new char[size*BufferCount]);
+
+ // Create the Buffer structs in a vector
+ // And push into the buffer queue
+ buffers.reserve(BufferCount);
+ for (uint32_t i = 0; i < BufferCount; i++) {
+ buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+ queueReadBuffer(&buffers[i]);
+ }
+}
+
void AsynchIO::queueReadBuffer(BufferBase* buff) {
assert(buff);
buff->dataStart = 0;
diff --git a/cpp/src/qpid/sys/posix/SystemInfo.cpp b/cpp/src/qpid/sys/posix/SystemInfo.cpp
index 2b1bbb97df..cfd2c64aee 100755
--- a/cpp/src/qpid/sys/posix/SystemInfo.cpp
+++ b/cpp/src/qpid/sys/posix/SystemInfo.cpp
@@ -91,7 +91,7 @@ void SystemInfo::getLocalIpAddresses (uint16_t port,
// * The scope id is illegal in URL syntax
// * Clients won't be able to use a link local address
// without adding their own (potentially different) scope id
- sockaddr_in6* sa6 = (sockaddr_in6*)(ifap->ifa_addr);
+ sockaddr_in6* sa6 = (sockaddr_in6*)((void*)ifap->ifa_addr);
if (IN6_IS_ADDR_LINKLOCAL(&sa6->sin6_addr)) break;
// Fallthrough
}
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp
index 8613059f28..eeb8c26a76 100644
--- a/cpp/src/qpid/sys/ssl/SslHandler.cpp
+++ b/cpp/src/qpid/sys/ssl/SslHandler.cpp
@@ -33,15 +33,6 @@ namespace sys {
namespace ssl {
-// Buffer definition
-struct Buff : public SslIO::BufferBase {
- Buff() :
- SslIO::BufferBase(new char[65536], 65536)
- {}
- ~Buff()
- { delete [] bytes;}
-};
-
struct ProtocolTimeoutTask : public sys::TimerTask {
SslHandler& handler;
std::string id;
@@ -78,7 +69,7 @@ SslHandler::~SslHandler() {
delete codec;
}
-void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) {
+void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime) {
aio = a;
// Start timer for this connection
@@ -86,17 +77,14 @@ void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) {
timer.add(timeoutTimerTask);
// Give connection some buffers to use
- for (int i = 0; i < numBuffs; i++) {
- aio->queueReadBuffer(new Buff);
- }
+ aio->createBuffers();
}
void SslHandler::write(const framing::ProtocolInitiation& data)
{
QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")");
SslIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff)
- buff = new Buff;
+ assert(buff);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.encodedSize();
@@ -205,10 +193,11 @@ void SslHandler::idle(SslIO&){
return;
}
if (codec == 0) return;
- if (codec->canEncode()) {
- // Try and get a queued buffer if not then construct new one
- SslIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff) buff = new Buff;
+ if (!codec->canEncode()) {
+ return;
+ }
+ SslIO::BufferBase* buff = aio->getQueuedBuffer();
+ if (buff) {
size_t encoded=codec->encode(buff->bytes, buff->byteCount);
buff->dataCount = encoded;
aio->queueWrite(buff);
diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h
index 74df2b7fb0..14814b0281 100644
--- a/cpp/src/qpid/sys/ssl/SslHandler.h
+++ b/cpp/src/qpid/sys/ssl/SslHandler.h
@@ -60,7 +60,7 @@ class SslHandler : public OutputControl {
public:
SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict);
~SslHandler();
- void init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
+ void init(SslIO* a, Timer& timer, uint32_t maxTime);
void setClient() { isClient = true; }
diff --git a/cpp/src/qpid/sys/ssl/SslIo.cpp b/cpp/src/qpid/sys/ssl/SslIo.cpp
index 789c205ead..bbfb703170 100644
--- a/cpp/src/qpid/sys/ssl/SslIo.cpp
+++ b/cpp/src/qpid/sys/ssl/SslIo.cpp
@@ -197,15 +197,7 @@ SslIO::SslIO(const SslSocket& s,
s.setNonblocking();
}
-struct deleter
-{
- template <typename T>
- void operator()(T *ptr){ delete ptr;}
-};
-
SslIO::~SslIO() {
- std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
- std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
}
void SslIO::queueForDeletion() {
@@ -216,6 +208,19 @@ void SslIO::start(Poller::shared_ptr poller) {
DispatchHandle::startWatch(poller);
}
+void SslIO::createBuffers(uint32_t size) {
+ // Allocate all the buffer memory at once
+ bufferMemory.reset(new char[size*BufferCount]);
+
+ // Create the Buffer structs in a vector
+ // And push into the buffer queue
+ buffers.reserve(BufferCount);
+ for (uint32_t i = 0; i < BufferCount; i++) {
+ buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+ queueReadBuffer(&buffers[i]);
+ }
+}
+
void SslIO::queueReadBuffer(BufferBase* buff) {
assert(buff);
buff->dataStart = 0;
diff --git a/cpp/src/qpid/sys/ssl/SslIo.h b/cpp/src/qpid/sys/ssl/SslIo.h
index b795594cd9..f3112bfa65 100644
--- a/cpp/src/qpid/sys/ssl/SslIo.h
+++ b/cpp/src/qpid/sys/ssl/SslIo.h
@@ -25,6 +25,7 @@
#include "qpid/sys/SecuritySettings.h"
#include <boost/function.hpp>
+#include <boost/shared_array.hpp>
#include <deque>
namespace qpid {
@@ -87,8 +88,8 @@ private:
};
struct SslIOBufferBase {
- char* const bytes;
- const int32_t byteCount;
+ char* bytes;
+ int32_t byteCount;
int32_t dataStart;
int32_t dataCount;
@@ -127,7 +128,9 @@ public:
typedef boost::function1<void, SslIO&> IdleCallback;
typedef boost::function1<void, SslIO&> RequestCallback;
-
+ SslIO(const SslSocket& s,
+ ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
+ ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
private:
ReadCallback readCallback;
EofCallback eofCallback;
@@ -138,6 +141,8 @@ private:
const SslSocket& socket;
std::deque<BufferBase*> bufferQueue;
std::deque<BufferBase*> writeQueue;
+ std::vector<BufferBase> buffers;
+ boost::shared_array<char> bufferMemory;
bool queuedClose;
/**
* This flag is used to detect and handle concurrency between
@@ -148,12 +153,21 @@ private:
volatile bool writePending;
public:
- SslIO(const SslSocket& s,
- ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
+ /*
+ * Size of IO buffers - this is the maximum possible frame size + 1
+ */
+ const static uint32_t MaxBufferSize = 65536;
+
+ /*
+ * Number of IO buffers allocated - I think the code can only use 2 -
+ * 1 for reading and 1 for writing, allocate 4 for safety
+ */
+ const static uint32_t BufferCount = 4;
+
void queueForDeletion();
void start(qpid::sys::Poller::shared_ptr poller);
+ void createBuffers(uint32_t size = MaxBufferSize);
void queueReadBuffer(BufferBase* buff);
void unread(BufferBase* buff);
void queueWrite(BufferBase* buff);
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp
index ae53414e52..355acbe0e6 100644
--- a/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -40,6 +40,7 @@
#include <windows.h>
#include <boost/bind.hpp>
+#include <boost/shared_array.hpp>
namespace {
@@ -252,6 +253,7 @@ public:
/// Take any actions needed to prepare for working with the poller.
virtual void start(Poller::shared_ptr poller);
+ virtual void createBuffers(uint32_t size);
virtual void queueReadBuffer(BufferBase* buff);
virtual void unread(BufferBase* buff);
virtual void queueWrite(BufferBase* buff);
@@ -286,6 +288,8 @@ private:
* access to the buffer queue and write queue.
*/
Mutex bufferQueueLock;
+ std::vector<BufferBase> buffers;
+ boost::shared_array<char> bufferMemory;
// Number of outstanding I/O operations.
volatile LONG opsInProgress;
@@ -385,15 +389,7 @@ AsynchIO::AsynchIO(const Socket& s,
working(false) {
}
-struct deleter
-{
- template <typename T>
- void operator()(T *ptr){ delete ptr;}
-};
-
AsynchIO::~AsynchIO() {
- std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
- std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
}
void AsynchIO::queueForDeletion() {
@@ -426,6 +422,19 @@ void AsynchIO::start(Poller::shared_ptr poller0) {
startReading();
}
+void AsynchIO::createBuffers(uint32_t size) {
+ // Allocate all the buffer memory at once
+ bufferMemory.reset(new char[size*BufferCount]);
+
+ // Create the Buffer structs in a vector
+ // And push into the buffer queue
+ buffers.reserve(BufferCount);
+ for (uint32_t i = 0; i < BufferCount; i++) {
+ buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+ queueReadBuffer(&buffers[i]);
+ }
+}
+
void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
assert(buff);
buff->dataStart = 0;
diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
index 25cc94b290..d263f00ab3 100644
--- a/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
@@ -55,7 +55,7 @@ namespace {
* the frame layer for writing into.
*/
struct SslIoBuff : public qpid::sys::AsynchIO::BufferBase {
- std::auto_ptr<qpid::sys::AsynchIO::BufferBase> aioBuff;
+ qpid::sys::AsynchIO::BufferBase* aioBuff;
SslIoBuff (qpid::sys::AsynchIO::BufferBase *base,
const SecPkgContext_StreamSizes &sizes)
@@ -66,7 +66,6 @@ namespace {
{}
~SslIoBuff() {}
- qpid::sys::AsynchIO::BufferBase* release() { return aioBuff.release(); }
};
}
@@ -101,10 +100,7 @@ SslAsynchIO::SslAsynchIO(const qpid::sys::Socket& s,
}
SslAsynchIO::~SslAsynchIO() {
- if (leftoverPlaintext) {
- delete leftoverPlaintext;
- leftoverPlaintext = 0;
- }
+ leftoverPlaintext = 0;
}
void SslAsynchIO::queueForDeletion() {
@@ -121,6 +117,10 @@ void SslAsynchIO::start(qpid::sys::Poller::shared_ptr poller) {
startNegotiate();
}
+void SslAsynchIO::createBuffers(uint32_t size) {
+ aio->createBuffers(size);
+}
+
void SslAsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
aio->queueReadBuffer(buff);
}
@@ -148,7 +148,7 @@ void SslAsynchIO::queueWrite(AsynchIO::BufferBase* buff) {
// encoding was working on, and adjusting counts for, the SslIoBuff.
// Update the count of the original BufferBase before handing off to
// the I/O layer.
- buff = sslBuff->release();
+ buff = sslBuff->aioBuff;
SecBuffer buffs[4];
buffs[0].cbBuffer = schSizes.cbHeader;
buffs[0].BufferType = SECBUFFER_STREAM_HEADER;
diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.h b/cpp/src/qpid/sys/windows/SslAsynchIO.h
index edec081ced..e9d9e8d629 100644
--- a/cpp/src/qpid/sys/windows/SslAsynchIO.h
+++ b/cpp/src/qpid/sys/windows/SslAsynchIO.h
@@ -70,6 +70,7 @@ public:
virtual void queueForDeletion();
virtual void start(qpid::sys::Poller::shared_ptr poller);
+ virtual void createBuffers(uint32_t size);
virtual void queueReadBuffer(BufferBase* buff);
virtual void unread(BufferBase* buff);
virtual void queueWrite(BufferBase* buff);
diff --git a/cpp/src/qpid/sys/windows/Time.cpp b/cpp/src/qpid/sys/windows/Time.cpp
index 25c50819cd..700a25391f 100644
--- a/cpp/src/qpid/sys/windows/Time.cpp
+++ b/cpp/src/qpid/sys/windows/Time.cpp
@@ -20,10 +20,12 @@
*/
#include "qpid/sys/Time.h"
+#include <cmath>
#include <ostream>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/thread_time.hpp>
#include <windows.h>
+#include <time.h>
using namespace boost::posix_time;
@@ -33,8 +35,16 @@ namespace {
// more or less. Keep track of the start value and the conversion factor to
// seconds.
bool timeInitialized = false;
-LARGE_INTEGER start;
-double freq = 1.0;
+LARGE_INTEGER start_hpc;
+double hpc_freq = 1.0;
+
+double start_time;
+
+/// Static constant to remove time skew between FILETIME and POSIX
+/// time. POSIX and Win32 use different epochs (Jan. 1, 1970 v.s.
+/// Jan. 1, 1601). The following constant defines the difference
+/// in 100ns ticks.
+const DWORDLONG FILETIME_to_timval_skew = 0x19db1ded53e8000;
}
@@ -114,23 +124,59 @@ void outputFormattedNow(std::ostream& o) {
}
void outputHiresNow(std::ostream& o) {
+ ::time_t tv_sec;
+ ::tm timeinfo;
+ char time_string[100];
+
if (!timeInitialized) {
- start.QuadPart = 0;
+ // To start, get the current time from FILETIME which includes
+ // sub-second resolution. However, since FILETIME is updated a bit
+ // "bumpy" every 15 msec or so, future time displays will be the
+ // starting FILETIME plus a delta based on the high-resolution
+ // performance counter.
+ FILETIME file_time;
+ ULARGE_INTEGER start_usec;
+ ::GetSystemTimeAsFileTime(&file_time); // This is in 100ns units
+ start_usec.LowPart = file_time.dwLowDateTime;
+ start_usec.HighPart = file_time.dwHighDateTime;
+ start_usec.QuadPart -= FILETIME_to_timval_skew;
+ start_usec.QuadPart /= 10; // Convert 100ns to usec
+ tv_sec = (time_t)(start_usec.QuadPart / (1000 * 1000));
+ long tv_usec = (long)(start_usec.QuadPart % (1000 * 1000));
+ start_time = static_cast<double>(tv_sec);
+ start_time += tv_usec / 1000000.0;
+
+ start_hpc.QuadPart = 0;
LARGE_INTEGER iFreq;
iFreq.QuadPart = 1;
- QueryPerformanceCounter(&start);
+ QueryPerformanceCounter(&start_hpc);
QueryPerformanceFrequency(&iFreq);
- freq = static_cast<double>(iFreq.QuadPart);
+ hpc_freq = static_cast<double>(iFreq.QuadPart);
timeInitialized = true;
}
- LARGE_INTEGER iNow;
- iNow.QuadPart = 0;
- QueryPerformanceCounter(&iNow);
- iNow.QuadPart -= start.QuadPart;
- if (iNow.QuadPart < 0)
- iNow.QuadPart = 0;
- double now = static_cast<double>(iNow.QuadPart);
- now /= freq; // now is seconds after this
- o << std::fixed << std::setprecision(8) << std::setw(16) << std::setfill('0') << now << "s ";
+ LARGE_INTEGER hpc_now;
+ hpc_now.QuadPart = 0;
+ QueryPerformanceCounter(&hpc_now);
+ hpc_now.QuadPart -= start_hpc.QuadPart;
+ if (hpc_now.QuadPart < 0)
+ hpc_now.QuadPart = 0;
+ double now = static_cast<double>(hpc_now.QuadPart);
+ now /= hpc_freq; // now is seconds after this
+ double fnow = start_time + now;
+ double usec, sec;
+ usec = modf(fnow, &sec);
+ tv_sec = static_cast<time_t>(sec);
+#ifdef _MSC_VER
+ ::localtime_s(&timeinfo, &tv_sec);
+#else
+ timeinfo = *(::localtime(&tv_sec));
+#endif
+ ::strftime(time_string, 100,
+ "%Y-%m-%d %H:%M:%S",
+ &timeinfo);
+ // No way to set "max field width" to cleanly output the double usec so
+ // convert it back to integral number of usecs and print that.
+ unsigned long i_usec = usec * 1000 * 1000;
+ o << time_string << "." << std::setw(6) << std::setfill('0') << i_usec << " ";
}
}}