diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 25 |
1 files changed, 22 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 35ad562a22..19fb0a4a79 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -50,7 +50,8 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)) + ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)), + enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)) { getConnection().outputTasks.addOutputTask(&semanticState); @@ -182,6 +183,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& getProxy().getExecution010().result(id, invocation.getResult()); } if (method->isSync()) { + incomplete.process(enqueuedOp, true); sendCompletion(); } //TODO: if window gets too large send unsolicited completion @@ -202,14 +204,31 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); - //TODO: may want to hold up execution until async enqueue is complete - completed.add(msg->getCommandId()); + + if (msg->isEnqueueComplete()) { + enqueued(msg); + } else { + incomplete.add(msg); + } + + //hold up execution until async enqueue is complete if (msg->getFrames().getMethod()->isSync()) { + incomplete.process(enqueuedOp, true); sendCompletion(); + } else { + incomplete.process(enqueuedOp, false); } } } +void SessionState::enqueued(boost::intrusive_ptr<Message> msg) +{ + completed.add(msg->getCommandId()); + if (msg->requiresAccept()) { + getProxy().getMessage010().accept(SequenceSet(msg->getCommandId())); + } +} + void SessionState::handle(AMQFrame& frame) { received(frame); |