From 097470826d8089c37ba5fd13f67badd84aa6c549 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Fri, 10 Aug 2012 17:27:38 +0000 Subject: SSL changes for new buffer management - Needed to rework SslConnector to mirror TCPConnector in order to make changes to the client side, but now Unix SSL and TCP client implementations are much more alike. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1371775 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/client/SslConnector.cpp | 224 ++++++++++++++---------------- qpid/cpp/src/qpid/sys/SslPlugin.cpp | 2 +- qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp | 27 ++-- qpid/cpp/src/qpid/sys/ssl/SslHandler.h | 2 +- qpid/cpp/src/qpid/sys/ssl/SslIo.cpp | 21 +-- qpid/cpp/src/qpid/sys/ssl/SslIo.h | 26 +++- 6 files changed, 149 insertions(+), 153 deletions(-) diff --git a/qpid/cpp/src/qpid/client/SslConnector.cpp b/qpid/cpp/src/qpid/client/SslConnector.cpp index 4c6fadd28a..c2081a88f2 100644 --- a/qpid/cpp/src/qpid/client/SslConnector.cpp +++ b/qpid/cpp/src/qpid/client/SslConnector.cpp @@ -38,7 +38,6 @@ #include "qpid/Msg.h" #include -#include #include #include @@ -54,53 +53,29 @@ using boost::str; class SslConnector : public Connector { - struct Buff; - - /** Batch up frames for writing to aio. */ - class Writer : public framing::FrameHandler { - typedef sys::ssl::SslIOBufferBase BufferBase; - typedef std::vector Frames; - - const uint16_t maxFrameSize; - sys::Mutex lock; - sys::ssl::SslIO* aio; - BufferBase* buffer; - Frames frames; - size_t lastEof; // Position after last EOF in frames - framing::Buffer encode; - size_t framesEncoded; - std::string identifier; - Bounds* bounds; - - void writeOne(); - void newBuffer(); - - public: - - Writer(uint16_t maxFrameSize, Bounds*); - ~Writer(); - void init(std::string id, sys::ssl::SslIO*); - void handle(framing::AMQFrame&); - void write(sys::ssl::SslIO&); - }; + typedef std::deque Frames; const uint16_t maxFrameSize; + + sys::Mutex lock; + Frames frames; + size_t lastEof; // Position after last EOF in frames + uint64_t currentSize; + Bounds* bounds; + framing::ProtocolVersion version; bool initiated; - SecuritySettings securitySettings; - - sys::Mutex closedLock; bool closed; sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; - Writer writer; - sys::ssl::SslSocket socket; sys::ssl::SslIO* aio; + std::string identifier; Poller::shared_ptr poller; + SecuritySettings securitySettings; ~SslConnector(); @@ -110,10 +85,7 @@ class SslConnector : public Connector void eof(qpid::sys::ssl::SslIO&); void disconnected(qpid::sys::ssl::SslIO&); - std::string identifier; - void connect(const std::string& host, const std::string& port); - void init(); void close(); void send(framing::AMQFrame& frame); void abort() {} // TODO: Need to fix for heartbeat timeouts to work @@ -126,17 +98,16 @@ class SslConnector : public Connector const SecuritySettings* getSecuritySettings(); void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&); + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool canEncode(); + public: SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; -struct SslConnector::Buff : public SslIO::BufferBase { - Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - // Static constructor which registers connector here namespace { Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { @@ -170,12 +141,14 @@ SslConnector::SslConnector(Poller::shared_ptr p, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), + lastEof(0), + currentSize(0), + bounds(cimpl), version(ver), initiated(false), closed(true), shutdownHandler(0), input(0), - writer(maxFrameSize, cimpl), aio(0), poller(p) { @@ -192,7 +165,7 @@ SslConnector::~SslConnector() { } void SslConnector::connect(const std::string& host, const std::string& port){ - Mutex::ScopedLock l(closedLock); + Mutex::ScopedLock l(lock); assert(closed); try { socket.connect(host, port); @@ -201,7 +174,6 @@ void SslConnector::connect(const std::string& host, const std::string& port){ throw TransportFailure(e.what()); } - identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); closed = false; aio = new SslIO(socket, boost::bind(&SslConnector::readbuff, this, _1, _2), @@ -210,21 +182,16 @@ void SslConnector::connect(const std::string& host, const std::string& port){ boost::bind(&SslConnector::socketClosed, this, _1, _2), 0, // nobuffs boost::bind(&SslConnector::writebuff, this, _1)); - writer.init(identifier, aio); -} -void SslConnector::init(){ - Mutex::ScopedLock l(closedLock); + aio->createBuffers(maxFrameSize); + identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); ProtocolInitiation init(version); writeDataBlock(init); - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } aio->start(poller); } void SslConnector::close() { - Mutex::ScopedLock l(closedLock); + Mutex::ScopedLock l(lock); if (!closed) { closed = true; if (aio) @@ -260,76 +227,110 @@ const std::string& SslConnector::getIdentifier() const { } void SslConnector::send(AMQFrame& frame) { - writer.handle(frame); -} - -SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) -{ -} - -SslConnector::Writer::~Writer() { delete buffer; } - -void SslConnector::Writer::init(std::string id, sys::ssl::SslIO* a) { - Mutex::ScopedLock l(lock); - identifier = id; - aio = a; - newBuffer(); -} -void SslConnector::Writer::handle(framing::AMQFrame& frame) { + bool notifyWrite = false; + { Mutex::ScopedLock l(lock); frames.push_back(frame); - if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) { + //only ask to write if this is the end of a frameset or if we + //already have a buffers worth of data + currentSize += frame.encodedSize(); + if (frame.getEof()) { lastEof = frames.size(); - aio->notifyPendingWrite(); + notifyWrite = true; + } else { + notifyWrite = (currentSize >= maxFrameSize); + } + /* + NOTE: Moving the following line into this mutex block + is a workaround for BZ 570168, in which the test + testConcurrentSenders causes a hang about 1.5% + of the time. ( To see the hang much more frequently + leave this line out of the mutex block, and put a + small usleep just before it.) + + TODO mgoulish - fix the underlying cause and then + move this call back outside the mutex. + */ + if (notifyWrite && !closed) aio->notifyPendingWrite(); } - QPID_LOG(trace, "SENT [" << identifier << "]: " << frame); } -void SslConnector::Writer::writeOne() { - assert(buffer); - framesEncoded = 0; +void SslConnector::writebuff(SslIO& /*aio*/) +{ + // It's possible to be disconnected and be writable + if (closed) + return; - buffer->dataStart = 0; - buffer->dataCount = encode.getPosition(); - aio->queueWrite(buffer); - newBuffer(); -} + if (!canEncode()) { + return; + } -void SslConnector::Writer::newBuffer() { - buffer = aio->getQueuedBuffer(); - if (!buffer) buffer = new Buff(maxFrameSize); - encode = framing::Buffer(buffer->bytes, buffer->byteCount); - framesEncoded = 0; + SslIO::BufferBase* buffer = aio->getQueuedBuffer(); + if (buffer) { + + size_t encoded = encode(buffer->bytes, buffer->byteCount); + + buffer->dataStart = 0; + buffer->dataCount = encoded; + aio->queueWrite(buffer); + } } // Called in IO thread. -void SslConnector::Writer::write(sys::ssl::SslIO&) { +bool SslConnector::canEncode() +{ Mutex::ScopedLock l(lock); - assert(buffer); + //have at least one full frameset or a whole buffers worth of data + return lastEof || currentSize >= maxFrameSize; +} + +// Called in IO thread. +size_t SslConnector::encode(const char* buffer, size_t size) +{ + framing::Buffer out(const_cast(buffer), size); size_t bytesWritten(0); - for (size_t i = 0; i < lastEof; ++i) { - AMQFrame& frame = frames[i]; - uint32_t size = frame.encodedSize(); - if (size > encode.available()) writeOne(); - assert(size <= encode.available()); - frame.encode(encode); - ++framesEncoded; - bytesWritten += size; + { + Mutex::ScopedLock l(lock); + while (!frames.empty() && out.available() >= frames.front().encodedSize() ) { + frames.front().encode(out); + QPID_LOG(trace, "SENT [" << identifier << "]: " << frames.front()); + frames.pop_front(); + if (lastEof) --lastEof; + } + bytesWritten = size - out.available(); + currentSize -= bytesWritten; } - frames.erase(frames.begin(), frames.begin()+lastEof); - lastEof = 0; if (bounds) bounds->reduce(bytesWritten); - if (encode.getPosition() > 0) writeOne(); + return bytesWritten; } -void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) { - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); +void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) +{ + int32_t decoded = decode(buff->bytes+buff->dataStart, buff->dataCount); + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (decoded < buff->dataCount) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += decoded; + buff->dataCount -= decoded; + aio.unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio.queueReadBuffer(buff); + } +} +size_t SslConnector::decode(const char* buffer, size_t size) +{ + framing::Buffer in(const_cast(buffer), size); if (!initiated) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { - //TODO: check the version is correct QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); + if(!(protocolInit==version)){ + throw Exception(QPID_MSG("Unsupported version: " << protocolInit + << " supported version " << version)); + } } initiated = true; } @@ -338,25 +339,12 @@ void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); input->received(frame); } - // TODO: unreading needs to go away, and when we can cope - // with multiple sub-buffers in the general buffer scheme, it will - if (in.available() != 0) { - // Adjust buffer for used bytes and then "unread them" - buff->dataStart += buff->dataCount-in.available(); - buff->dataCount = in.available(); - aio.unread(buff); - } else { - // Give whole buffer back to aio subsystem - aio.queueReadBuffer(buff); - } -} - -void SslConnector::writebuff(SslIO& aio_) { - writer.write(aio_); + return size - in.available(); } void SslConnector::writeDataBlock(const AMQDataBlock& data) { - SslIO::BufferBase* buff = new Buff(maxFrameSize); + SslIO::BufferBase* buff = aio->getQueuedBuffer(); + assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.encodedSize(); diff --git a/qpid/cpp/src/qpid/sys/SslPlugin.cpp b/qpid/cpp/src/qpid/sys/SslPlugin.cpp index c2a3d74cbd..069e97758e 100644 --- a/qpid/cpp/src/qpid/sys/SslPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/SslPlugin.cpp @@ -191,7 +191,7 @@ void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s, boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1), boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1)); - async->init(aio,timer, maxTime, 4); + async->init(aio,timer, maxTime); aio->start(poller); } diff --git a/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp b/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp index 8613059f28..eeb8c26a76 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp +++ b/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp @@ -33,15 +33,6 @@ namespace sys { namespace ssl { -// Buffer definition -struct Buff : public SslIO::BufferBase { - Buff() : - SslIO::BufferBase(new char[65536], 65536) - {} - ~Buff() - { delete [] bytes;} -}; - struct ProtocolTimeoutTask : public sys::TimerTask { SslHandler& handler; std::string id; @@ -78,7 +69,7 @@ SslHandler::~SslHandler() { delete codec; } -void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) { +void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime) { aio = a; // Start timer for this connection @@ -86,17 +77,14 @@ void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) { timer.add(timeoutTimerTask); // Give connection some buffers to use - for (int i = 0; i < numBuffs; i++) { - aio->queueReadBuffer(new Buff); - } + aio->createBuffers(); } void SslHandler::write(const framing::ProtocolInitiation& data) { QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")"); SslIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; + assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.encodedSize(); @@ -205,10 +193,11 @@ void SslHandler::idle(SslIO&){ return; } if (codec == 0) return; - if (codec->canEncode()) { - // Try and get a queued buffer if not then construct new one - SslIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) buff = new Buff; + if (!codec->canEncode()) { + return; + } + SslIO::BufferBase* buff = aio->getQueuedBuffer(); + if (buff) { size_t encoded=codec->encode(buff->bytes, buff->byteCount); buff->dataCount = encoded; aio->queueWrite(buff); diff --git a/qpid/cpp/src/qpid/sys/ssl/SslHandler.h b/qpid/cpp/src/qpid/sys/ssl/SslHandler.h index 74df2b7fb0..14814b0281 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslHandler.h +++ b/qpid/cpp/src/qpid/sys/ssl/SslHandler.h @@ -60,7 +60,7 @@ class SslHandler : public OutputControl { public: SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict); ~SslHandler(); - void init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs); + void init(SslIO* a, Timer& timer, uint32_t maxTime); void setClient() { isClient = true; } diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp index 789c205ead..bbfb703170 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp +++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp @@ -197,15 +197,7 @@ SslIO::SslIO(const SslSocket& s, s.setNonblocking(); } -struct deleter -{ - template - void operator()(T *ptr){ delete ptr;} -}; - SslIO::~SslIO() { - std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); - std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); } void SslIO::queueForDeletion() { @@ -216,6 +208,19 @@ void SslIO::start(Poller::shared_ptr poller) { DispatchHandle::startWatch(poller); } +void SslIO::createBuffers(uint32_t size) { + // Allocate all the buffer memory at once + bufferMemory.reset(new char[size*BufferCount]); + + // Create the Buffer structs in a vector + // And push into the buffer queue + buffers.reserve(BufferCount); + for (uint32_t i = 0; i < BufferCount; i++) { + buffers.push_back(BufferBase(&bufferMemory[i*size], size)); + queueReadBuffer(&buffers[i]); + } +} + void SslIO::queueReadBuffer(BufferBase* buff) { assert(buff); buff->dataStart = 0; diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.h b/qpid/cpp/src/qpid/sys/ssl/SslIo.h index b795594cd9..f3112bfa65 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslIo.h +++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.h @@ -25,6 +25,7 @@ #include "qpid/sys/SecuritySettings.h" #include +#include #include namespace qpid { @@ -87,8 +88,8 @@ private: }; struct SslIOBufferBase { - char* const bytes; - const int32_t byteCount; + char* bytes; + int32_t byteCount; int32_t dataStart; int32_t dataCount; @@ -127,7 +128,9 @@ public: typedef boost::function1 IdleCallback; typedef boost::function1 RequestCallback; - + SslIO(const SslSocket& s, + ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, + ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); private: ReadCallback readCallback; EofCallback eofCallback; @@ -138,6 +141,8 @@ private: const SslSocket& socket; std::deque bufferQueue; std::deque writeQueue; + std::vector buffers; + boost::shared_array bufferMemory; bool queuedClose; /** * This flag is used to detect and handle concurrency between @@ -148,12 +153,21 @@ private: volatile bool writePending; public: - SslIO(const SslSocket& s, - ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0); + /* + * Size of IO buffers - this is the maximum possible frame size + 1 + */ + const static uint32_t MaxBufferSize = 65536; + + /* + * Number of IO buffers allocated - I think the code can only use 2 - + * 1 for reading and 1 for writing, allocate 4 for safety + */ + const static uint32_t BufferCount = 4; + void queueForDeletion(); void start(qpid::sys::Poller::shared_ptr poller); + void createBuffers(uint32_t size = MaxBufferSize); void queueReadBuffer(BufferBase* buff); void unread(BufferBase* buff); void queueWrite(BufferBase* buff); -- cgit v1.2.1