diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 146 |
1 files changed, 55 insertions, 91 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 11f3e84b70..1ed3277aae 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -62,7 +62,7 @@ SessionState::SessionState( msgBuilder(&broker.getStore()), mgmtObject(0), rateFlowcontrol(0), - scheduledCompleterContext(new ScheduledCompleterContext(this)) + asyncCommandCompleter(new AsyncCommandCompleter(this)) { uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { @@ -102,25 +102,7 @@ SessionState::~SessionState() { if (flowControlTimer) flowControlTimer->cancel(); - // clean up any outstanding incomplete commands - { - qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); - std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds); - incompleteCmds.clear(); - while (!copy.empty()) { - boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second); - copy.erase(copy.begin()); - { - // note: need to drop lock, as callback may attempt to take it. - qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock); - ref->cancel(); - } - } - } - - // At this point, we are guaranteed no further completion callbacks will be - // made. Cancel any outstanding scheduledCompleter calls... - scheduledCompleterContext->cancel(); + asyncCommandCompleter->cancel(); } AMQP_ClientProxy& SessionState::getProxy() { @@ -276,13 +258,11 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) msg->getFrames().append(header); } msg->setPublisher(&getConnection()); - - boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg))); - msg->setIngressCompletion( ac ); - ac->begin(); + msg->getIngressCompletion().begin(); semanticState.handle(msg); msgBuilder.end(); - ac->end(); // allows msg to complete xfer + IncompleteIngressMsgXfer xfer(this, msg); + msg->getIngressCompletion().end(xfer); // allows msg to complete xfer } // Handle producer session flow control @@ -451,110 +431,94 @@ void SessionState::addPendingExecutionSync() } -/** factory for creating IncompleteIngressMsgXfer objects which - * can be references from Messages as ingress AsyncCompletion objects. +/** factory for creating a reference-counted IncompleteIngressMsgXfer object + * which will be attached to a message that will be completed asynchronously. */ -boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> -SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg) +boost::intrusive_ptr<AsyncCompletion::Callback> +SessionState::IncompleteIngressMsgXfer::clone() { - SequenceNumber id = msg->getCommandId(); - boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg)); - qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); - incompleteCmds[id] = cmd; - return cmd; + boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg)); + return cb; } -/** Invoked by the asynchronous completer associated with - * a received msg that is pending Completion. May be invoked - * by the SessionState directly (sync == true), or some external - * entity (!sync). +/** Invoked by the asynchronous completer associated with a received + * msg that is pending Completion. May be invoked by the IO thread + * (sync == true), or some external thread (!sync). */ void SessionState::IncompleteIngressMsgXfer::completed(bool sync) { if (!sync) { /** note well: this path may execute in any thread. It is safe to access - * the session, as the SessionState destructor will cancel all outstanding - * callbacks before getting destroyed (so we'll never get here). + * the scheduledCompleterContext, since *this has a shared pointer to it. + * but not session or msg! */ + session = 0; msg = 0; QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); - if (session->scheduledCompleterContext->scheduleCompletion(id)) - session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter, - session->scheduledCompleterContext)); - } else { // command is being completed in IO thread. - // this path runs only on the IO thread. - qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock); - std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; - cmd = session->incompleteCmds.find(id); - if (cmd != session->incompleteCmds.end()) { - boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second); - session->incompleteCmds.erase(cmd); - - if (session->isAttached()) { - QPID_LOG(debug, ": receive completed for msg seq=" << id); - qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock); - session->completeRcvMsg(id, requiresAccept, requiresSync); - return; - } + completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync); + } else { + // this path runs directly from the ac->end() call in handleContent() above, + // so *session and *msg are definately valid. + if (session->isAttached()) { + QPID_LOG(debug, ": receive completed for msg seq=" << id); + session->completeRcvMsg(id, requiresAccept, requiresSync); } } + completerContext.reset(); // ??? KAG optional ??? } -/** Scheduled from incomplete command's completed callback, safely completes all - * completed commands in the IO Thread. Guaranteed not to be running at the same - * time as the message receive code. +/** Scheduled from an asynchronous command's completed callback to run on + * the IO thread. */ -void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt) +void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt) { ctxt->completeCommands(); } -/** mark a command (sequence) as completed, return True if caller should - * schedule a call to completeCommands() +/** mark an ingress Message.Transfer command as completed. + * This method must be thread safe - it may run on any thread. */ -bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd) +void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd, + bool requiresAccept, + bool requiresSync) { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); - - completedCmds.push_back(cmd); - return (completedCmds.size() == 1); + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + + if (session) { + MessageInfo msg(cmd, requiresAccept, requiresSync); + completedMsgs.push_back(msg); + if (completedMsgs.size() == 1) { + session->getConnection().requestIOProcessing(boost::bind(&schedule, + session->asyncCommandCompleter)); + } + } } -/** Cause the session to complete all completed commands */ -void SessionState::ScheduledCompleterContext::completeCommands() +/** Cause the session to complete all completed commands. + * Executes on the IO thread. + */ +void SessionState::AsyncCommandCompleter::completeCommands() { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); // when session is destroyed, it clears the session pointer via cancel(). - if (!session) return; - - while (!completedCmds.empty()) { - SequenceNumber id = completedCmds.front(); - completedCmds.pop_front(); - std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock); - - cmd = session->incompleteCmds.find(id); - if (cmd !=session->incompleteCmds.end()) { - boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second); - { - qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock); - tmp->do_completion(); // retakes incompleteCmdslock - } - } + if (session && session->isAttached()) { + for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin(); + msg != completedMsgs.end(); ++msg) { + session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync); } } + completedMsgs.clear(); } /** cancel any pending calls to scheduleComplete */ -void SessionState::ScheduledCompleterContext::cancel() +void SessionState::AsyncCommandCompleter::cancel() { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); session = 0; } |