summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SessionImpl.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-29 20:15:18 +0000
committerGordon Sim <gsim@apache.org>2008-04-29 20:15:18 +0000
commitacc0dee435e1fa22e3b1e7cdfecf6913bf88988e (patch)
tree729f7a03543acf23380e68897f8788a3e6b45e2e /cpp/src/qpid/client/SessionImpl.cpp
parenta19ce3b1863f80c1232ec2690cd920325a39d71a (diff)
downloadqpid-python-acc0dee435e1fa22e3b1e7cdfecf6913bf88988e.tar.gz
QPID-974: allow the size of the queue of outgoing frames to be restricted
QPID-544: tidy up configuration (ensuring desired settings are used correctly, allowing tcp socket options to be set etc) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652083 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SessionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp46
1 files changed, 29 insertions, 17 deletions
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 571d54df0c..e998d040c8 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -45,6 +45,7 @@ using namespace qpid::framing::session;//for detach codes
typedef sys::Monitor::ScopedLock Lock;
typedef sys::Monitor::ScopedUnlock UnLock;
+typedef sys::ScopedLock<sys::Semaphore> Acquire;
SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn,
@@ -60,8 +61,9 @@ SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn,
name(id.str()),
//TODO: may want to allow application defined names instead
connection(conn),
+ ioHandler(*this),
channel(ch),
- proxy(channel),
+ proxy(ioHandler),
nextIn(0),
nextOut(0)
{
@@ -281,14 +283,20 @@ Future SessionImpl::send(const AMQBody& command, const MethodContent& content)
Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content)
{
- Lock l(state);
- checkOpen();
+ Acquire a(sendLock);
SequenceNumber id = nextOut++;
- incompleteOut.add(id);
+ bool sync;
+ {
+ Lock l(state);
+ checkOpen();
+ incompleteOut.add(id);
+ sync = syncMode;
+ }
- if (syncMode) command.getMethod()->setSync(syncMode);
+ if (sync) command.getMethod()->setSync(true);
Future f(id);
if (command.getMethod()->resultExpected()) {
+ Lock l(state);
//result listener must be set before the command is sent
f.setFutureResult(results.listenForResult(id));
}
@@ -300,26 +308,25 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con
if (content) {
sendContent(*content);
}
- if (syncMode) {
- waitForCompletionImpl(id);
+ if (sync) {
+ waitForCompletion(id);
}
return f;
}
-
void SessionImpl::sendContent(const MethodContent& content)
{
AMQFrame header(content.getHeader());
- header.setBof(false);
+ header.setFirstSegment(false);
u_int64_t data_length = content.getData().length();
if(data_length > 0){
- header.setEof(false);
+ header.setLastSegment(false);
handleOut(header);
/*Note: end of frame marker included in overhead but not in size*/
const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1);
if(data_length < frag_size){
AMQFrame frame(in_place<AMQContentBody>(content.getData()));
- frame.setBof(false);
+ frame.setFirstSegment(false);
handleOut(frame);
}else{
u_int32_t offset = 0;
@@ -328,15 +335,15 @@ void SessionImpl::sendContent(const MethodContent& content)
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(content.getData().substr(offset, length));
AMQFrame frame(in_place<AMQContentBody>(frag));
- frame.setBof(false);
+ frame.setFirstSegment(false);
+ frame.setLastSegment(true);
if (offset > 0) {
- frame.setBos(false);
+ frame.setFirstFrame(false);
}
offset += length;
remaining = data_length - offset;
if (remaining) {
- frame.setEos(false);
- frame.setEof(false);
+ frame.setLastFrame(false);
}
handleOut(frame);
}
@@ -391,6 +398,13 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread
void SessionImpl::handleOut(AMQFrame& frame) // user thread
{
+ connection->expand(frame.size(), true);
+ channel.handle(frame);
+}
+
+void SessionImpl::proxyOut(AMQFrame& frame) // network thread
+{
+ connection->expand(frame.size(), false);
channel.handle(frame);
}
@@ -620,10 +634,8 @@ void SessionImpl::assertOpen() const
void SessionImpl::handleClosed()
{
- QPID_LOG(info, "SessionImpl::handleClosed(): entering");
demux.close();
results.close();
- QPID_LOG(info, "SessionImpl::handleClosed(): returning");
}
}}