diff options
Diffstat (limited to 'qpid/cpp/src/qpid/amqp_0_10/Connection.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/Connection.cpp | 18 |
1 files changed, 13 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp index a3692911b2..0b996dedd2 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -30,7 +30,7 @@ using sys::Mutex; Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient) : frameQueueClosed(false), output(o), connection(new broker::Connection(this, broker, id, _isClient)), - identifier(id), initialized(false), isClient(_isClient) {} + identifier(id), initialized(false), isClient(_isClient), buffered(0) {} size_t Connection::decode(const char* buffer, size_t size) { framing::Buffer in(const_cast<char*>(buffer), size); @@ -53,7 +53,7 @@ size_t Connection::decode(const char* buffer, size_t size) { bool Connection::canEncode() { if (!frameQueueClosed) connection->doOutput(); - Mutex::ScopedLock l(frameQueueLock); + Mutex::ScopedLock l(frameQueueLock); return (!isClient && !initialized) || !frameQueue.empty(); } @@ -71,10 +71,12 @@ size_t Connection::encode(const char* buffer, size_t size) { initialized = true; QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")"); } - while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) { + size_t frameSize=0; + while (!frameQueue.empty() && ((frameSize=frameQueue.front().size()) <= out.available())) { frameQueue.front().encode(out); QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front()); - frameQueue.pop(); + frameQueue.pop_front(); + buffered -= frameSize; } assert(frameQueue.empty() || frameQueue.front().size() <= size); if (!frameQueue.empty() && frameQueue.front().size() > size) @@ -98,7 +100,8 @@ void Connection::send(framing::AMQFrame& f) { { Mutex::ScopedLock l(frameQueueLock); if (!frameQueueClosed) - frameQueue.push(f); + frameQueue.push_back(f); + buffered += f.size(); } activateOutput(); } @@ -107,4 +110,9 @@ framing::ProtocolVersion Connection::getVersion() const { return framing::ProtocolVersion(0,10); } +size_t Connection::getBuffered() const { + Mutex::ScopedLock l(frameQueueLock); + return buffered; +} + }} // namespace qpid::amqp_0_10 |