diff options
author | Alan Conway <aconway@apache.org> | 2008-12-17 18:05:30 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-12-17 18:05:30 +0000 |
commit | b2c9227189307ebef1f1d2d2576b0b418e1086f1 (patch) | |
tree | 32615c1179a9898bfe94df5efd8192f90ff49f62 | |
parent | ef23eec6dfcd906a235e3694b0430f465416727c (diff) | |
download | qpid-python-b2c9227189307ebef1f1d2d2576b0b418e1086f1.tar.gz |
src/qpid/amqp_0_10/Connection.cpp: allow encoding to be concurrent with adding new frames.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@727455 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/Connection.cpp | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/Connection.h | 1 |
2 files changed, 23 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp index 8770433e20..5b14d60ff5 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -69,7 +69,11 @@ bool Connection::isClosed() const { } size_t Connection::encode(const char* buffer, size_t size) { - Mutex::ScopedLock l(frameQueueLock); + { // Swap frameQueue data into workQueue to avoid holding lock while we encode. + Mutex::ScopedLock l(frameQueueLock); + assert(workQueue.empty()); + workQueue.swap(frameQueue); + } framing::Buffer out(const_cast<char*>(buffer), size); if (!isClient && !initialized) { framing::ProtocolInitiation pi(getVersion()); @@ -78,16 +82,24 @@ size_t Connection::encode(const char* buffer, size_t size) { QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")"); } size_t frameSize=0; - while (!frameQueue.empty() && ((frameSize=frameQueue.front().encodedSize()) <= out.available())) { - frameQueue.front().encode(out); - QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front()); - frameQueue.pop_front(); - buffered -= frameSize; - if (frameQueue.empty() && out.available() > 0) connection->doOutput(); + size_t encoded=0; + while (!workQueue.empty() && ((frameSize=workQueue.front().encodedSize()) <= out.available())) { + workQueue.front().encode(out); + QPID_LOG(trace, "SENT [" << identifier << "]: " << workQueue.front()); + workQueue.pop_front(); + encoded += frameSize; + if (workQueue.empty() && out.available() > 0) connection->doOutput(); + } + assert(workQueue.empty() || workQueue.front().encodedSize() <= size); + if (!workQueue.empty() && workQueue.front().encodedSize() > size) + throw InternalErrorException(QPID_MSG("Frame too large for buffer.")); + { + Mutex::ScopedLock l(frameQueueLock); + buffered -= encoded; + // Put back any frames we did not encode. + frameQueue.insert(frameQueue.begin(), workQueue.begin(), workQueue.end()); + workQueue.clear(); } - assert(frameQueue.empty() || frameQueue.front().encodedSize() <= size); - if (!frameQueue.empty() && frameQueue.front().encodedSize() > size) - throw InternalErrorException(QPID_MSG("Could not write frame, too large for buffer.")); return out.getPosition(); } diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h index 04bcf4a48b..743a7de3aa 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h @@ -45,6 +45,7 @@ class Connection : public sys::ConnectionCodec, typedef std::deque<framing::AMQFrame> FrameQueue; FrameQueue frameQueue; + FrameQueue workQueue; bool frameQueueClosed; mutable sys::Mutex frameQueueLock; sys::OutputControl& output; |