summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/amqp_0_10/Connection.cpp')
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Connection.cpp18
1 files changed, 13 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
index a3692911b2..0b996dedd2 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -30,7 +30,7 @@ using sys::Mutex;
Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
: frameQueueClosed(false), output(o),
connection(new broker::Connection(this, broker, id, _isClient)),
- identifier(id), initialized(false), isClient(_isClient) {}
+ identifier(id), initialized(false), isClient(_isClient), buffered(0) {}
size_t Connection::decode(const char* buffer, size_t size) {
framing::Buffer in(const_cast<char*>(buffer), size);
@@ -53,7 +53,7 @@ size_t Connection::decode(const char* buffer, size_t size) {
bool Connection::canEncode() {
if (!frameQueueClosed) connection->doOutput();
- Mutex::ScopedLock l(frameQueueLock);
+ Mutex::ScopedLock l(frameQueueLock);
return (!isClient && !initialized) || !frameQueue.empty();
}
@@ -71,10 +71,12 @@ size_t Connection::encode(const char* buffer, size_t size) {
initialized = true;
QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")");
}
- while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) {
+ size_t frameSize=0;
+ while (!frameQueue.empty() && ((frameSize=frameQueue.front().size()) <= out.available())) {
frameQueue.front().encode(out);
QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front());
- frameQueue.pop();
+ frameQueue.pop_front();
+ buffered -= frameSize;
}
assert(frameQueue.empty() || frameQueue.front().size() <= size);
if (!frameQueue.empty() && frameQueue.front().size() > size)
@@ -98,7 +100,8 @@ void Connection::send(framing::AMQFrame& f) {
{
Mutex::ScopedLock l(frameQueueLock);
if (!frameQueueClosed)
- frameQueue.push(f);
+ frameQueue.push_back(f);
+ buffered += f.size();
}
activateOutput();
}
@@ -107,4 +110,9 @@ framing::ProtocolVersion Connection::getVersion() const {
return framing::ProtocolVersion(0,10);
}
+size_t Connection::getBuffered() const {
+ Mutex::ScopedLock l(frameQueueLock);
+ return buffered;
+}
+
}} // namespace qpid::amqp_0_10