diff options
author | Gordon Sim <gsim@apache.org> | 2008-04-29 20:15:18 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-04-29 20:15:18 +0000 |
commit | acc0dee435e1fa22e3b1e7cdfecf6913bf88988e (patch) | |
tree | 729f7a03543acf23380e68897f8788a3e6b45e2e /cpp/src/qpid/client/SessionImpl.cpp | |
parent | a19ce3b1863f80c1232ec2690cd920325a39d71a (diff) | |
download | qpid-python-acc0dee435e1fa22e3b1e7cdfecf6913bf88988e.tar.gz |
QPID-974: allow the size of the queue of outgoing frames to be restricted
QPID-544: tidy up configuration (ensuring desired settings are used correctly,
allowing tcp socket options to be set etc)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652083 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SessionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 571d54df0c..e998d040c8 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -45,6 +45,7 @@ using namespace qpid::framing::session;//for detach codes typedef sys::Monitor::ScopedLock Lock; typedef sys::Monitor::ScopedUnlock UnLock; +typedef sys::ScopedLock<sys::Semaphore> Acquire; SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn, @@ -60,8 +61,9 @@ SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn, name(id.str()), //TODO: may want to allow application defined names instead connection(conn), + ioHandler(*this), channel(ch), - proxy(channel), + proxy(ioHandler), nextIn(0), nextOut(0) { @@ -281,14 +283,20 @@ Future SessionImpl::send(const AMQBody& command, const MethodContent& content) Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content) { - Lock l(state); - checkOpen(); + Acquire a(sendLock); SequenceNumber id = nextOut++; - incompleteOut.add(id); + bool sync; + { + Lock l(state); + checkOpen(); + incompleteOut.add(id); + sync = syncMode; + } - if (syncMode) command.getMethod()->setSync(syncMode); + if (sync) command.getMethod()->setSync(true); Future f(id); if (command.getMethod()->resultExpected()) { + Lock l(state); //result listener must be set before the command is sent f.setFutureResult(results.listenForResult(id)); } @@ -300,26 +308,25 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con if (content) { sendContent(*content); } - if (syncMode) { - waitForCompletionImpl(id); + if (sync) { + waitForCompletion(id); } return f; } - void SessionImpl::sendContent(const MethodContent& content) { AMQFrame header(content.getHeader()); - header.setBof(false); + header.setFirstSegment(false); u_int64_t data_length = content.getData().length(); if(data_length > 0){ - header.setEof(false); + header.setLastSegment(false); handleOut(header); /*Note: end of frame marker included in overhead but not in size*/ const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1); if(data_length < frag_size){ AMQFrame frame(in_place<AMQContentBody>(content.getData())); - frame.setBof(false); + frame.setFirstSegment(false); handleOut(frame); }else{ u_int32_t offset = 0; @@ -328,15 +335,15 @@ void SessionImpl::sendContent(const MethodContent& content) u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(content.getData().substr(offset, length)); AMQFrame frame(in_place<AMQContentBody>(frag)); - frame.setBof(false); + frame.setFirstSegment(false); + frame.setLastSegment(true); if (offset > 0) { - frame.setBos(false); + frame.setFirstFrame(false); } offset += length; remaining = data_length - offset; if (remaining) { - frame.setEos(false); - frame.setEof(false); + frame.setLastFrame(false); } handleOut(frame); } @@ -391,6 +398,13 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread void SessionImpl::handleOut(AMQFrame& frame) // user thread { + connection->expand(frame.size(), true); + channel.handle(frame); +} + +void SessionImpl::proxyOut(AMQFrame& frame) // network thread +{ + connection->expand(frame.size(), false); channel.handle(frame); } @@ -620,10 +634,8 @@ void SessionImpl::assertOpen() const void SessionImpl::handleClosed() { - QPID_LOG(info, "SessionImpl::handleClosed(): entering"); demux.close(); results.close(); - QPID_LOG(info, "SessionImpl::handleClosed(): returning"); } }} |