diff options
Diffstat (limited to 'cpp/src/qpid/client/SessionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 571d54df0c..e998d040c8 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -45,6 +45,7 @@ using namespace qpid::framing::session;//for detach codes typedef sys::Monitor::ScopedLock Lock; typedef sys::Monitor::ScopedUnlock UnLock; +typedef sys::ScopedLock<sys::Semaphore> Acquire; SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn, @@ -60,8 +61,9 @@ SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn, name(id.str()), //TODO: may want to allow application defined names instead connection(conn), + ioHandler(*this), channel(ch), - proxy(channel), + proxy(ioHandler), nextIn(0), nextOut(0) { @@ -281,14 +283,20 @@ Future SessionImpl::send(const AMQBody& command, const MethodContent& content) Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content) { - Lock l(state); - checkOpen(); + Acquire a(sendLock); SequenceNumber id = nextOut++; - incompleteOut.add(id); + bool sync; + { + Lock l(state); + checkOpen(); + incompleteOut.add(id); + sync = syncMode; + } - if (syncMode) command.getMethod()->setSync(syncMode); + if (sync) command.getMethod()->setSync(true); Future f(id); if (command.getMethod()->resultExpected()) { + Lock l(state); //result listener must be set before the command is sent f.setFutureResult(results.listenForResult(id)); } @@ -300,26 +308,25 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con if (content) { sendContent(*content); } - if (syncMode) { - waitForCompletionImpl(id); + if (sync) { + waitForCompletion(id); } return f; } - void SessionImpl::sendContent(const MethodContent& content) { AMQFrame header(content.getHeader()); - header.setBof(false); + header.setFirstSegment(false); u_int64_t data_length = content.getData().length(); if(data_length > 0){ - header.setEof(false); + header.setLastSegment(false); handleOut(header); /*Note: end of frame marker included in overhead but not in size*/ const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1); if(data_length < frag_size){ AMQFrame frame(in_place<AMQContentBody>(content.getData())); - frame.setBof(false); + frame.setFirstSegment(false); handleOut(frame); }else{ u_int32_t offset = 0; @@ -328,15 +335,15 @@ void SessionImpl::sendContent(const MethodContent& content) u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(content.getData().substr(offset, length)); AMQFrame frame(in_place<AMQContentBody>(frag)); - frame.setBof(false); + frame.setFirstSegment(false); + frame.setLastSegment(true); if (offset > 0) { - frame.setBos(false); + frame.setFirstFrame(false); } offset += length; remaining = data_length - offset; if (remaining) { - frame.setEos(false); - frame.setEof(false); + frame.setLastFrame(false); } handleOut(frame); } @@ -391,6 +398,13 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread void SessionImpl::handleOut(AMQFrame& frame) // user thread { + connection->expand(frame.size(), true); + channel.handle(frame); +} + +void SessionImpl::proxyOut(AMQFrame& frame) // network thread +{ + connection->expand(frame.size(), false); channel.handle(frame); } @@ -620,10 +634,8 @@ void SessionImpl::assertOpen() const void SessionImpl::handleClosed() { - QPID_LOG(info, "SessionImpl::handleClosed(): entering"); demux.close(); results.close(); - QPID_LOG(info, "SessionImpl::handleClosed(): returning"); } }} |