diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-27 15:40:33 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-27 15:40:33 +0000 |
commit | 868ce7469262d6fd2fe3f2e7f04cfe7af654d59f (patch) | |
tree | 63e6b5e62554609beb21e8c8d0610569f36d2743 /cpp/src/qpid/client | |
parent | 2e5ff8f1b328831043e6d7e323249d62187234c6 (diff) | |
download | qpid-python-868ce7469262d6fd2fe3f2e7f04cfe7af654d59f.tar.gz |
QPID-3858: Updated code to include recent refactoring by Gordon (gsim) - see QPID-4178.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1377715 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 224 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManagerImpl.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/client/TCPConnector.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/client/TCPConnector.h | 1 |
4 files changed, 127 insertions, 131 deletions
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index 4c6fadd28a..c2081a88f2 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -38,7 +38,6 @@ #include "qpid/Msg.h" #include <iostream> -#include <map> #include <boost/bind.hpp> #include <boost/format.hpp> @@ -54,53 +53,29 @@ using boost::str; class SslConnector : public Connector { - struct Buff; - - /** Batch up frames for writing to aio. */ - class Writer : public framing::FrameHandler { - typedef sys::ssl::SslIOBufferBase BufferBase; - typedef std::vector<framing::AMQFrame> Frames; - - const uint16_t maxFrameSize; - sys::Mutex lock; - sys::ssl::SslIO* aio; - BufferBase* buffer; - Frames frames; - size_t lastEof; // Position after last EOF in frames - framing::Buffer encode; - size_t framesEncoded; - std::string identifier; - Bounds* bounds; - - void writeOne(); - void newBuffer(); - - public: - - Writer(uint16_t maxFrameSize, Bounds*); - ~Writer(); - void init(std::string id, sys::ssl::SslIO*); - void handle(framing::AMQFrame&); - void write(sys::ssl::SslIO&); - }; + typedef std::deque<framing::AMQFrame> Frames; const uint16_t maxFrameSize; + + sys::Mutex lock; + Frames frames; + size_t lastEof; // Position after last EOF in frames + uint64_t currentSize; + Bounds* bounds; + framing::ProtocolVersion version; bool initiated; - SecuritySettings securitySettings; - - sys::Mutex closedLock; bool closed; sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; - Writer writer; - sys::ssl::SslSocket socket; sys::ssl::SslIO* aio; + std::string identifier; Poller::shared_ptr poller; + SecuritySettings securitySettings; ~SslConnector(); @@ -110,10 +85,7 @@ class SslConnector : public Connector void eof(qpid::sys::ssl::SslIO&); void disconnected(qpid::sys::ssl::SslIO&); - std::string identifier; - void connect(const std::string& host, const std::string& port); - void init(); void close(); void send(framing::AMQFrame& frame); void abort() {} // TODO: Need to fix for heartbeat timeouts to work @@ -126,17 +98,16 @@ class SslConnector : public Connector const SecuritySettings* getSecuritySettings(); void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&); + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool canEncode(); + public: SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; -struct SslConnector::Buff : public SslIO::BufferBase { - Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - // Static constructor which registers connector here namespace { Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { @@ -170,12 +141,14 @@ SslConnector::SslConnector(Poller::shared_ptr p, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), + lastEof(0), + currentSize(0), + bounds(cimpl), version(ver), initiated(false), closed(true), shutdownHandler(0), input(0), - writer(maxFrameSize, cimpl), aio(0), poller(p) { @@ -192,7 +165,7 @@ SslConnector::~SslConnector() { } void SslConnector::connect(const std::string& host, const std::string& port){ - Mutex::ScopedLock l(closedLock); + Mutex::ScopedLock l(lock); assert(closed); try { socket.connect(host, port); @@ -201,7 +174,6 @@ void SslConnector::connect(const std::string& host, const std::string& port){ throw TransportFailure(e.what()); } - identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); closed = false; aio = new SslIO(socket, boost::bind(&SslConnector::readbuff, this, _1, _2), @@ -210,21 +182,16 @@ void SslConnector::connect(const std::string& host, const std::string& port){ boost::bind(&SslConnector::socketClosed, this, _1, _2), 0, // nobuffs boost::bind(&SslConnector::writebuff, this, _1)); - writer.init(identifier, aio); -} -void SslConnector::init(){ - Mutex::ScopedLock l(closedLock); + aio->createBuffers(maxFrameSize); + identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); ProtocolInitiation init(version); writeDataBlock(init); - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } aio->start(poller); } void SslConnector::close() { - Mutex::ScopedLock l(closedLock); + Mutex::ScopedLock l(lock); if (!closed) { closed = true; if (aio) @@ -260,76 +227,110 @@ const std::string& SslConnector::getIdentifier() const { } void SslConnector::send(AMQFrame& frame) { - writer.handle(frame); -} - -SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) -{ -} - -SslConnector::Writer::~Writer() { delete buffer; } - -void SslConnector::Writer::init(std::string id, sys::ssl::SslIO* a) { - Mutex::ScopedLock l(lock); - identifier = id; - aio = a; - newBuffer(); -} -void SslConnector::Writer::handle(framing::AMQFrame& frame) { + bool notifyWrite = false; + { Mutex::ScopedLock l(lock); frames.push_back(frame); - if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) { + //only ask to write if this is the end of a frameset or if we + //already have a buffers worth of data + currentSize += frame.encodedSize(); + if (frame.getEof()) { lastEof = frames.size(); - aio->notifyPendingWrite(); + notifyWrite = true; + } else { + notifyWrite = (currentSize >= maxFrameSize); + } + /* + NOTE: Moving the following line into this mutex block + is a workaround for BZ 570168, in which the test + testConcurrentSenders causes a hang about 1.5% + of the time. ( To see the hang much more frequently + leave this line out of the mutex block, and put a + small usleep just before it.) + + TODO mgoulish - fix the underlying cause and then + move this call back outside the mutex. + */ + if (notifyWrite && !closed) aio->notifyPendingWrite(); } - QPID_LOG(trace, "SENT [" << identifier << "]: " << frame); } -void SslConnector::Writer::writeOne() { - assert(buffer); - framesEncoded = 0; +void SslConnector::writebuff(SslIO& /*aio*/) +{ + // It's possible to be disconnected and be writable + if (closed) + return; - buffer->dataStart = 0; - buffer->dataCount = encode.getPosition(); - aio->queueWrite(buffer); - newBuffer(); -} + if (!canEncode()) { + return; + } -void SslConnector::Writer::newBuffer() { - buffer = aio->getQueuedBuffer(); - if (!buffer) buffer = new Buff(maxFrameSize); - encode = framing::Buffer(buffer->bytes, buffer->byteCount); - framesEncoded = 0; + SslIO::BufferBase* buffer = aio->getQueuedBuffer(); + if (buffer) { + + size_t encoded = encode(buffer->bytes, buffer->byteCount); + + buffer->dataStart = 0; + buffer->dataCount = encoded; + aio->queueWrite(buffer); + } } // Called in IO thread. -void SslConnector::Writer::write(sys::ssl::SslIO&) { +bool SslConnector::canEncode() +{ Mutex::ScopedLock l(lock); - assert(buffer); + //have at least one full frameset or a whole buffers worth of data + return lastEof || currentSize >= maxFrameSize; +} + +// Called in IO thread. +size_t SslConnector::encode(const char* buffer, size_t size) +{ + framing::Buffer out(const_cast<char*>(buffer), size); size_t bytesWritten(0); - for (size_t i = 0; i < lastEof; ++i) { - AMQFrame& frame = frames[i]; - uint32_t size = frame.encodedSize(); - if (size > encode.available()) writeOne(); - assert(size <= encode.available()); - frame.encode(encode); - ++framesEncoded; - bytesWritten += size; + { + Mutex::ScopedLock l(lock); + while (!frames.empty() && out.available() >= frames.front().encodedSize() ) { + frames.front().encode(out); + QPID_LOG(trace, "SENT [" << identifier << "]: " << frames.front()); + frames.pop_front(); + if (lastEof) --lastEof; + } + bytesWritten = size - out.available(); + currentSize -= bytesWritten; } - frames.erase(frames.begin(), frames.begin()+lastEof); - lastEof = 0; if (bounds) bounds->reduce(bytesWritten); - if (encode.getPosition() > 0) writeOne(); + return bytesWritten; } -void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) { - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); +void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) +{ + int32_t decoded = decode(buff->bytes+buff->dataStart, buff->dataCount); + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (decoded < buff->dataCount) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += decoded; + buff->dataCount -= decoded; + aio.unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio.queueReadBuffer(buff); + } +} +size_t SslConnector::decode(const char* buffer, size_t size) +{ + framing::Buffer in(const_cast<char*>(buffer), size); if (!initiated) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { - //TODO: check the version is correct QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); + if(!(protocolInit==version)){ + throw Exception(QPID_MSG("Unsupported version: " << protocolInit + << " supported version " << version)); + } } initiated = true; } @@ -338,25 +339,12 @@ void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); input->received(frame); } - // TODO: unreading needs to go away, and when we can cope - // with multiple sub-buffers in the general buffer scheme, it will - if (in.available() != 0) { - // Adjust buffer for used bytes and then "unread them" - buff->dataStart += buff->dataCount-in.available(); - buff->dataCount = in.available(); - aio.unread(buff); - } else { - // Give whole buffer back to aio subsystem - aio.queueReadBuffer(buff); - } -} - -void SslConnector::writebuff(SslIO& aio_) { - writer.write(aio_); + return size - in.available(); } void SslConnector::writeDataBlock(const AMQDataBlock& data) { - SslIO::BufferBase* buff = new Buff(maxFrameSize); + SslIO::BufferBase* buff = aio->getQueuedBuffer(); + assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.encodedSize(); diff --git a/cpp/src/qpid/client/SubscriptionManagerImpl.cpp b/cpp/src/qpid/client/SubscriptionManagerImpl.cpp index 7dead112e5..44f81776c0 100644 --- a/cpp/src/qpid/client/SubscriptionManagerImpl.cpp +++ b/cpp/src/qpid/client/SubscriptionManagerImpl.cpp @@ -28,6 +28,7 @@ #include <qpid/client/Session.h> #include <qpid/client/MessageListener.h> #include <qpid/framing/Uuid.h> +#include <qpid/log/Statement.h> #include <set> #include <sstream> @@ -167,6 +168,15 @@ void SubscriptionManagerImpl::setFlowControl(const std::string& name, uint32_t m setFlowControl(name, FlowControl(messages, bytes, window)); } +AutoCancel::AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {} +AutoCancel::~AutoCancel() { + try { + sm.cancel(tag); + } catch (const qpid::Exception& e) { + QPID_LOG(info, "Exception in AutoCancel destructor: " << e.what()); + } +} + }} // namespace qpid::client diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp index 1dd951d339..a5c6465bad 100644 --- a/cpp/src/qpid/client/TCPConnector.cpp +++ b/cpp/src/qpid/client/TCPConnector.cpp @@ -46,11 +46,6 @@ using namespace qpid::framing; using boost::format; using boost::str; -struct TCPConnector::Buff : public AsynchIO::BufferBase { - Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - // Static constructor which registers connector here namespace { Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { @@ -118,9 +113,8 @@ void TCPConnector::connected(const Socket&) { void TCPConnector::start(sys::AsynchIO* aio_) { aio = aio_; - for (int i = 0; i < 4; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } + + aio->createBuffers(maxFrameSize); identifier = str(format("[%1%]") % socket.getFullAddress()); } @@ -226,15 +220,19 @@ void TCPConnector::writebuff(AsynchIO& /*aio*/) return; Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; - if (codec->canEncode()) { - std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); - if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); + + if (!codec->canEncode()) { + return; + } + + AsynchIO::BufferBase* buffer = aio->getQueuedBuffer(); + if (buffer) { size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); buffer->dataStart = 0; buffer->dataCount = encoded; - aio->queueWrite(buffer.release()); + aio->queueWrite(buffer); } } @@ -307,6 +305,7 @@ size_t TCPConnector::decode(const char* buffer, size_t size) void TCPConnector::writeDataBlock(const AMQDataBlock& data) { AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); + assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.encodedSize(); diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h index c87d544816..c0bc26028d 100644 --- a/cpp/src/qpid/client/TCPConnector.h +++ b/cpp/src/qpid/client/TCPConnector.h @@ -50,7 +50,6 @@ namespace client { class TCPConnector : public Connector, public sys::Codec { typedef std::deque<framing::AMQFrame> Frames; - struct Buff; const uint16_t maxFrameSize; |