summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp25
1 files changed, 22 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 35ad562a22..19fb0a4a79 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -50,7 +50,8 @@ SessionState::SessionState(
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
- ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2))
+ ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)),
+ enqueuedOp(boost::bind(&SessionState::enqueued, this, _1))
{
getConnection().outputTasks.addOutputTask(&semanticState);
@@ -182,6 +183,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber&
getProxy().getExecution010().result(id, invocation.getResult());
}
if (method->isSync()) {
+ incomplete.process(enqueuedOp, true);
sendCompletion();
}
//TODO: if window gets too large send unsolicited completion
@@ -202,14 +204,31 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
- //TODO: may want to hold up execution until async enqueue is complete
- completed.add(msg->getCommandId());
+
+ if (msg->isEnqueueComplete()) {
+ enqueued(msg);
+ } else {
+ incomplete.add(msg);
+ }
+
+ //hold up execution until async enqueue is complete
if (msg->getFrames().getMethod()->isSync()) {
+ incomplete.process(enqueuedOp, true);
sendCompletion();
+ } else {
+ incomplete.process(enqueuedOp, false);
}
}
}
+void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
+{
+ completed.add(msg->getCommandId());
+ if (msg->requiresAccept()) {
+ getProxy().getMessage010().accept(SequenceSet(msg->getCommandId()));
+ }
+}
+
void SessionState::handle(AMQFrame& frame)
{
received(frame);