diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 221 |
1 files changed, 92 insertions, 129 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 742dbe9be8..11f3e84b70 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -25,7 +25,6 @@ #include "qpid/broker/SessionManager.h" #include "qpid/broker/SessionHandler.h" #include "qpid/broker/RateFlowcontrol.h" -#include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Timer.h" #include "qpid/framing/AMQContentBody.h" #include "qpid/framing/AMQHeaderBody.h" @@ -63,7 +62,7 @@ SessionState::SessionState( msgBuilder(&broker.getStore()), mgmtObject(0), rateFlowcontrol(0), - asyncCommandCompleter(new AsyncCommandCompleter(this)) + scheduledCompleterContext(new ScheduledCompleterContext(this)) { uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { @@ -96,13 +95,32 @@ void SessionState::addManagementObject() { } SessionState::~SessionState() { - asyncCommandCompleter->cancel(); semanticState.closed(); if (mgmtObject != 0) mgmtObject->resourceDestroy (); if (flowControlTimer) flowControlTimer->cancel(); + + // clean up any outstanding incomplete commands + { + qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds); + incompleteCmds.clear(); + while (!copy.empty()) { + boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second); + copy.erase(copy.begin()); + { + // note: need to drop lock, as callback may attempt to take it. + qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock); + ref->cancel(); + } + } + } + + // At this point, we are guaranteed no further completion callbacks will be + // made. Cancel any outstanding scheduledCompleter calls... + scheduledCompleterContext->cancel(); } AMQP_ClientProxy& SessionState::getProxy() { @@ -127,7 +145,6 @@ bool SessionState::isLocal(const ConnectionToken* t) const void SessionState::detach() { QPID_LOG(debug, getId() << ": detached on broker."); - asyncCommandCompleter->detached(); disableOutput(); handler = 0; if (mgmtObject != 0) @@ -148,7 +165,6 @@ void SessionState::attach(SessionHandler& h) { mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); mgmtObject->set_channelId (h.getChannel()); } - asyncCommandCompleter->attached(); } void SessionState::abort() { @@ -260,11 +276,13 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) msg->getFrames().append(header); } msg->setPublisher(&getConnection()); - msg->getIngressCompletion().begin(); + + boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg))); + msg->setIngressCompletion( ac ); + ac->begin(); semanticState.handle(msg); msgBuilder.end(); - IncompleteIngressMsgXfer xfer(this, msg); - msg->getIngressCompletion().end(xfer); // allows msg to complete xfer + ac->end(); // allows msg to complete xfer } // Handle producer session flow control @@ -323,11 +341,6 @@ void SessionState::completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync) { - // Mark this as a cluster-unsafe scope since it can be called in - // journal threads or connection threads as part of asynchronous - // command completion. - sys::ClusterUnsafeScope cus; - bool callSendCompletion = false; receiverCompleted(id); if (requiresAccept) @@ -433,166 +446,116 @@ void SessionState::addPendingExecutionSync() if (receiverGetIncomplete().front() < syncCommandId) { currentCommandComplete = false; pendingExecutionSyncs.push(syncCommandId); - asyncCommandCompleter->flushPendingMessages(); QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId); } } -/** factory for creating a reference-counted IncompleteIngressMsgXfer object - * which will be attached to a message that will be completed asynchronously. +/** factory for creating IncompleteIngressMsgXfer objects which + * can be references from Messages as ingress AsyncCompletion objects. */ -boost::intrusive_ptr<AsyncCompletion::Callback> -SessionState::IncompleteIngressMsgXfer::clone() +boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> +SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg) { - // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed. - // If the client is pending the message.transfer completion, flush now to force immediate write to journal. - if (requiresSync) - msg->flush(); - else { - // otherwise, we need to track this message in order to flush it if an execution.sync arrives - // before it has been completed (see flushPendingMessages()) - pending = true; - completerContext->addPendingMessage(msg); - } - - return boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer>(new SessionState::IncompleteIngressMsgXfer(*this)); + SequenceNumber id = msg->getCommandId(); + boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg)); + qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); + incompleteCmds[id] = cmd; + return cmd; } -/** Invoked by the asynchronous completer associated with a received - * msg that is pending Completion. May be invoked by the IO thread - * (sync == true), or some external thread (!sync). +/** Invoked by the asynchronous completer associated with + * a received msg that is pending Completion. May be invoked + * by the SessionState directly (sync == true), or some external + * entity (!sync). */ void SessionState::IncompleteIngressMsgXfer::completed(bool sync) { - if (pending) completerContext->deletePendingMessage(id); if (!sync) { /** note well: this path may execute in any thread. It is safe to access - * the scheduledCompleterContext, since *this has a shared pointer to it. - * but not session! + * the session, as the SessionState destructor will cancel all outstanding + * callbacks before getting destroyed (so we'll never get here). */ - session = 0; QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); - completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync); - } else { - // this path runs directly from the ac->end() call in handleContent() above, - // so *session is definately valid. - if (session->isAttached()) { - QPID_LOG(debug, ": receive completed for msg seq=" << id); - session->completeRcvMsg(id, requiresAccept, requiresSync); + if (session->scheduledCompleterContext->scheduleCompletion(id)) + session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter, + session->scheduledCompleterContext)); + } else { // command is being completed in IO thread. + // this path runs only on the IO thread. + qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; + cmd = session->incompleteCmds.find(id); + if (cmd != session->incompleteCmds.end()) { + boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second); + session->incompleteCmds.erase(cmd); + + if (session->isAttached()) { + QPID_LOG(debug, ": receive completed for msg seq=" << id); + qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock); + session->completeRcvMsg(id, requiresAccept, requiresSync); + return; + } } } - completerContext = boost::intrusive_ptr<AsyncCommandCompleter>(); } -/** Scheduled from an asynchronous command's completed callback to run on - * the IO thread. +/** Scheduled from incomplete command's completed callback, safely completes all + * completed commands in the IO Thread. Guaranteed not to be running at the same + * time as the message receive code. */ -void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt) +void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt) { ctxt->completeCommands(); } -/** Track an ingress message that is pending completion */ -void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg) -{ - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg); - bool unique = pendingMsgs.insert(item).second; - if (!unique) { - assert(false); - } -} - - -/** pending message has completed */ -void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id) -{ - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - pendingMsgs.erase(id); -} - - -/** done when an execution.sync arrives */ -void SessionState::AsyncCommandCompleter::flushPendingMessages() -{ - std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy; - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now. - } - // drop lock, so it is safe to call "flush()" - for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin(); - i != copy.end(); ++i) { - i->second->flush(); - } -} - - -/** mark an ingress Message.Transfer command as completed. - * This method must be thread safe - it may run on any thread. +/** mark a command (sequence) as completed, return True if caller should + * schedule a call to completeCommands() */ -void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd, - bool requiresAccept, - bool requiresSync) +bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd) { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - - if (session && isAttached) { - MessageInfo msg(cmd, requiresAccept, requiresSync); - completedMsgs.push_back(msg); - if (completedMsgs.size() == 1) { - session->getConnection().requestIOProcessing(boost::bind(&schedule, - session->asyncCommandCompleter)); - } - } + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + + completedCmds.push_back(cmd); + return (completedCmds.size() == 1); } -/** Cause the session to complete all completed commands. - * Executes on the IO thread. - */ -void SessionState::AsyncCommandCompleter::completeCommands() +/** Cause the session to complete all completed commands */ +void SessionState::ScheduledCompleterContext::completeCommands() { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); // when session is destroyed, it clears the session pointer via cancel(). - if (session && session->isAttached()) { - for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin(); - msg != completedMsgs.end(); ++msg) { - session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync); + if (!session) return; + + while (!completedCmds.empty()) { + SequenceNumber id = completedCmds.front(); + completedCmds.pop_front(); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock); + + cmd = session->incompleteCmds.find(id); + if (cmd !=session->incompleteCmds.end()) { + boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second); + { + qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock); + tmp->do_completion(); // retakes incompleteCmdslock + } + } } } - completedMsgs.clear(); } /** cancel any pending calls to scheduleComplete */ -void SessionState::AsyncCommandCompleter::cancel() +void SessionState::ScheduledCompleterContext::cancel() { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); session = 0; } - -/** inform the completer that the session has attached, - * allows command completion scheduling from any thread */ -void SessionState::AsyncCommandCompleter::attached() -{ - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - isAttached = true; -} - - -/** inform the completer that the session has detached, - * disables command completion scheduling from any thread */ -void SessionState::AsyncCommandCompleter::detached() -{ - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - isAttached = false; -} - }} // namespace qpid::broker |