summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-12-17 18:05:30 +0000
committerAlan Conway <aconway@apache.org>2008-12-17 18:05:30 +0000
commitb2c9227189307ebef1f1d2d2576b0b418e1086f1 (patch)
tree32615c1179a9898bfe94df5efd8192f90ff49f62
parentef23eec6dfcd906a235e3694b0430f465416727c (diff)
downloadqpid-python-b2c9227189307ebef1f1d2d2576b0b418e1086f1.tar.gz
src/qpid/amqp_0_10/Connection.cpp: allow encoding to be concurrent with adding new frames.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@727455 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.cpp32
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.h1
2 files changed, 23 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
index 8770433e20..5b14d60ff5 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -69,7 +69,11 @@ bool Connection::isClosed() const {
}
size_t Connection::encode(const char* buffer, size_t size) {
- Mutex::ScopedLock l(frameQueueLock);
+ { // Swap frameQueue data into workQueue to avoid holding lock while we encode.
+ Mutex::ScopedLock l(frameQueueLock);
+ assert(workQueue.empty());
+ workQueue.swap(frameQueue);
+ }
framing::Buffer out(const_cast<char*>(buffer), size);
if (!isClient && !initialized) {
framing::ProtocolInitiation pi(getVersion());
@@ -78,16 +82,24 @@ size_t Connection::encode(const char* buffer, size_t size) {
QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")");
}
size_t frameSize=0;
- while (!frameQueue.empty() && ((frameSize=frameQueue.front().encodedSize()) <= out.available())) {
- frameQueue.front().encode(out);
- QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front());
- frameQueue.pop_front();
- buffered -= frameSize;
- if (frameQueue.empty() && out.available() > 0) connection->doOutput();
+ size_t encoded=0;
+ while (!workQueue.empty() && ((frameSize=workQueue.front().encodedSize()) <= out.available())) {
+ workQueue.front().encode(out);
+ QPID_LOG(trace, "SENT [" << identifier << "]: " << workQueue.front());
+ workQueue.pop_front();
+ encoded += frameSize;
+ if (workQueue.empty() && out.available() > 0) connection->doOutput();
+ }
+ assert(workQueue.empty() || workQueue.front().encodedSize() <= size);
+ if (!workQueue.empty() && workQueue.front().encodedSize() > size)
+ throw InternalErrorException(QPID_MSG("Frame too large for buffer."));
+ {
+ Mutex::ScopedLock l(frameQueueLock);
+ buffered -= encoded;
+ // Put back any frames we did not encode.
+ frameQueue.insert(frameQueue.begin(), workQueue.begin(), workQueue.end());
+ workQueue.clear();
}
- assert(frameQueue.empty() || frameQueue.front().encodedSize() <= size);
- if (!frameQueue.empty() && frameQueue.front().encodedSize() > size)
- throw InternalErrorException(QPID_MSG("Could not write frame, too large for buffer."));
return out.getPosition();
}
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
index 04bcf4a48b..743a7de3aa 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.h
@@ -45,6 +45,7 @@ class Connection : public sys::ConnectionCodec,
typedef std::deque<framing::AMQFrame> FrameQueue;
FrameQueue frameQueue;
+ FrameQueue workQueue;
bool frameQueueClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;