summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/TCPConnector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/TCPConnector.cpp')
-rw-r--r--cpp/src/qpid/client/TCPConnector.cpp14
1 files changed, 11 insertions, 3 deletions
diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp
index 00584d168e..94c4a4cae0 100644
--- a/cpp/src/qpid/client/TCPConnector.cpp
+++ b/cpp/src/qpid/client/TCPConnector.cpp
@@ -108,15 +108,23 @@ void TCPConnector::connected(const Socket&) {
0, // closed
0, // nobuffs
boost::bind(&TCPConnector::writebuff, this, _1));
+ start(aio);
+ initAmqp();
+ aio->start(poller);
+}
+
+void TCPConnector::start(sys::AsynchIO* aio_) {
+ aio = aio_;
for (int i = 0; i < 32; i++) {
aio->queueReadBuffer(new Buff(maxFrameSize));
}
identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
+}
+
+void TCPConnector::initAmqp() {
ProtocolInitiation init(version);
writeDataBlock(init);
-
- aio->start(poller);
}
void TCPConnector::connectFailed(const std::string& msg) {
@@ -286,7 +294,7 @@ size_t TCPConnector::decode(const char* buffer, size_t size)
}
void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
- AsynchIO::BufferBase* buff = new Buff(maxFrameSize);
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.encodedSize();