summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp151
1 files changed, 134 insertions, 17 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 6f02399795..ca98ee1437 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -59,7 +59,6 @@ SessionState::SessionState(
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore()),
- enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
mgmtObject(0),
rateFlowcontrol(0)
{
@@ -94,6 +93,18 @@ SessionState::~SessionState() {
if (flowControlTimer)
flowControlTimer->cancel();
+
+ // clean up any outstanding incomplete receive messages
+
+ qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
+ while (!incompleteRcvMsgs.empty()) {
+ boost::shared_ptr<IncompleteRcvMsg> ref(incompleteRcvMsgs.front());
+ incompleteRcvMsgs.pop_front();
+ {
+ qpid::sys::ScopedUnlock<Mutex> ul(incompleteRcvMsgsLock);
+ ref->cancel();
+ }
+ }
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -195,15 +206,17 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
}
void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
+ currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks).
Invoker::Result invocation = invoke(adapter, *method);
- receiverCompleted(id);
+ if (currentCommandComplete) receiverCompleted(id);
+
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
getProxy().getExecution().result(id, invocation.getResult());
}
- if (method->isSync()) {
- incomplete.process(enqueuedOp, true);
+
+ if (method->isSync() && currentCommandComplete) {
sendAcceptAndCompletion();
}
}
@@ -247,21 +260,24 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
msg->getFrames().append(header);
}
msg->setPublisher(&getConnection());
+
+ msg->getReceiveCompletion().begin();
semanticState.handle(msg);
msgBuilder.end();
-
- if (msg->isEnqueueComplete()) {
- enqueued(msg);
- } else {
- incomplete.add(msg);
- }
-
- //hold up execution until async enqueue is complete
- if (msg->getFrames().getMethod()->isSync()) {
- incomplete.process(enqueuedOp, true);
- sendAcceptAndCompletion();
+ if (msg->getReceiveCompletion().getPendingCompleters() == 1) {
+ // There are no other pending receive completers (just this SessionState).
+ // Mark the message as completed.
+ completeRcvMsg( msg );
} else {
- incomplete.process(enqueuedOp, false);
+ // There are outstanding receive completers. Save the message until
+ // they are all done.
+ QPID_LOG(debug, getId() << ": delaying completion of msg seq=" << msg->getCommandId());
+ boost::shared_ptr<IncompleteRcvMsg> pendingMsg(new IncompleteRcvMsg(*this, msg));
+ {
+ qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
+ incompleteRcvMsgs.push_back(pendingMsg);
+ }
+ msg->getReceiveCompletion().end( pendingMsg ); // allows others to complete
}
}
@@ -312,11 +328,36 @@ void SessionState::sendAcceptAndCompletion()
sendCompletion();
}
-void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
+/** Invoked when the given inbound message is finished being processed
+ * by all interested parties (eg. it is done being enqueued to all queues,
+ * its credit has been accounted for, etc). At this point, msg is considered
+ * by this receiver as 'completed' (as defined by AMQP 0_10)
+ */
+void SessionState::completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg)
{
+ bool callSendCompletion = false;
receiverCompleted(msg->getCommandId());
if (msg->requiresAccept())
+ // will cause msg's seq to appear in the next message.accept we send.
accepted.add(msg->getCommandId());
+
+ // Are there any outstanding Execution.Sync commands pending the
+ // completion of this msg? If so, complete them.
+ while (!pendingExecutionSyncs.empty() &&
+ receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
+ const SequenceNumber& id = pendingExecutionSyncs.front();
+ pendingExecutionSyncs.pop();
+ QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed.");
+ receiverCompleted(id);
+ callSendCompletion = true; // likely peer is pending for this completion.
+ }
+
+ // if the sender has requested immediate notification of the completion...
+ if (msg->getFrames().getMethod()->isSync()) {
+ sendAcceptAndCompletion();
+ } else if (callSendCompletion) {
+ sendCompletion();
+ }
}
void SessionState::handleIn(AMQFrame& frame) {
@@ -389,4 +430,80 @@ framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
return handler->getClusterOrderProxy();
}
+
+// Current received command is an execution.sync command.
+// Complete this command only when all preceding commands have completed.
+// (called via the invoker() in handleCommand() above)
+void SessionState::addPendingExecutionSync()
+{
+ SequenceNumber syncCommandId = receiverGetCurrent();
+ if (receiverGetIncomplete().front() < syncCommandId) {
+ currentCommandComplete = false;
+ pendingExecutionSyncs.push(syncCommandId);
+ QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
+ }
+}
+
+
+/** Invoked by the asynchronous completer associated with
+ * a received msg that is pending Completion. May be invoked
+ * by the SessionState directly (sync == true), or some external
+ * entity (!sync).
+ */
+void SessionState::IncompleteRcvMsg::operator() (bool sync)
+{
+ QPID_LOG(debug, ": async completion callback for msg seq=" << msg->getCommandId() << " sync=" << sync);
+ boost::shared_ptr<IncompleteRcvMsg> tmp;
+ {
+ qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock);
+ for (std::list< boost::shared_ptr<IncompleteRcvMsg> >::iterator i = session->incompleteRcvMsgs.begin();
+ i != session->incompleteRcvMsgs.end(); ++i) {
+ if (i->get() == this) {
+ tmp.swap(*i);
+ session->incompleteRcvMsgs.remove(*i);
+ break;
+ }
+ }
+ }
+
+ if (session->isAttached()) {
+ if (sync) {
+ QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId());
+ session->completeRcvMsg(msg);
+ } else { // potentially called from a different thread
+ QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId());
+ session->getConnection().requestIOProcessing(boost::bind(&SessionState::IncompleteRcvMsg::scheduledCompleter, tmp));
+ }
+ }
+}
+
+
+/** Scheduled from IncompleteRcvMsg callback, completes the message receive
+ * asynchronously
+ */
+void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<SessionState::IncompleteRcvMsg> iMsg)
+{
+ QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId());
+ if (iMsg->session && iMsg->session->isAttached()) {
+ QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId());
+ iMsg->session->completeRcvMsg(iMsg->msg);
+ }
+}
+
+
+/** Cancels a pending incomplete receive message completion callback. Note
+ * well: will wait for the callback to finish if it is currently in progress
+ * on another thread.
+ */
+void SessionState::IncompleteRcvMsg::cancel()
+{
+ QPID_LOG(debug, session->getId() << ": cancelling outstanding completion for msg seq=" << msg->getCommandId());
+ // Cancel the message complete callback. On return, we are guaranteed there
+ // will be no outstanding calls to SessionState::IncompleteRcvMsg::operator() (bool sync)
+ msg->getReceiveCompletion().cancel();
+ // there may be calls to SessionState::IncompleteRcvMsg::scheduledCompleter() pending,
+ // clear the session so scheduledCompleter() will ignore this IncompleteRcvMsg.
+ session = 0;
+}
+
}} // namespace qpid::broker