summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp18
-rw-r--r--cpp/src/qpid/client/SessionImpl.h9
2 files changed, 25 insertions, 2 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 49dd97e324..2d0f83b894 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -59,7 +59,8 @@ SessionImpl::SessionImpl(const std::string& name, shared_ptr<ConnectionImpl> con
connectionShared(conn),
connectionWeak(conn),
weakPtr(false),
- proxy(out),
+ ioHandler(*this),
+ proxy(ioHandler),
nextIn(0),
nextOut(0)
{
@@ -424,9 +425,22 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread
void SessionImpl::handleOut(AMQFrame& frame) // user thread
{
+ sendFrame(frame, true);
+}
+
+void SessionImpl::proxyOut(AMQFrame& frame) // network thread
+{
+ //Note: this case is treated slightly differently that command
+ //frames sent by application; session controls should not be
+ //blocked by bounds checking on the outgoing frame queue.
+ sendFrame(frame, false);
+}
+
+void SessionImpl::sendFrame(AMQFrame& frame, bool canBlock)
+{
boost::shared_ptr<ConnectionImpl> c = connectionWeak.lock();
if (c) {
- c->expand(frame.encodedSize(), true);
+ c->expand(frame.encodedSize(), canBlock);
channel.handle(frame);
}
}
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 54ace77254..d56566ec14 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -137,6 +137,14 @@ private:
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
+ /**
+ * Sends session controls. This case is treated slightly
+ * differently than command frames sent by the application via
+ * handleOut(); session controlsare not subject to bounds checking
+ * on the outgoing frame queue.
+ */
+ void proxyOut(framing::AMQFrame& frame);
+ void sendFrame(framing::AMQFrame& frame, bool canBlock);
void deliver(framing::AMQFrame& frame);
Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
@@ -185,6 +193,7 @@ private:
boost::weak_ptr<ConnectionImpl> connectionWeak;
bool weakPtr;
+ framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
framing::ChannelHandler channel;
framing::AMQP_ServerProxy::Session proxy;