summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/client/Connector.cpp175
-rw-r--r--qpid/cpp/src/qpid/client/Connector.h30
-rw-r--r--qpid/cpp/src/qpid/framing/Buffer.cpp4
-rw-r--r--qpid/cpp/src/qpid/framing/Buffer.h11
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp246
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp59
-rw-r--r--qpid/cpp/src/tests/perftest.cpp2
7 files changed, 287 insertions, 240 deletions
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp
index 23d2c3ff8d..0e3afdd3f0 100644
--- a/qpid/cpp/src/qpid/client/Connector.cpp
+++ b/qpid/cpp/src/qpid/client/Connector.cpp
@@ -48,8 +48,7 @@ Connector::Connector(
timeoutHandler(0),
shutdownHandler(0),
aio(0)
-{
-}
+{}
Connector::~Connector() {
close();
@@ -62,12 +61,13 @@ void Connector::connect(const std::string& host, int port){
closed = false;
poller = Poller::shared_ptr(new Poller);
aio = new AsynchIO(socket,
- boost::bind(&Connector::readbuff, this, _1, _2),
- boost::bind(&Connector::eof, this, _1),
- boost::bind(&Connector::eof, this, _1),
- 0, // closed
- 0, // nobuffs
- boost::bind(&Connector::writebuff, this, _1));
+ boost::bind(&Connector::readbuff, this, _1, _2),
+ boost::bind(&Connector::eof, this, _1),
+ boost::bind(&Connector::eof, this, _1),
+ 0, // closed
+ 0, // nobuffs
+ boost::bind(&Connector::writebuff, this, _1));
+ writer.setAio(aio);
}
void Connector::init(){
@@ -103,12 +103,8 @@ OutputHandler* Connector::getOutputHandler(){
return this;
}
-void Connector::send(AMQFrame& frame){
- Mutex::ScopedLock l(writeLock);
- writeFrameQueue.push(frame);
- aio->notifyPendingWrite();
-
- QPID_LOG(trace, "SENT [" << this << "]: " << frame);
+void Connector::send(AMQFrame& frame) {
+ writer.handle(frame);
}
void Connector::handleClosed() {
@@ -165,70 +161,89 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){
timeoutHandler = handler;
}
-
-// Buffer definition
-struct Buff : public AsynchIO::BufferBase {
- Buff() :
- AsynchIO::BufferBase(new char[65536], 65536)
- {}
- ~Buff()
- { delete [] bytes;}
+struct Connector::Buff : public AsynchIO::BufferBase {
+ Buff() : AsynchIO::BufferBase(new char[65536], 65536) {}
+ ~Buff() { delete [] bytes;}
};
-void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+Connector::Writer::Writer() : aio(0), buffer(0), lastEof(frames.begin()) {}
+
+Connector::Writer::~Writer() { delete buffer; }
- AMQFrame frame;
- while(frame.decode(in)){
- QPID_LOG(trace, "RECV [" << this << "]: " << frame);
- input->received(frame);
- }
- // TODO: unreading needs to go away, and when we can cope
- // with multiple sub-buffers in the general buffer scheme, it will
- if (in.available() != 0) {
- // Adjust buffer for used bytes and then "unread them"
- buff->dataStart += buff->dataCount-in.available();
- buff->dataCount = in.available();
- aio.unread(buff);
- } else {
- // Give whole buffer back to aio subsystem
- aio.queueReadBuffer(buff);
- }
+void Connector::Writer::setAio(sys::AsynchIO* a) {
+ Mutex::ScopedLock l(lock);
+ aio = a;
+ newBuffer(l);
}
-void Connector::writebuff(AsynchIO& aio) {
- Mutex::ScopedLock l(writeLock);
-
- if (writeFrameQueue.empty()) {
- return;
+void Connector::Writer::handle(framing::AMQFrame& frame) {
+ Mutex::ScopedLock l(lock);
+ frames.push_back(frame);
+ if (frame.getEof()) {
+ lastEof = frames.end();
+ aio->notifyPendingWrite();
}
+ QPID_LOG(trace, "SENT [" << this << "]: " << frame);
+}
- do {
- // Try and get a queued buffer if not then construct new one
- AsynchIO::BufferBase* buff = aio.getQueuedBuffer();
- if (!buff)
- buff = new Buff;
- framing::Buffer out(buff->bytes, buff->byteCount);
- int buffUsed = 0;
-
- framing::AMQFrame frame = writeFrameQueue.front();
- int frameSize = frame.size();
- while (frameSize <= int(out.available())) {
-
- // Encode output frame
- frame.encode(out);
- buffUsed += frameSize;
-
- writeFrameQueue.pop();
- if (writeFrameQueue.empty())
- break;
- frame = writeFrameQueue.front();
- frameSize = frame.size();
- }
-
- buff->dataCount = buffUsed;
- aio.queueWrite(buff);
- } while (!writeFrameQueue.empty());
+void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
+ assert(buffer);
+ QPID_LOG(trace, "Write buffer " << encode.getPosition()
+ << " bytes " << framesEncoded << " frames ");
+ framesEncoded = 0;
+
+ buffer->dataStart = 0;
+ buffer->dataCount = encode.getPosition();
+ aio->queueWrite(buffer);
+ newBuffer(l);
+}
+
+void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
+ buffer = aio->getQueuedBuffer();
+ if (!buffer) buffer = new Buff();
+ encode = framing::Buffer(buffer->bytes, buffer->byteCount);
+ framesEncoded = 0;
+}
+
+// Called in IO thread.
+void Connector::Writer::write(sys::AsynchIO& aio_) {
+ Mutex::ScopedLock l(lock);
+ assert(&aio_ == aio);
+ assert(buffer);
+ for (Frames::iterator i = frames.begin(); i != lastEof; ++i) {
+ if (i->size() > encode.available()) writeOne(l);
+ assert(i->size() <= encode.available());
+ i->encode(encode);
+ ++framesEncoded;
+ }
+ frames.erase(frames.begin(), lastEof);
+ lastEof = frames.begin();
+ if (encode.getPosition() > 0) writeOne(l);
+}
+
+void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV [" << this << "]: " << frame);
+ input->received(frame);
+ }
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (in.available() != 0) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += buff->dataCount-in.available();
+ buff->dataCount = in.available();
+ aio.unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio.queueReadBuffer(buff);
+ }
+}
+
+void Connector::writebuff(AsynchIO& aio_) {
+ writer.write(aio_);
}
void Connector::writeDataBlock(const AMQDataBlock& data) {
@@ -240,24 +255,24 @@ void Connector::writeDataBlock(const AMQDataBlock& data) {
}
void Connector::eof(AsynchIO&) {
- handleClosed();
+ handleClosed();
}
// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
// will never be called
void Connector::run(){
- try {
- Dispatcher d(poller);
+ try {
+ Dispatcher d(poller);
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff);
- }
+ for (int i = 0; i < 32; i++) {
+ aio->queueReadBuffer(new Buff);
+ }
- aio->start(poller);
- d.run();
+ aio->start(poller);
+ d.run();
aio->queueForDeletion();
socket.close();
- } catch (const std::exception& e) {
+ } catch (const std::exception& e) {
QPID_LOG(error, e.what());
handleClosed();
}
diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h
index b1d759569c..9897789901 100644
--- a/qpid/cpp/src/qpid/client/Connector.h
+++ b/qpid/cpp/src/qpid/client/Connector.h
@@ -45,6 +45,33 @@ namespace client {
class Connector : public framing::OutputHandler,
private sys::Runnable
{
+ struct Buff;
+
+ /** Batch up frames for writing to aio. */
+ class Writer : public framing::FrameHandler {
+ typedef sys::AsynchIO::BufferBase BufferBase;
+ typedef std::vector<framing::AMQFrame> Frames;
+
+ sys::Mutex lock;
+ sys::AsynchIO* aio;
+ BufferBase* buffer;
+ Frames frames;
+ Frames::iterator lastEof; // Points after last EOF in frames
+ framing::Buffer encode;
+ size_t framesEncoded;
+
+ void writeOne(const sys::Mutex::ScopedLock&);
+ void newBuffer(const sys::Mutex::ScopedLock&);
+
+ public:
+
+ Writer();
+ ~Writer();
+ void setAio(sys::AsynchIO*);
+ void handle(framing::AMQFrame&);
+ void write(sys::AsynchIO&);
+ };
+
const bool debug;
const int receive_buffer_size;
const int send_buffer_size;
@@ -65,8 +92,7 @@ class Connector : public framing::OutputHandler,
framing::InitiationHandler* initialiser;
framing::OutputHandler* output;
- sys::Mutex writeLock;
- std::queue<framing::AMQFrame> writeFrameQueue;
+ Writer writer;
sys::Thread receiver;
diff --git a/qpid/cpp/src/qpid/framing/Buffer.cpp b/qpid/cpp/src/qpid/framing/Buffer.cpp
index b42797414f..7eadf377b9 100644
--- a/qpid/cpp/src/qpid/framing/Buffer.cpp
+++ b/qpid/cpp/src/qpid/framing/Buffer.cpp
@@ -47,10 +47,6 @@ void Buffer::reset(){
position = 0;
}
-uint32_t Buffer::available(){
- return size - position;
-}
-
///////////////////////////////////////////////////
void Buffer::putOctet(uint8_t i){
diff --git a/qpid/cpp/src/qpid/framing/Buffer.h b/qpid/cpp/src/qpid/framing/Buffer.h
index fe33dbd366..5ab897d351 100644
--- a/qpid/cpp/src/qpid/framing/Buffer.h
+++ b/qpid/cpp/src/qpid/framing/Buffer.h
@@ -34,7 +34,7 @@ class FieldTable;
class Buffer
{
- const uint32_t size;
+ uint32_t size;
char* data;
uint32_t position;
uint32_t r_position;
@@ -43,13 +43,16 @@ class Buffer
public:
- Buffer(char* data, uint32_t size);
+ Buffer(char* data=0, uint32_t size=0);
void record();
void restore(bool reRecord = false);
void reset();
- uint32_t available();
-
+
+ uint32_t available() { return size - position; }
+ uint32_t getSize() { return size; }
+ uint32_t getPosition() { return position; }
+
void putOctet(uint8_t i);
void putShort(uint16_t i);
void putLong(uint32_t i);
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 9ca083354b..c2c4b545f9 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -44,12 +44,12 @@ namespace qpid {
namespace sys {
class AsynchIOAcceptor : public Acceptor {
- Poller::shared_ptr poller;
- Socket listener;
- int numIOThreads;
- const uint16_t listeningPort;
+ Poller::shared_ptr poller;
+ Socket listener;
+ int numIOThreads;
+ const uint16_t listeningPort;
-public:
+ public:
AsynchIOAcceptor(int16_t port, int backlog, int threads);
~AsynchIOAcceptor() {}
void run(ConnectionInputHandlerFactory* factory);
@@ -58,7 +58,7 @@ public:
uint16_t getPort() const;
std::string getHost() const;
-private:
+ private:
void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*);
};
@@ -69,9 +69,9 @@ Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads)
}
AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
- poller(new Poller),
- numIOThreads(threads),
- listeningPort(listener.listen(port, backlog))
+ poller(new Poller),
+ numIOThreads(threads),
+ listeningPort(listener.listen(port, backlog))
{}
// Buffer definition
@@ -93,53 +93,53 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
bool readError;
std::string identifier;
-public:
- AsynchIOHandler() :
- inputHandler(0),
- frameQueueClosed(false),
- initiated(false),
- readError(false)
- {}
+ public:
+ AsynchIOHandler() :
+ inputHandler(0),
+ frameQueueClosed(false),
+ initiated(false),
+ readError(false)
+ {}
- ~AsynchIOHandler() {
- if (inputHandler)
- inputHandler->closed();
- delete inputHandler;
- }
-
- void init(AsynchIO* a, ConnectionInputHandler* h) {
- aio = a;
- inputHandler = h;
- }
-
- // Output side
- void send(framing::AMQFrame&);
- void close();
- void activateOutput();
-
- // Input side
- void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
- void eof(AsynchIO& aio);
- void disconnect(AsynchIO& aio);
+ ~AsynchIOHandler() {
+ if (inputHandler)
+ inputHandler->closed();
+ delete inputHandler;
+ }
+
+ void init(AsynchIO* a, ConnectionInputHandler* h) {
+ aio = a;
+ inputHandler = h;
+ }
+
+ // Output side
+ void send(framing::AMQFrame&);
+ void close();
+ void activateOutput();
+
+ // Input side
+ void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
+ void eof(AsynchIO& aio);
+ void disconnect(AsynchIO& aio);
- // Notifications
- void nobuffs(AsynchIO& aio);
- void idle(AsynchIO& aio);
- void closedSocket(AsynchIO& aio, const Socket& s);
+ // Notifications
+ void nobuffs(AsynchIO& aio);
+ void idle(AsynchIO& aio);
+ void closedSocket(AsynchIO& aio, const Socket& s);
};
void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
- AsynchIOHandler* async = new AsynchIOHandler;
- ConnectionInputHandler* handler = f->create(async, s);
+ AsynchIOHandler* async = new AsynchIOHandler;
+ ConnectionInputHandler* handler = f->create(async, s);
AsynchIO* aio = new AsynchIO(s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, handler);
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ async->init(aio, handler);
// Give connection some buffers to use
for (int i = 0; i < 4; i++) {
@@ -158,50 +158,50 @@ std::string AsynchIOAcceptor::getHost() const {
}
void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
- Dispatcher d(poller);
- AsynchAcceptor
- acceptor(listener,
- boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact));
- acceptor.start(poller);
+ Dispatcher d(poller);
+ AsynchAcceptor
+ acceptor(listener,
+ boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact));
+ acceptor.start(poller);
- std::vector<Thread> t(numIOThreads-1);
+ std::vector<Thread> t(numIOThreads-1);
- // Run n-1 io threads
- for (int i=0; i<numIOThreads-1; ++i)
- t[i] = Thread(d);
+ // Run n-1 io threads
+ for (int i=0; i<numIOThreads-1; ++i)
+ t[i] = Thread(d);
- // Run final thread
- d.run();
+ // Run final thread
+ d.run();
- // Now wait for n-1 io threads to exit
- for (int i=0; i<numIOThreads-1; ++i) {
- t[i].join();
- }
+ // Now wait for n-1 io threads to exit
+ for (int i=0; i<numIOThreads-1; ++i) {
+ t[i].join();
+ }
}
void AsynchIOAcceptor::shutdown() {
- poller->shutdown();
+ poller->shutdown();
}
// Output side
void AsynchIOHandler::send(framing::AMQFrame& frame) {
- // TODO: Need to find out if we are in the callback context,
- // in the callback thread if so we can go further than just queuing the frame
- // to be handled later
- {
+ // TODO: Need to find out if we are in the callback context,
+ // in the callback thread if so we can go further than just queuing the frame
+ // to be handled later
+ {
ScopedLock<Mutex> l(frameQueueLock);
// Ignore anything seen after closing
if (!frameQueueClosed)
- frameQueue.push(frame);
- }
+ frameQueue.push(frame);
+ }
- // Activate aio for writing here
- aio->notifyPendingWrite();
+ // Activate aio for writing here
+ aio->notifyPendingWrite();
}
void AsynchIOHandler::close() {
- ScopedLock<Mutex> l(frameQueueLock);
- frameQueueClosed = true;
+ ScopedLock<Mutex> l(frameQueueLock);
+ frameQueueClosed = true;
}
void AsynchIOHandler::activateOutput() {
@@ -218,7 +218,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
framing::AMQFrame frame;
try{
while(frame.decode(in)) {
- QPID_LOG(debug, "RECV [" << identifier << "]: " << frame);
+ QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
inputHandler->received(frame);
}
}catch(const std::exception& e){
@@ -249,9 +249,9 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
}
void AsynchIOHandler::eof(AsynchIO&) {
- QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
- inputHandler->closed();
- aio->queueWriteClose();
+ QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
+ inputHandler->closed();
+ aio->queueWriteClose();
}
void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) {
@@ -259,14 +259,14 @@ void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) {
if (!aio->writeQueueEmpty()) {
QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)");
}
- delete &s;
- aio->queueForDeletion();
- delete this;
+ delete &s;
+ aio->queueForDeletion();
+ delete this;
}
void AsynchIOHandler::disconnect(AsynchIO& a) {
- // treat the same as eof
- eof(a);
+ // treat the same as eof
+ eof(a);
}
// Notifications
@@ -274,50 +274,54 @@ void AsynchIOHandler::nobuffs(AsynchIO&) {
}
void AsynchIOHandler::idle(AsynchIO&){
- ScopedLock<Mutex> l(frameQueueLock);
+ ScopedLock<Mutex> l(frameQueueLock);
- if (frameQueue.empty()) {
- // At this point we know that we're write idling the connection
- // so tell the input handler to queue any available output:
- inputHandler->doOutput();
- //if still no frames, theres nothing to do:
- if (frameQueue.empty()) return;
- }
+ if (frameQueue.empty()) {
+ // At this point we know that we're write idling the connection
+ // so tell the input handler to queue any available output:
+ inputHandler->doOutput();
+ //if still no frames, theres nothing to do:
+ if (frameQueue.empty()) return;
+ }
- do {
- // Try and get a queued buffer if not then construct new one
- AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff)
- buff = new Buff;
- framing::Buffer out(buff->bytes, buff->byteCount);
- int buffUsed = 0;
+ do {
+ // Try and get a queued buffer if not then construct new one
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ int buffUsed = 0;
- framing::AMQFrame frame = frameQueue.front();
- int frameSize = frame.size();
- while (frameSize <= int(out.available())) {
- frameQueue.pop();
+ framing::AMQFrame frame = frameQueue.front();
+ int frameSize = frame.size();
+ int framesEncoded=0;
+ while (frameSize <= int(out.available())) {
+ frameQueue.pop();
- // Encode output frame
- frame.encode(out);
- buffUsed += frameSize;
- QPID_LOG(debug, "SENT [" << identifier << "]: " << frame);
+ // Encode output frame
+ frame.encode(out);
+ ++framesEncoded;
+ buffUsed += frameSize;
+ QPID_LOG(trace, "SENT [" << identifier << "]: " << frame);
- if (frameQueue.empty())
- break;
- frame = frameQueue.front();
- frameSize = frame.size();
- }
- // If frame was egregiously large complain
- if (frameSize > buff->byteCount)
- throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
+ if (frameQueue.empty())
+ break;
+ frame = frameQueue.front();
+ frameSize = frame.size();
+ }
+ QPID_LOG(trace, "Writing buffer: " << buffUsed << " bytes " << framesEncoded << " frames ");
+
+ // If frame was egregiously large complain
+ if (frameSize > buff->byteCount)
+ throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
- buff->dataCount = buffUsed;
- aio->queueWrite(buff);
- } while (!frameQueue.empty());
+ buff->dataCount = buffUsed;
+ aio->queueWrite(buff);
+ } while (!frameQueue.empty());
- if (frameQueueClosed) {
- aio->queueWriteClose();
- }
+ if (frameQueueClosed) {
+ aio->queueWriteClose();
+ }
}
}} // namespace qpid::sys
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
index f8aaa38cf5..94c68bd5d0 100644
--- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -42,7 +42,7 @@ namespace {
* pipe/socket (necessary as default action is to terminate process)
*/
void ignoreSigpipe() {
- ::signal(SIGPIPE, SIG_IGN);
+ ::signal(SIGPIPE, SIG_IGN);
}
/*
@@ -88,7 +88,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
if (s) {
acceptedCallback(*s);
} else {
- break;
+ break;
}
} while (true);
@@ -99,13 +99,13 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
* Asynch reader/writer
*/
AsynchIO::AsynchIO(const Socket& s,
- ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
+ ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
+ ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
DispatchHandle(s,
- boost::bind(&AsynchIO::readable, this, _1),
- boost::bind(&AsynchIO::writeable, this, _1),
- boost::bind(&AsynchIO::disconnected, this, _1)),
+ boost::bind(&AsynchIO::readable, this, _1),
+ boost::bind(&AsynchIO::writeable, this, _1),
+ boost::bind(&AsynchIO::disconnected, this, _1)),
readCallback(rCb),
eofCallback(eofCb),
disCallback(disCb),
@@ -120,8 +120,8 @@ AsynchIO::AsynchIO(const Socket& s,
struct deleter
{
- template <typename T>
- void operator()(T *ptr){ delete ptr;}
+ template <typename T>
+ void operator()(T *ptr){ delete ptr;}
};
AsynchIO::~AsynchIO() {
@@ -138,7 +138,7 @@ void AsynchIO::start(Poller::shared_ptr poller) {
}
void AsynchIO::queueReadBuffer(BufferBase* buff) {
- assert(buff);
+ assert(buff);
buff->dataStart = 0;
buff->dataCount = 0;
bufferQueue.push_back(buff);
@@ -146,11 +146,11 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) {
}
void AsynchIO::unread(BufferBase* buff) {
- assert(buff);
- if (buff->dataStart != 0) {
- memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
- buff->dataStart = 0;
- }
+ assert(buff);
+ if (buff->dataStart != 0) {
+ memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
+ buff->dataStart = 0;
+ }
bufferQueue.push_front(buff);
DispatchHandle::rewatchRead();
}
@@ -182,14 +182,15 @@ void AsynchIO::queueWriteClose() {
* to spare
*/
AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
- // Always keep at least one buffer (it might have data that was "unread" in it)
- if (bufferQueue.size()<=1)
- return 0;
- BufferBase* buff = bufferQueue.back();
- buff->dataStart = 0;
- buff->dataCount = 0;
- bufferQueue.pop_back();
- return buff;
+ // Always keep at least one buffer (it might have data that was "unread" in it)
+ if (bufferQueue.size()<=1)
+ return 0;
+ BufferBase* buff = bufferQueue.back();
+ assert(buff);
+ buff->dataStart = 0;
+ buff->dataCount = 0;
+ bufferQueue.pop_back();
+ return buff;
}
/*
@@ -204,6 +205,7 @@ void AsynchIO::readable(DispatchHandle& h) {
if (!bufferQueue.empty()) {
// Read into buffer
BufferBase* buff = bufferQueue.front();
+ assert(buff);
bufferQueue.pop_front();
errno = 0;
int readCount = buff->byteCount-buff->dataCount;
@@ -227,6 +229,7 @@ void AsynchIO::readable(DispatchHandle& h) {
} else {
// Put buffer back (at front so it doesn't interfere with unread buffers)
bufferQueue.push_front(buff);
+ assert(buff);
// Eof or other side has gone away
if (rc == 0 || errno == ECONNRESET) {
@@ -352,10 +355,10 @@ void AsynchIO::disconnected(DispatchHandle& h) {
* Close the socket and callback to say we've done it
*/
void AsynchIO::close(DispatchHandle& h) {
- h.stopWatch();
- h.getSocket().close();
- if (closedCallback) {
- closedCallback(*this, getSocket());
- }
+ h.stopWatch();
+ h.getSocket().close();
+ if (closedCallback) {
+ closedCallback(*this, getSocket());
+ }
}
diff --git a/qpid/cpp/src/tests/perftest.cpp b/qpid/cpp/src/tests/perftest.cpp
index ee210891fe..1bd5a963de 100644
--- a/qpid/cpp/src/tests/perftest.cpp
+++ b/qpid/cpp/src/tests/perftest.cpp
@@ -476,7 +476,7 @@ struct SubscribeThread : public Client {
session.close();
}
catch (const std::exception& e) {
- cout << "Publisher exception: " << e.what() << endl;
+ cout << "SubscribeThread exception: " << e.what() << endl;
exit(1);
}
}