diff options
author | Gordon Sim <gsim@apache.org> | 2010-05-27 18:09:13 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-05-27 18:09:13 +0000 |
commit | c95b2615abf0883f7d92aad73138a4dda14e1311 (patch) | |
tree | 7eb2195eab5c7ecafab17e553635a434b20dee64 /cpp/src | |
parent | 91491e533896be58438ba2dc0e199461b4320653 (diff) | |
download | qpid-python-c95b2615abf0883f7d92aad73138a4dda14e1311.tar.gz |
QPID-2631: For blocking Bounds::expand() calls, only increase the current count when there is space. In SessionImpl::send() expand bounds before queueing frame. Expand bounds for all frames sent (including connection frames and cluster specific frames).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@948936 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/Bounds.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 16 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 2 |
7 files changed, 37 insertions, 12 deletions
diff --git a/cpp/src/qpid/client/Bounds.cpp b/cpp/src/qpid/client/Bounds.cpp index abb983a62e..cc2577d5fc 100644 --- a/cpp/src/qpid/client/Bounds.cpp +++ b/cpp/src/qpid/client/Bounds.cpp @@ -33,19 +33,19 @@ Bounds::Bounds(size_t maxSize) : max(maxSize), current(0) {} bool Bounds::expand(size_t sizeRequired, bool block) { if (!max) return true; Waitable::ScopedLock l(lock); - current += sizeRequired; if (block) { Waitable::ScopedWait w(lock); - while (current > max) + while (current + sizeRequired > max) lock.wait(); } + current += sizeRequired; return current <= max; } void Bounds::reduce(size_t size) { if (!max || size == 0) return; Waitable::ScopedLock l(lock); - if (current == 0) return; + assert(current >= size); current -= std::min(size, current); if (current < max && lock.hasWaiters()) { lock.notifyAll(); diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 9d68448d9d..6aea4c4acf 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -22,6 +22,7 @@ #include "qpid/client/ConnectionHandler.h" #include "qpid/client/SaslFactory.h" +#include "qpid/client/Bounds.h" #include "qpid/framing/amqp_framing.h" #include "qpid/framing/all_method_bodies.h" #include "qpid/framing/ClientInvoker.h" @@ -70,8 +71,15 @@ CloseCode ConnectionHandler::convert(uint16_t replyCode) } } -ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v) - : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), +ConnectionHandler::Adapter::Adapter(ConnectionHandler& h, Bounds& b) : handler(h), bounds(b) {} +void ConnectionHandler::Adapter::handle(framing::AMQFrame& f) +{ + bounds.expand(f.encodedSize(), false); + handler.out(f); +} + +ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v, Bounds& b) + : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this, b), proxy(outHandler), errorCode(CLOSE_CODE_NORMAL), version(v) { insist = true; diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index 5f4b454f53..61709db174 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -47,6 +47,8 @@ struct SecuritySettings; namespace client { +class Bounds; + class ConnectionHandler : private StateManager, public ConnectionSettings, public ChainableFrameHandler, @@ -60,9 +62,10 @@ class ConnectionHandler : private StateManager, class Adapter : public framing::FrameHandler { ConnectionHandler& handler; + Bounds& bounds; public: - Adapter(ConnectionHandler& h) : handler(h) {} - void handle(framing::AMQFrame& f) { handler.out(f); } + Adapter(ConnectionHandler& h, Bounds& bounds); + void handle(framing::AMQFrame& f); }; Adapter outHandler; @@ -102,7 +105,7 @@ public: typedef boost::function<void(uint16_t, const std::string&)> ErrorListener; typedef boost::function<const qpid::sys::SecuritySettings*()> GetSecuritySettings; - ConnectionHandler(const ConnectionSettings&, framing::ProtocolVersion&); + ConnectionHandler(const ConnectionSettings&, framing::ProtocolVersion&, Bounds&); void received(framing::AMQFrame& f) { incoming(f); } diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index d5fe7489d3..99f4411977 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -182,7 +182,7 @@ boost::shared_ptr<ConnectionImpl> ConnectionImpl::create(framing::ProtocolVersio ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), - handler(settings, v), + handler(settings, v, *this), version(v), nextChannel(1), shutdownComplete(false), diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index b7ff4307b6..b507625b11 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -510,8 +510,8 @@ void SessionImpl::proxyOut(AMQFrame& frame) // network thread void SessionImpl::sendFrame(AMQFrame& frame, bool canBlock) { - channel.handle(frame); connection->expand(frame.encodedSize(), canBlock); + channel.handle(frame); } void SessionImpl::deliver(AMQFrame& frame) // network thread diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 1b740158a4..6499519187 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -73,11 +73,22 @@ using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; -struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection { +struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler +{ + boost::shared_ptr<qpid::client::ConnectionImpl> connection; + ClusterConnectionProxy(client::Connection c) : - AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {} + AMQP_AllProxy::ClusterConnection(*static_cast<framing::FrameHandler*>(this)), + connection(client::ConnectionAccess::getImpl(c)) {} ClusterConnectionProxy(client::AsyncSession s) : AMQP_AllProxy::ClusterConnection(SessionBase_0_10Access(s).get()->out) {} + + void handle(framing::AMQFrame& f) + { + assert(connection); + connection->expand(f.encodedSize(), false); + connection->handle(f); + } }; // Create a connection with special version that marks it as a catch-up connection. @@ -153,6 +164,7 @@ void UpdateClient::update() { ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); + client::ConnectionAccess::getImpl(connection)->expand(frame.encodedSize(), false); client::ConnectionAccess::getImpl(connection)->handle(frame); connection.close(); diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 8c18e578df..d5f2c457e5 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -124,6 +124,7 @@ class Sender { f.setLastSegment(lastSeg); f.setFirstFrame(firstFrame); f.setLastFrame(lastFrame); + connection->expand(f.encodedSize(), false); connection->handle(f); } @@ -209,6 +210,7 @@ QPID_AUTO_TEST_CASE(testBadClientData) { boost::shared_ptr<client::ConnectionImpl> ci = client::ConnectionAccess::getImpl(c0.connection); AMQFrame poison(boost::intrusive_ptr<AMQBody>(new PoisonPill)); + ci->expand(poison.encodedSize(), false); ci->handle(poison); { ScopedSuppressLogging sl; |