diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 9 |
2 files changed, 25 insertions, 2 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 49dd97e324..2d0f83b894 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -59,7 +59,8 @@ SessionImpl::SessionImpl(const std::string& name, shared_ptr<ConnectionImpl> con connectionShared(conn), connectionWeak(conn), weakPtr(false), - proxy(out), + ioHandler(*this), + proxy(ioHandler), nextIn(0), nextOut(0) { @@ -424,9 +425,22 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread void SessionImpl::handleOut(AMQFrame& frame) // user thread { + sendFrame(frame, true); +} + +void SessionImpl::proxyOut(AMQFrame& frame) // network thread +{ + //Note: this case is treated slightly differently that command + //frames sent by application; session controls should not be + //blocked by bounds checking on the outgoing frame queue. + sendFrame(frame, false); +} + +void SessionImpl::sendFrame(AMQFrame& frame, bool canBlock) +{ boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock(); if (c) { - c->expand(frame.encodedSize(), true); + c->expand(frame.encodedSize(), canBlock); channel.handle(frame); } } diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 54ace77254..d56566ec14 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -137,6 +137,14 @@ private: void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); + /** + * Sends session controls. This case is treated slightly + * differently than command frames sent by the application via + * handleOut(); session controlsare not subject to bounds checking + * on the outgoing frame queue. + */ + void proxyOut(framing::AMQFrame& frame); + void sendFrame(framing::AMQFrame& frame, bool canBlock); void deliver(framing::AMQFrame& frame); Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0); @@ -185,6 +193,7 @@ private: boost::weak_ptr<ConnectionImpl> connectionWeak; bool weakPtr; + framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler; framing::ChannelHandler channel; framing::AMQP_ServerProxy::Session proxy; |