diff options
Diffstat (limited to 'qpid/cpp/src/qpid/amqp_0_10/Connection.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/Connection.cpp | 21 |
1 files changed, 14 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp index 96d5146c30..bf2e7d5713 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -31,7 +31,7 @@ namespace amqp_0_10 { using sys::Mutex; Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient) - : frameQueueClosed(false), output(o), identifier(id), initialized(false), + : pushClosed(false), popClosed(false), output(o), identifier(id), initialized(false), isClient(_isClient), buffered(0), version(0,10) {} @@ -61,19 +61,23 @@ size_t Connection::decode(const char* buffer, size_t size) { } bool Connection::canEncode() { - if (!frameQueueClosed) connection->doOutput(); Mutex::ScopedLock l(frameQueueLock); - return (!isClient && !initialized) || !frameQueue.empty(); + if (!popClosed) { + Mutex::ScopedUnlock u(frameQueueLock); + connection->doOutput(); + } + return !popClosed && ((!isClient && !initialized) || !frameQueue.empty()); } bool Connection::isClosed() const { Mutex::ScopedLock l(frameQueueLock); - return frameQueueClosed; + return pushClosed && popClosed; } size_t Connection::encode(const char* buffer, size_t size) { { // Swap frameQueue data into workQueue to avoid holding lock while we encode. Mutex::ScopedLock l(frameQueueLock); + if (popClosed) return 0; // Can't pop any more frames. assert(workQueue.empty()); workQueue.swap(frameQueue); } @@ -102,6 +106,8 @@ size_t Connection::encode(const char* buffer, size_t size) { // Put back any frames we did not encode. frameQueue.insert(frameQueue.begin(), workQueue.begin(), workQueue.end()); workQueue.clear(); + if (frameQueue.empty() && pushClosed) + popClosed = true; } return out.getPosition(); } @@ -111,9 +117,10 @@ void Connection::activateOutput() { output.activateOutput(); } void Connection::giveReadCredit(int32_t credit) { output.giveReadCredit(credit); } void Connection::close() { - // Close the output queue. + // No more frames can be pushed onto the queue. + // Frames aleady on the queue can be popped. Mutex::ScopedLock l(frameQueueLock); - frameQueueClosed = true; + pushClosed = true; } void Connection::closed() { @@ -123,7 +130,7 @@ void Connection::closed() { void Connection::send(framing::AMQFrame& f) { { Mutex::ScopedLock l(frameQueueLock); - if (!frameQueueClosed) + if (!pushClosed) frameQueue.push_back(f); buffered += f.encodedSize(); } |