summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp146
1 files changed, 55 insertions, 91 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 11f3e84b70..1ed3277aae 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -62,7 +62,7 @@ SessionState::SessionState(
msgBuilder(&broker.getStore()),
mgmtObject(0),
rateFlowcontrol(0),
- scheduledCompleterContext(new ScheduledCompleterContext(this))
+ asyncCommandCompleter(new AsyncCommandCompleter(this))
{
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
@@ -102,25 +102,7 @@ SessionState::~SessionState() {
if (flowControlTimer)
flowControlTimer->cancel();
- // clean up any outstanding incomplete commands
- {
- qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
- std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds);
- incompleteCmds.clear();
- while (!copy.empty()) {
- boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second);
- copy.erase(copy.begin());
- {
- // note: need to drop lock, as callback may attempt to take it.
- qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock);
- ref->cancel();
- }
- }
- }
-
- // At this point, we are guaranteed no further completion callbacks will be
- // made. Cancel any outstanding scheduledCompleter calls...
- scheduledCompleterContext->cancel();
+ asyncCommandCompleter->cancel();
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -276,13 +258,11 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
msg->getFrames().append(header);
}
msg->setPublisher(&getConnection());
-
- boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg)));
- msg->setIngressCompletion( ac );
- ac->begin();
+ msg->getIngressCompletion().begin();
semanticState.handle(msg);
msgBuilder.end();
- ac->end(); // allows msg to complete xfer
+ IncompleteIngressMsgXfer xfer(this, msg);
+ msg->getIngressCompletion().end(xfer); // allows msg to complete xfer
}
// Handle producer session flow control
@@ -451,110 +431,94 @@ void SessionState::addPendingExecutionSync()
}
-/** factory for creating IncompleteIngressMsgXfer objects which
- * can be references from Messages as ingress AsyncCompletion objects.
+/** factory for creating a reference-counted IncompleteIngressMsgXfer object
+ * which will be attached to a message that will be completed asynchronously.
*/
-boost::shared_ptr<SessionState::IncompleteIngressMsgXfer>
-SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg)
+boost::intrusive_ptr<AsyncCompletion::Callback>
+SessionState::IncompleteIngressMsgXfer::clone()
{
- SequenceNumber id = msg->getCommandId();
- boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg));
- qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
- incompleteCmds[id] = cmd;
- return cmd;
+ boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg));
+ return cb;
}
-/** Invoked by the asynchronous completer associated with
- * a received msg that is pending Completion. May be invoked
- * by the SessionState directly (sync == true), or some external
- * entity (!sync).
+/** Invoked by the asynchronous completer associated with a received
+ * msg that is pending Completion. May be invoked by the IO thread
+ * (sync == true), or some external thread (!sync).
*/
void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
{
if (!sync) {
/** note well: this path may execute in any thread. It is safe to access
- * the session, as the SessionState destructor will cancel all outstanding
- * callbacks before getting destroyed (so we'll never get here).
+ * the scheduledCompleterContext, since *this has a shared pointer to it.
+ * but not session or msg!
*/
+ session = 0; msg = 0;
QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
- if (session->scheduledCompleterContext->scheduleCompletion(id))
- session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
- session->scheduledCompleterContext));
- } else { // command is being completed in IO thread.
- // this path runs only on the IO thread.
- qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
- std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
- cmd = session->incompleteCmds.find(id);
- if (cmd != session->incompleteCmds.end()) {
- boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
- session->incompleteCmds.erase(cmd);
-
- if (session->isAttached()) {
- QPID_LOG(debug, ": receive completed for msg seq=" << id);
- qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock);
- session->completeRcvMsg(id, requiresAccept, requiresSync);
- return;
- }
+ completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
+ } else {
+ // this path runs directly from the ac->end() call in handleContent() above,
+ // so *session and *msg are definately valid.
+ if (session->isAttached()) {
+ QPID_LOG(debug, ": receive completed for msg seq=" << id);
+ session->completeRcvMsg(id, requiresAccept, requiresSync);
}
}
+ completerContext.reset(); // ??? KAG optional ???
}
-/** Scheduled from incomplete command's completed callback, safely completes all
- * completed commands in the IO Thread. Guaranteed not to be running at the same
- * time as the message receive code.
+/** Scheduled from an asynchronous command's completed callback to run on
+ * the IO thread.
*/
-void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt)
+void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
{
ctxt->completeCommands();
}
-/** mark a command (sequence) as completed, return True if caller should
- * schedule a call to completeCommands()
+/** mark an ingress Message.Transfer command as completed.
+ * This method must be thread safe - it may run on any thread.
*/
-bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd)
+void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
+ bool requiresAccept,
+ bool requiresSync)
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
-
- completedCmds.push_back(cmd);
- return (completedCmds.size() == 1);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+
+ if (session) {
+ MessageInfo msg(cmd, requiresAccept, requiresSync);
+ completedMsgs.push_back(msg);
+ if (completedMsgs.size() == 1) {
+ session->getConnection().requestIOProcessing(boost::bind(&schedule,
+ session->asyncCommandCompleter));
+ }
+ }
}
-/** Cause the session to complete all completed commands */
-void SessionState::ScheduledCompleterContext::completeCommands()
+/** Cause the session to complete all completed commands.
+ * Executes on the IO thread.
+ */
+void SessionState::AsyncCommandCompleter::completeCommands()
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
// when session is destroyed, it clears the session pointer via cancel().
- if (!session) return;
-
- while (!completedCmds.empty()) {
- SequenceNumber id = completedCmds.front();
- completedCmds.pop_front();
- std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock);
-
- cmd = session->incompleteCmds.find(id);
- if (cmd !=session->incompleteCmds.end()) {
- boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
- {
- qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock);
- tmp->do_completion(); // retakes incompleteCmdslock
- }
- }
+ if (session && session->isAttached()) {
+ for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
+ msg != completedMsgs.end(); ++msg) {
+ session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
}
}
+ completedMsgs.clear();
}
/** cancel any pending calls to scheduleComplete */
-void SessionState::ScheduledCompleterContext::cancel()
+void SessionState::AsyncCommandCompleter::cancel()
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
session = 0;
}