summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/Connector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
-rw-r--r--cpp/src/qpid/client/Connector.cpp56
1 files changed, 32 insertions, 24 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 7fb4997f5a..c9c55c50e8 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -20,6 +20,8 @@
*/
#include "Connector.h"
+#include "Bounds.h"
+#include "ConnectionSettings.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
@@ -40,21 +42,22 @@ using namespace qpid::framing;
using boost::format;
using boost::str;
-Connector::Connector(
- ProtocolVersion ver, bool _debug, uint32_t buffer_size
-) : debug(_debug),
- receive_buffer_size(buffer_size),
- send_buffer_size(buffer_size),
- version(ver),
- initiated(false),
- closed(true),
- joined(true),
- timeout(0),
- idleIn(0), idleOut(0),
- timeoutHandler(0),
- shutdownHandler(0),
- aio(0)
-{}
+Connector::Connector(ProtocolVersion ver, const ConnectionSettings& settings, Bounds* bounds)
+ : maxFrameSize(settings.maxFrameSize),
+ version(ver),
+ initiated(false),
+ closed(true),
+ joined(true),
+ timeout(0),
+ idleIn(0), idleOut(0),
+ timeoutHandler(0),
+ shutdownHandler(0),
+ writer(maxFrameSize, bounds),
+ aio(0)
+{
+ QPID_LOG(debug, "Connector created for " << version);
+ socket.configure(settings);
+}
Connector::~Connector() {
close();
@@ -176,11 +179,11 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){
}
struct Connector::Buff : public AsynchIO::BufferBase {
- Buff() : AsynchIO::BufferBase(new char[65536], 65536) {}
+ Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
~Buff() { delete [] bytes;}
};
-Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0)
+Connector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
{
}
@@ -192,12 +195,12 @@ void Connector::Writer::init(std::string id, sys::AsynchIO* a) {
aio = a;
newBuffer(l);
}
-
void Connector::Writer::handle(framing::AMQFrame& frame) {
Mutex::ScopedLock l(lock);
frames.push_back(frame);
- if (frame.getEof()) {
+ if (frame.getEof()) {//or if we already have a buffers worth
lastEof = frames.size();
+ QPID_LOG(debug, "Requesting write: lastEof=" << lastEof);
aio->notifyPendingWrite();
}
QPID_LOG(trace, "SENT " << identifier << ": " << frame);
@@ -217,7 +220,7 @@ void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
buffer = aio->getQueuedBuffer();
- if (!buffer) buffer = new Buff();
+ if (!buffer) buffer = new Buff(maxFrameSize);
encode = framing::Buffer(buffer->bytes, buffer->byteCount);
framesEncoded = 0;
}
@@ -226,15 +229,20 @@ void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
void Connector::Writer::write(sys::AsynchIO&) {
Mutex::ScopedLock l(lock);
assert(buffer);
+ size_t bytesWritten(0);
for (size_t i = 0; i < lastEof; ++i) {
AMQFrame& frame = frames[i];
- if (frame.size() > encode.available()) writeOne(l);
- assert(frame.size() <= encode.available());
+ uint32_t size = frame.size();
+ if (size > encode.available()) writeOne(l);
+ assert(size <= encode.available());
frame.encode(encode);
++framesEncoded;
+ bytesWritten += size;
+ QPID_LOG(debug, "Wrote frame: lastEof=" << lastEof << ", i=" << i);
}
frames.erase(frames.begin(), frames.begin()+lastEof);
lastEof = 0;
+ if (bounds) bounds->reduce(bytesWritten);
if (encode.getPosition() > 0) writeOne(l);
}
@@ -272,7 +280,7 @@ void Connector::writebuff(AsynchIO& aio_) {
}
void Connector::writeDataBlock(const AMQDataBlock& data) {
- AsynchIO::BufferBase* buff = new Buff;
+ AsynchIO::BufferBase* buff = new Buff(maxFrameSize);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.size();
@@ -290,7 +298,7 @@ void Connector::run(){
Dispatcher d(poller);
for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff);
+ aio->queueReadBuffer(new Buff(maxFrameSize));
}
aio->start(poller);