diff options
author | Gordon Sim <gsim@apache.org> | 2008-03-27 18:04:42 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-03-27 18:04:42 +0000 |
commit | f9380575ddccbe48edd5305e96db70892c1dc1aa (patch) | |
tree | bf4361ed80e1ad3c3d2aca4a345c8d0b49d5d642 /cpp/src/qpid/broker/SessionState.cpp | |
parent | 719c2529a14527c236e871603136ccbe44f632d3 (diff) | |
download | qpid-python-f9380575ddccbe48edd5305e96db70892c1dc1aa.tar.gz |
Send accept in response to message publications if required.
Hold up completion (and accept) until message from transfer is fully enqueued.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@641929 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 25 |
1 files changed, 22 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 35ad562a22..19fb0a4a79 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -50,7 +50,8 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)) + ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)), + enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)) { getConnection().outputTasks.addOutputTask(&semanticState); @@ -182,6 +183,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& getProxy().getExecution010().result(id, invocation.getResult()); } if (method->isSync()) { + incomplete.process(enqueuedOp, true); sendCompletion(); } //TODO: if window gets too large send unsolicited completion @@ -202,14 +204,31 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); - //TODO: may want to hold up execution until async enqueue is complete - completed.add(msg->getCommandId()); + + if (msg->isEnqueueComplete()) { + enqueued(msg); + } else { + incomplete.add(msg); + } + + //hold up execution until async enqueue is complete if (msg->getFrames().getMethod()->isSync()) { + incomplete.process(enqueuedOp, true); sendCompletion(); + } else { + incomplete.process(enqueuedOp, false); } } } +void SessionState::enqueued(boost::intrusive_ptr<Message> msg) +{ + completed.add(msg->getCommandId()); + if (msg->requiresAccept()) { + getProxy().getMessage010().accept(SequenceSet(msg->getCommandId())); + } +} + void SessionState::handle(AMQFrame& frame) { received(frame); |