summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp208
1 files changed, 185 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 1ca7b6dfc1..11f3e84b70 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -60,9 +60,9 @@ SessionState::SessionState(
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore()),
- enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
mgmtObject(0),
- rateFlowcontrol(0)
+ rateFlowcontrol(0),
+ scheduledCompleterContext(new ScheduledCompleterContext(this))
{
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
@@ -101,6 +101,26 @@ SessionState::~SessionState() {
if (flowControlTimer)
flowControlTimer->cancel();
+
+ // clean up any outstanding incomplete commands
+ {
+ qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
+ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds);
+ incompleteCmds.clear();
+ while (!copy.empty()) {
+ boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second);
+ copy.erase(copy.begin());
+ {
+ // note: need to drop lock, as callback may attempt to take it.
+ qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock);
+ ref->cancel();
+ }
+ }
+ }
+
+ // At this point, we are guaranteed no further completion callbacks will be
+ // made. Cancel any outstanding scheduledCompleter calls...
+ scheduledCompleterContext->cancel();
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -202,15 +222,17 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
}
void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
+ currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks).
Invoker::Result invocation = invoke(adapter, *method);
- receiverCompleted(id);
+ if (currentCommandComplete) receiverCompleted(id);
+
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
getProxy().getExecution().result(id, invocation.getResult());
}
- if (method->isSync()) {
- incomplete.process(enqueuedOp, true);
+
+ if (method->isSync() && currentCommandComplete) {
sendAcceptAndCompletion();
}
}
@@ -254,22 +276,13 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
msg->getFrames().append(header);
}
msg->setPublisher(&getConnection());
+
+ boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg)));
+ msg->setIngressCompletion( ac );
+ ac->begin();
semanticState.handle(msg);
msgBuilder.end();
-
- if (msg->isEnqueueComplete()) {
- enqueued(msg);
- } else {
- incomplete.add(msg);
- }
-
- //hold up execution until async enqueue is complete
- if (msg->getFrames().getMethod()->isSync()) {
- incomplete.process(enqueuedOp, true);
- sendAcceptAndCompletion();
- } else {
- incomplete.process(enqueuedOp, false);
- }
+ ac->end(); // allows msg to complete xfer
}
// Handle producer session flow control
@@ -319,11 +332,38 @@ void SessionState::sendAcceptAndCompletion()
sendCompletion();
}
-void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
+/** Invoked when the given inbound message is finished being processed
+ * by all interested parties (eg. it is done being enqueued to all queues,
+ * its credit has been accounted for, etc). At this point, msg is considered
+ * by this receiver as 'completed' (as defined by AMQP 0_10)
+ */
+void SessionState::completeRcvMsg(SequenceNumber id,
+ bool requiresAccept,
+ bool requiresSync)
{
- receiverCompleted(msg->getCommandId());
- if (msg->requiresAccept())
- accepted.add(msg->getCommandId());
+ bool callSendCompletion = false;
+ receiverCompleted(id);
+ if (requiresAccept)
+ // will cause msg's seq to appear in the next message.accept we send.
+ accepted.add(id);
+
+ // Are there any outstanding Execution.Sync commands pending the
+ // completion of this msg? If so, complete them.
+ while (!pendingExecutionSyncs.empty() &&
+ receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
+ const SequenceNumber id = pendingExecutionSyncs.front();
+ pendingExecutionSyncs.pop();
+ QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed.");
+ receiverCompleted(id);
+ callSendCompletion = true; // likely peer is pending for this completion.
+ }
+
+ // if the sender has requested immediate notification of the completion...
+ if (requiresSync) {
+ sendAcceptAndCompletion();
+ } else if (callSendCompletion) {
+ sendCompletion();
+ }
}
void SessionState::handleIn(AMQFrame& frame) {
@@ -396,4 +436,126 @@ framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
return handler->getClusterOrderProxy();
}
+
+// Current received command is an execution.sync command.
+// Complete this command only when all preceding commands have completed.
+// (called via the invoker() in handleCommand() above)
+void SessionState::addPendingExecutionSync()
+{
+ SequenceNumber syncCommandId = receiverGetCurrent();
+ if (receiverGetIncomplete().front() < syncCommandId) {
+ currentCommandComplete = false;
+ pendingExecutionSyncs.push(syncCommandId);
+ QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
+ }
+}
+
+
+/** factory for creating IncompleteIngressMsgXfer objects which
+ * can be references from Messages as ingress AsyncCompletion objects.
+ */
+boost::shared_ptr<SessionState::IncompleteIngressMsgXfer>
+SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg)
+{
+ SequenceNumber id = msg->getCommandId();
+ boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg));
+ qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
+ incompleteCmds[id] = cmd;
+ return cmd;
+}
+
+
+/** Invoked by the asynchronous completer associated with
+ * a received msg that is pending Completion. May be invoked
+ * by the SessionState directly (sync == true), or some external
+ * entity (!sync).
+ */
+void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
+{
+ if (!sync) {
+ /** note well: this path may execute in any thread. It is safe to access
+ * the session, as the SessionState destructor will cancel all outstanding
+ * callbacks before getting destroyed (so we'll never get here).
+ */
+ QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
+ if (session->scheduledCompleterContext->scheduleCompletion(id))
+ session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
+ session->scheduledCompleterContext));
+ } else { // command is being completed in IO thread.
+ // this path runs only on the IO thread.
+ qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
+ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
+ cmd = session->incompleteCmds.find(id);
+ if (cmd != session->incompleteCmds.end()) {
+ boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
+ session->incompleteCmds.erase(cmd);
+
+ if (session->isAttached()) {
+ QPID_LOG(debug, ": receive completed for msg seq=" << id);
+ qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock);
+ session->completeRcvMsg(id, requiresAccept, requiresSync);
+ return;
+ }
+ }
+ }
+}
+
+
+/** Scheduled from incomplete command's completed callback, safely completes all
+ * completed commands in the IO Thread. Guaranteed not to be running at the same
+ * time as the message receive code.
+ */
+void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt)
+{
+ ctxt->completeCommands();
+}
+
+
+/** mark a command (sequence) as completed, return True if caller should
+ * schedule a call to completeCommands()
+ */
+bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+
+ completedCmds.push_back(cmd);
+ return (completedCmds.size() == 1);
+}
+
+
+/** Cause the session to complete all completed commands */
+void SessionState::ScheduledCompleterContext::completeCommands()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+
+ // when session is destroyed, it clears the session pointer via cancel().
+ if (!session) return;
+
+ while (!completedCmds.empty()) {
+ SequenceNumber id = completedCmds.front();
+ completedCmds.pop_front();
+ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock);
+
+ cmd = session->incompleteCmds.find(id);
+ if (cmd !=session->incompleteCmds.end()) {
+ boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
+ {
+ qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock);
+ tmp->do_completion(); // retakes incompleteCmdslock
+ }
+ }
+ }
+ }
+}
+
+
+/** cancel any pending calls to scheduleComplete */
+void SessionState::ScheduledCompleterContext::cancel()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+ session = 0;
+}
+
}} // namespace qpid::broker