summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-27 18:04:42 +0000
committerGordon Sim <gsim@apache.org>2008-03-27 18:04:42 +0000
commitf9380575ddccbe48edd5305e96db70892c1dc1aa (patch)
treebf4361ed80e1ad3c3d2aca4a345c8d0b49d5d642 /cpp/src/qpid/broker/SessionState.cpp
parent719c2529a14527c236e871603136ccbe44f632d3 (diff)
downloadqpid-python-f9380575ddccbe48edd5305e96db70892c1dc1aa.tar.gz
Send accept in response to message publications if required.
Hold up completion (and accept) until message from transfer is fully enqueued. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@641929 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp25
1 files changed, 22 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 35ad562a22..19fb0a4a79 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -50,7 +50,8 @@ SessionState::SessionState(
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
- ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2))
+ ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)),
+ enqueuedOp(boost::bind(&SessionState::enqueued, this, _1))
{
getConnection().outputTasks.addOutputTask(&semanticState);
@@ -182,6 +183,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber&
getProxy().getExecution010().result(id, invocation.getResult());
}
if (method->isSync()) {
+ incomplete.process(enqueuedOp, true);
sendCompletion();
}
//TODO: if window gets too large send unsolicited completion
@@ -202,14 +204,31 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
- //TODO: may want to hold up execution until async enqueue is complete
- completed.add(msg->getCommandId());
+
+ if (msg->isEnqueueComplete()) {
+ enqueued(msg);
+ } else {
+ incomplete.add(msg);
+ }
+
+ //hold up execution until async enqueue is complete
if (msg->getFrames().getMethod()->isSync()) {
+ incomplete.process(enqueuedOp, true);
sendCompletion();
+ } else {
+ incomplete.process(enqueuedOp, false);
}
}
}
+void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
+{
+ completed.add(msg->getCommandId());
+ if (msg->requiresAccept()) {
+ getProxy().getMessage010().accept(SequenceSet(msg->getCommandId()));
+ }
+}
+
void SessionState::handle(AMQFrame& frame)
{
received(frame);