summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/amqp_0_10/Connection.cpp')
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.cpp21
1 files changed, 14 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
index 96d5146c30..bf2e7d5713 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -31,7 +31,7 @@ namespace amqp_0_10 {
using sys::Mutex;
Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient)
- : frameQueueClosed(false), output(o), identifier(id), initialized(false),
+ : pushClosed(false), popClosed(false), output(o), identifier(id), initialized(false),
isClient(_isClient), buffered(0), version(0,10)
{}
@@ -61,19 +61,23 @@ size_t Connection::decode(const char* buffer, size_t size) {
}
bool Connection::canEncode() {
- if (!frameQueueClosed) connection->doOutput();
Mutex::ScopedLock l(frameQueueLock);
- return (!isClient && !initialized) || !frameQueue.empty();
+ if (!popClosed) {
+ Mutex::ScopedUnlock u(frameQueueLock);
+ connection->doOutput();
+ }
+ return !popClosed && ((!isClient && !initialized) || !frameQueue.empty());
}
bool Connection::isClosed() const {
Mutex::ScopedLock l(frameQueueLock);
- return frameQueueClosed;
+ return pushClosed && popClosed;
}
size_t Connection::encode(const char* buffer, size_t size) {
{ // Swap frameQueue data into workQueue to avoid holding lock while we encode.
Mutex::ScopedLock l(frameQueueLock);
+ if (popClosed) return 0; // Can't pop any more frames.
assert(workQueue.empty());
workQueue.swap(frameQueue);
}
@@ -102,6 +106,8 @@ size_t Connection::encode(const char* buffer, size_t size) {
// Put back any frames we did not encode.
frameQueue.insert(frameQueue.begin(), workQueue.begin(), workQueue.end());
workQueue.clear();
+ if (frameQueue.empty() && pushClosed)
+ popClosed = true;
}
return out.getPosition();
}
@@ -111,9 +117,10 @@ void Connection::activateOutput() { output.activateOutput(); }
void Connection::giveReadCredit(int32_t credit) { output.giveReadCredit(credit); }
void Connection::close() {
- // Close the output queue.
+ // No more frames can be pushed onto the queue.
+ // Frames aleady on the queue can be popped.
Mutex::ScopedLock l(frameQueueLock);
- frameQueueClosed = true;
+ pushClosed = true;
}
void Connection::closed() {
@@ -123,7 +130,7 @@ void Connection::closed() {
void Connection::send(framing::AMQFrame& f) {
{
Mutex::ScopedLock l(frameQueueLock);
- if (!frameQueueClosed)
+ if (!pushClosed)
frameQueue.push_back(f);
buffered += f.encodedSize();
}