diff options
author | Gordon Sim <gsim@apache.org> | 2008-04-29 20:15:18 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-04-29 20:15:18 +0000 |
commit | acc0dee435e1fa22e3b1e7cdfecf6913bf88988e (patch) | |
tree | 729f7a03543acf23380e68897f8788a3e6b45e2e /cpp/src/qpid/client/Connector.cpp | |
parent | a19ce3b1863f80c1232ec2690cd920325a39d71a (diff) | |
download | qpid-python-acc0dee435e1fa22e3b1e7cdfecf6913bf88988e.tar.gz |
QPID-974: allow the size of the queue of outgoing frames to be restricted
QPID-544: tidy up configuration (ensuring desired settings are used correctly,
allowing tcp socket options to be set etc)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652083 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 56 |
1 files changed, 32 insertions, 24 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 7fb4997f5a..c9c55c50e8 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -20,6 +20,8 @@ */ #include "Connector.h" +#include "Bounds.h" +#include "ConnectionSettings.h" #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" @@ -40,21 +42,22 @@ using namespace qpid::framing; using boost::format; using boost::str; -Connector::Connector( - ProtocolVersion ver, bool _debug, uint32_t buffer_size -) : debug(_debug), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - version(ver), - initiated(false), - closed(true), - joined(true), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), - shutdownHandler(0), - aio(0) -{} +Connector::Connector(ProtocolVersion ver, const ConnectionSettings& settings, Bounds* bounds) + : maxFrameSize(settings.maxFrameSize), + version(ver), + initiated(false), + closed(true), + joined(true), + timeout(0), + idleIn(0), idleOut(0), + timeoutHandler(0), + shutdownHandler(0), + writer(maxFrameSize, bounds), + aio(0) +{ + QPID_LOG(debug, "Connector created for " << version); + socket.configure(settings); +} Connector::~Connector() { close(); @@ -176,11 +179,11 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){ } struct Connector::Buff : public AsynchIO::BufferBase { - Buff() : AsynchIO::BufferBase(new char[65536], 65536) {} + Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} ~Buff() { delete [] bytes;} }; -Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0) +Connector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) { } @@ -192,12 +195,12 @@ void Connector::Writer::init(std::string id, sys::AsynchIO* a) { aio = a; newBuffer(l); } - void Connector::Writer::handle(framing::AMQFrame& frame) { Mutex::ScopedLock l(lock); frames.push_back(frame); - if (frame.getEof()) { + if (frame.getEof()) {//or if we already have a buffers worth lastEof = frames.size(); + QPID_LOG(debug, "Requesting write: lastEof=" << lastEof); aio->notifyPendingWrite(); } QPID_LOG(trace, "SENT " << identifier << ": " << frame); @@ -217,7 +220,7 @@ void Connector::Writer::writeOne(const Mutex::ScopedLock& l) { void Connector::Writer::newBuffer(const Mutex::ScopedLock&) { buffer = aio->getQueuedBuffer(); - if (!buffer) buffer = new Buff(); + if (!buffer) buffer = new Buff(maxFrameSize); encode = framing::Buffer(buffer->bytes, buffer->byteCount); framesEncoded = 0; } @@ -226,15 +229,20 @@ void Connector::Writer::newBuffer(const Mutex::ScopedLock&) { void Connector::Writer::write(sys::AsynchIO&) { Mutex::ScopedLock l(lock); assert(buffer); + size_t bytesWritten(0); for (size_t i = 0; i < lastEof; ++i) { AMQFrame& frame = frames[i]; - if (frame.size() > encode.available()) writeOne(l); - assert(frame.size() <= encode.available()); + uint32_t size = frame.size(); + if (size > encode.available()) writeOne(l); + assert(size <= encode.available()); frame.encode(encode); ++framesEncoded; + bytesWritten += size; + QPID_LOG(debug, "Wrote frame: lastEof=" << lastEof << ", i=" << i); } frames.erase(frames.begin(), frames.begin()+lastEof); lastEof = 0; + if (bounds) bounds->reduce(bytesWritten); if (encode.getPosition() > 0) writeOne(l); } @@ -272,7 +280,7 @@ void Connector::writebuff(AsynchIO& aio_) { } void Connector::writeDataBlock(const AMQDataBlock& data) { - AsynchIO::BufferBase* buff = new Buff; + AsynchIO::BufferBase* buff = new Buff(maxFrameSize); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.size(); @@ -290,7 +298,7 @@ void Connector::run(){ Dispatcher d(poller); for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff); + aio->queueReadBuffer(new Buff(maxFrameSize)); } aio->start(poller); |