summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/Connector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
-rw-r--r--cpp/src/qpid/client/Connector.cpp175
1 files changed, 95 insertions, 80 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 23d2c3ff8d..0e3afdd3f0 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -48,8 +48,7 @@ Connector::Connector(
timeoutHandler(0),
shutdownHandler(0),
aio(0)
-{
-}
+{}
Connector::~Connector() {
close();
@@ -62,12 +61,13 @@ void Connector::connect(const std::string& host, int port){
closed = false;
poller = Poller::shared_ptr(new Poller);
aio = new AsynchIO(socket,
- boost::bind(&Connector::readbuff, this, _1, _2),
- boost::bind(&Connector::eof, this, _1),
- boost::bind(&Connector::eof, this, _1),
- 0, // closed
- 0, // nobuffs
- boost::bind(&Connector::writebuff, this, _1));
+ boost::bind(&Connector::readbuff, this, _1, _2),
+ boost::bind(&Connector::eof, this, _1),
+ boost::bind(&Connector::eof, this, _1),
+ 0, // closed
+ 0, // nobuffs
+ boost::bind(&Connector::writebuff, this, _1));
+ writer.setAio(aio);
}
void Connector::init(){
@@ -103,12 +103,8 @@ OutputHandler* Connector::getOutputHandler(){
return this;
}
-void Connector::send(AMQFrame& frame){
- Mutex::ScopedLock l(writeLock);
- writeFrameQueue.push(frame);
- aio->notifyPendingWrite();
-
- QPID_LOG(trace, "SENT [" << this << "]: " << frame);
+void Connector::send(AMQFrame& frame) {
+ writer.handle(frame);
}
void Connector::handleClosed() {
@@ -165,70 +161,89 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){
timeoutHandler = handler;
}
-
-// Buffer definition
-struct Buff : public AsynchIO::BufferBase {
- Buff() :
- AsynchIO::BufferBase(new char[65536], 65536)
- {}
- ~Buff()
- { delete [] bytes;}
+struct Connector::Buff : public AsynchIO::BufferBase {
+ Buff() : AsynchIO::BufferBase(new char[65536], 65536) {}
+ ~Buff() { delete [] bytes;}
};
-void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+Connector::Writer::Writer() : aio(0), buffer(0), lastEof(frames.begin()) {}
+
+Connector::Writer::~Writer() { delete buffer; }
- AMQFrame frame;
- while(frame.decode(in)){
- QPID_LOG(trace, "RECV [" << this << "]: " << frame);
- input->received(frame);
- }
- // TODO: unreading needs to go away, and when we can cope
- // with multiple sub-buffers in the general buffer scheme, it will
- if (in.available() != 0) {
- // Adjust buffer for used bytes and then "unread them"
- buff->dataStart += buff->dataCount-in.available();
- buff->dataCount = in.available();
- aio.unread(buff);
- } else {
- // Give whole buffer back to aio subsystem
- aio.queueReadBuffer(buff);
- }
+void Connector::Writer::setAio(sys::AsynchIO* a) {
+ Mutex::ScopedLock l(lock);
+ aio = a;
+ newBuffer(l);
}
-void Connector::writebuff(AsynchIO& aio) {
- Mutex::ScopedLock l(writeLock);
-
- if (writeFrameQueue.empty()) {
- return;
+void Connector::Writer::handle(framing::AMQFrame& frame) {
+ Mutex::ScopedLock l(lock);
+ frames.push_back(frame);
+ if (frame.getEof()) {
+ lastEof = frames.end();
+ aio->notifyPendingWrite();
}
+ QPID_LOG(trace, "SENT [" << this << "]: " << frame);
+}
- do {
- // Try and get a queued buffer if not then construct new one
- AsynchIO::BufferBase* buff = aio.getQueuedBuffer();
- if (!buff)
- buff = new Buff;
- framing::Buffer out(buff->bytes, buff->byteCount);
- int buffUsed = 0;
-
- framing::AMQFrame frame = writeFrameQueue.front();
- int frameSize = frame.size();
- while (frameSize <= int(out.available())) {
-
- // Encode output frame
- frame.encode(out);
- buffUsed += frameSize;
-
- writeFrameQueue.pop();
- if (writeFrameQueue.empty())
- break;
- frame = writeFrameQueue.front();
- frameSize = frame.size();
- }
-
- buff->dataCount = buffUsed;
- aio.queueWrite(buff);
- } while (!writeFrameQueue.empty());
+void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
+ assert(buffer);
+ QPID_LOG(trace, "Write buffer " << encode.getPosition()
+ << " bytes " << framesEncoded << " frames ");
+ framesEncoded = 0;
+
+ buffer->dataStart = 0;
+ buffer->dataCount = encode.getPosition();
+ aio->queueWrite(buffer);
+ newBuffer(l);
+}
+
+void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
+ buffer = aio->getQueuedBuffer();
+ if (!buffer) buffer = new Buff();
+ encode = framing::Buffer(buffer->bytes, buffer->byteCount);
+ framesEncoded = 0;
+}
+
+// Called in IO thread.
+void Connector::Writer::write(sys::AsynchIO& aio_) {
+ Mutex::ScopedLock l(lock);
+ assert(&aio_ == aio);
+ assert(buffer);
+ for (Frames::iterator i = frames.begin(); i != lastEof; ++i) {
+ if (i->size() > encode.available()) writeOne(l);
+ assert(i->size() <= encode.available());
+ i->encode(encode);
+ ++framesEncoded;
+ }
+ frames.erase(frames.begin(), lastEof);
+ lastEof = frames.begin();
+ if (encode.getPosition() > 0) writeOne(l);
+}
+
+void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV [" << this << "]: " << frame);
+ input->received(frame);
+ }
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (in.available() != 0) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += buff->dataCount-in.available();
+ buff->dataCount = in.available();
+ aio.unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio.queueReadBuffer(buff);
+ }
+}
+
+void Connector::writebuff(AsynchIO& aio_) {
+ writer.write(aio_);
}
void Connector::writeDataBlock(const AMQDataBlock& data) {
@@ -240,24 +255,24 @@ void Connector::writeDataBlock(const AMQDataBlock& data) {
}
void Connector::eof(AsynchIO&) {
- handleClosed();
+ handleClosed();
}
// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
// will never be called
void Connector::run(){
- try {
- Dispatcher d(poller);
+ try {
+ Dispatcher d(poller);
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff);
- }
+ for (int i = 0; i < 32; i++) {
+ aio->queueReadBuffer(new Buff);
+ }
- aio->start(poller);
- d.run();
+ aio->start(poller);
+ d.run();
aio->queueForDeletion();
socket.close();
- } catch (const std::exception& e) {
+ } catch (const std::exception& e) {
QPID_LOG(error, e.what());
handleClosed();
}