diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2012-08-10 17:27:38 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2012-08-10 17:27:38 +0000 |
| commit | c3ca030e5e9460f3b1dd7f23cc810fad6fb5eac1 (patch) | |
| tree | 0d99aff7c271ec2d0ab64cf938d2c0f2c7f13b0c /cpp/src/qpid/client/SslConnector.cpp | |
| parent | a1947442559323f1eb5e8eb4c67983d69ac2c65a (diff) | |
| download | qpid-python-c3ca030e5e9460f3b1dd7f23cc810fad6fb5eac1.tar.gz | |
SSL changes for new buffer management
- Needed to rework SslConnector to mirror
TCPConnector in order to make changes to
the client side, but now Unix SSL and TCP
client implementations are much more alike.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1371775 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SslConnector.cpp')
| -rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 224 |
1 files changed, 106 insertions, 118 deletions
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index 4c6fadd28a..c2081a88f2 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -38,7 +38,6 @@ #include "qpid/Msg.h" #include <iostream> -#include <map> #include <boost/bind.hpp> #include <boost/format.hpp> @@ -54,53 +53,29 @@ using boost::str; class SslConnector : public Connector { - struct Buff; - - /** Batch up frames for writing to aio. */ - class Writer : public framing::FrameHandler { - typedef sys::ssl::SslIOBufferBase BufferBase; - typedef std::vector<framing::AMQFrame> Frames; - - const uint16_t maxFrameSize; - sys::Mutex lock; - sys::ssl::SslIO* aio; - BufferBase* buffer; - Frames frames; - size_t lastEof; // Position after last EOF in frames - framing::Buffer encode; - size_t framesEncoded; - std::string identifier; - Bounds* bounds; - - void writeOne(); - void newBuffer(); - - public: - - Writer(uint16_t maxFrameSize, Bounds*); - ~Writer(); - void init(std::string id, sys::ssl::SslIO*); - void handle(framing::AMQFrame&); - void write(sys::ssl::SslIO&); - }; + typedef std::deque<framing::AMQFrame> Frames; const uint16_t maxFrameSize; + + sys::Mutex lock; + Frames frames; + size_t lastEof; // Position after last EOF in frames + uint64_t currentSize; + Bounds* bounds; + framing::ProtocolVersion version; bool initiated; - SecuritySettings securitySettings; - - sys::Mutex closedLock; bool closed; sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; - Writer writer; - sys::ssl::SslSocket socket; sys::ssl::SslIO* aio; + std::string identifier; Poller::shared_ptr poller; + SecuritySettings securitySettings; ~SslConnector(); @@ -110,10 +85,7 @@ class SslConnector : public Connector void eof(qpid::sys::ssl::SslIO&); void disconnected(qpid::sys::ssl::SslIO&); - std::string identifier; - void connect(const std::string& host, const std::string& port); - void init(); void close(); void send(framing::AMQFrame& frame); void abort() {} // TODO: Need to fix for heartbeat timeouts to work @@ -126,17 +98,16 @@ class SslConnector : public Connector const SecuritySettings* getSecuritySettings(); void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&); + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool canEncode(); + public: SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; -struct SslConnector::Buff : public SslIO::BufferBase { - Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - // Static constructor which registers connector here namespace { Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { @@ -170,12 +141,14 @@ SslConnector::SslConnector(Poller::shared_ptr p, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), + lastEof(0), + currentSize(0), + bounds(cimpl), version(ver), initiated(false), closed(true), shutdownHandler(0), input(0), - writer(maxFrameSize, cimpl), aio(0), poller(p) { @@ -192,7 +165,7 @@ SslConnector::~SslConnector() { } void SslConnector::connect(const std::string& host, const std::string& port){ - Mutex::ScopedLock l(closedLock); + Mutex::ScopedLock l(lock); assert(closed); try { socket.connect(host, port); @@ -201,7 +174,6 @@ void SslConnector::connect(const std::string& host, const std::string& port){ throw TransportFailure(e.what()); } - identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); closed = false; aio = new SslIO(socket, boost::bind(&SslConnector::readbuff, this, _1, _2), @@ -210,21 +182,16 @@ void SslConnector::connect(const std::string& host, const std::string& port){ boost::bind(&SslConnector::socketClosed, this, _1, _2), 0, // nobuffs boost::bind(&SslConnector::writebuff, this, _1)); - writer.init(identifier, aio); -} -void SslConnector::init(){ - Mutex::ScopedLock l(closedLock); + aio->createBuffers(maxFrameSize); + identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); ProtocolInitiation init(version); writeDataBlock(init); - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } aio->start(poller); } void SslConnector::close() { - Mutex::ScopedLock l(closedLock); + Mutex::ScopedLock l(lock); if (!closed) { closed = true; if (aio) @@ -260,76 +227,110 @@ const std::string& SslConnector::getIdentifier() const { } void SslConnector::send(AMQFrame& frame) { - writer.handle(frame); -} - -SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) -{ -} - -SslConnector::Writer::~Writer() { delete buffer; } - -void SslConnector::Writer::init(std::string id, sys::ssl::SslIO* a) { - Mutex::ScopedLock l(lock); - identifier = id; - aio = a; - newBuffer(); -} -void SslConnector::Writer::handle(framing::AMQFrame& frame) { + bool notifyWrite = false; + { Mutex::ScopedLock l(lock); frames.push_back(frame); - if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) { + //only ask to write if this is the end of a frameset or if we + //already have a buffers worth of data + currentSize += frame.encodedSize(); + if (frame.getEof()) { lastEof = frames.size(); - aio->notifyPendingWrite(); + notifyWrite = true; + } else { + notifyWrite = (currentSize >= maxFrameSize); + } + /* + NOTE: Moving the following line into this mutex block + is a workaround for BZ 570168, in which the test + testConcurrentSenders causes a hang about 1.5% + of the time. ( To see the hang much more frequently + leave this line out of the mutex block, and put a + small usleep just before it.) + + TODO mgoulish - fix the underlying cause and then + move this call back outside the mutex. + */ + if (notifyWrite && !closed) aio->notifyPendingWrite(); } - QPID_LOG(trace, "SENT [" << identifier << "]: " << frame); } -void SslConnector::Writer::writeOne() { - assert(buffer); - framesEncoded = 0; +void SslConnector::writebuff(SslIO& /*aio*/) +{ + // It's possible to be disconnected and be writable + if (closed) + return; - buffer->dataStart = 0; - buffer->dataCount = encode.getPosition(); - aio->queueWrite(buffer); - newBuffer(); -} + if (!canEncode()) { + return; + } -void SslConnector::Writer::newBuffer() { - buffer = aio->getQueuedBuffer(); - if (!buffer) buffer = new Buff(maxFrameSize); - encode = framing::Buffer(buffer->bytes, buffer->byteCount); - framesEncoded = 0; + SslIO::BufferBase* buffer = aio->getQueuedBuffer(); + if (buffer) { + + size_t encoded = encode(buffer->bytes, buffer->byteCount); + + buffer->dataStart = 0; + buffer->dataCount = encoded; + aio->queueWrite(buffer); + } } // Called in IO thread. -void SslConnector::Writer::write(sys::ssl::SslIO&) { +bool SslConnector::canEncode() +{ Mutex::ScopedLock l(lock); - assert(buffer); + //have at least one full frameset or a whole buffers worth of data + return lastEof || currentSize >= maxFrameSize; +} + +// Called in IO thread. +size_t SslConnector::encode(const char* buffer, size_t size) +{ + framing::Buffer out(const_cast<char*>(buffer), size); size_t bytesWritten(0); - for (size_t i = 0; i < lastEof; ++i) { - AMQFrame& frame = frames[i]; - uint32_t size = frame.encodedSize(); - if (size > encode.available()) writeOne(); - assert(size <= encode.available()); - frame.encode(encode); - ++framesEncoded; - bytesWritten += size; + { + Mutex::ScopedLock l(lock); + while (!frames.empty() && out.available() >= frames.front().encodedSize() ) { + frames.front().encode(out); + QPID_LOG(trace, "SENT [" << identifier << "]: " << frames.front()); + frames.pop_front(); + if (lastEof) --lastEof; + } + bytesWritten = size - out.available(); + currentSize -= bytesWritten; } - frames.erase(frames.begin(), frames.begin()+lastEof); - lastEof = 0; if (bounds) bounds->reduce(bytesWritten); - if (encode.getPosition() > 0) writeOne(); + return bytesWritten; } -void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) { - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); +void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) +{ + int32_t decoded = decode(buff->bytes+buff->dataStart, buff->dataCount); + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (decoded < buff->dataCount) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += decoded; + buff->dataCount -= decoded; + aio.unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio.queueReadBuffer(buff); + } +} +size_t SslConnector::decode(const char* buffer, size_t size) +{ + framing::Buffer in(const_cast<char*>(buffer), size); if (!initiated) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { - //TODO: check the version is correct QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); + if(!(protocolInit==version)){ + throw Exception(QPID_MSG("Unsupported version: " << protocolInit + << " supported version " << version)); + } } initiated = true; } @@ -338,25 +339,12 @@ void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); input->received(frame); } - // TODO: unreading needs to go away, and when we can cope - // with multiple sub-buffers in the general buffer scheme, it will - if (in.available() != 0) { - // Adjust buffer for used bytes and then "unread them" - buff->dataStart += buff->dataCount-in.available(); - buff->dataCount = in.available(); - aio.unread(buff); - } else { - // Give whole buffer back to aio subsystem - aio.queueReadBuffer(buff); - } -} - -void SslConnector::writebuff(SslIO& aio_) { - writer.write(aio_); + return size - in.available(); } void SslConnector::writeDataBlock(const AMQDataBlock& data) { - SslIO::BufferBase* buff = new Buff(maxFrameSize); + SslIO::BufferBase* buff = aio->getQueuedBuffer(); + assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.encodedSize(); |
