summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp221
1 files changed, 92 insertions, 129 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 742dbe9be8..11f3e84b70 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -25,7 +25,6 @@
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/RateFlowcontrol.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
@@ -63,7 +62,7 @@ SessionState::SessionState(
msgBuilder(&broker.getStore()),
mgmtObject(0),
rateFlowcontrol(0),
- asyncCommandCompleter(new AsyncCommandCompleter(this))
+ scheduledCompleterContext(new ScheduledCompleterContext(this))
{
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
@@ -96,13 +95,32 @@ void SessionState::addManagementObject() {
}
SessionState::~SessionState() {
- asyncCommandCompleter->cancel();
semanticState.closed();
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
if (flowControlTimer)
flowControlTimer->cancel();
+
+ // clean up any outstanding incomplete commands
+ {
+ qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
+ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds);
+ incompleteCmds.clear();
+ while (!copy.empty()) {
+ boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second);
+ copy.erase(copy.begin());
+ {
+ // note: need to drop lock, as callback may attempt to take it.
+ qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock);
+ ref->cancel();
+ }
+ }
+ }
+
+ // At this point, we are guaranteed no further completion callbacks will be
+ // made. Cancel any outstanding scheduledCompleter calls...
+ scheduledCompleterContext->cancel();
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -127,7 +145,6 @@ bool SessionState::isLocal(const ConnectionToken* t) const
void SessionState::detach() {
QPID_LOG(debug, getId() << ": detached on broker.");
- asyncCommandCompleter->detached();
disableOutput();
handler = 0;
if (mgmtObject != 0)
@@ -148,7 +165,6 @@ void SessionState::attach(SessionHandler& h) {
mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
mgmtObject->set_channelId (h.getChannel());
}
- asyncCommandCompleter->attached();
}
void SessionState::abort() {
@@ -260,11 +276,13 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
msg->getFrames().append(header);
}
msg->setPublisher(&getConnection());
- msg->getIngressCompletion().begin();
+
+ boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg)));
+ msg->setIngressCompletion( ac );
+ ac->begin();
semanticState.handle(msg);
msgBuilder.end();
- IncompleteIngressMsgXfer xfer(this, msg);
- msg->getIngressCompletion().end(xfer); // allows msg to complete xfer
+ ac->end(); // allows msg to complete xfer
}
// Handle producer session flow control
@@ -323,11 +341,6 @@ void SessionState::completeRcvMsg(SequenceNumber id,
bool requiresAccept,
bool requiresSync)
{
- // Mark this as a cluster-unsafe scope since it can be called in
- // journal threads or connection threads as part of asynchronous
- // command completion.
- sys::ClusterUnsafeScope cus;
-
bool callSendCompletion = false;
receiverCompleted(id);
if (requiresAccept)
@@ -433,166 +446,116 @@ void SessionState::addPendingExecutionSync()
if (receiverGetIncomplete().front() < syncCommandId) {
currentCommandComplete = false;
pendingExecutionSyncs.push(syncCommandId);
- asyncCommandCompleter->flushPendingMessages();
QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
}
}
-/** factory for creating a reference-counted IncompleteIngressMsgXfer object
- * which will be attached to a message that will be completed asynchronously.
+/** factory for creating IncompleteIngressMsgXfer objects which
+ * can be references from Messages as ingress AsyncCompletion objects.
*/
-boost::intrusive_ptr<AsyncCompletion::Callback>
-SessionState::IncompleteIngressMsgXfer::clone()
+boost::shared_ptr<SessionState::IncompleteIngressMsgXfer>
+SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg)
{
- // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
- // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
- if (requiresSync)
- msg->flush();
- else {
- // otherwise, we need to track this message in order to flush it if an execution.sync arrives
- // before it has been completed (see flushPendingMessages())
- pending = true;
- completerContext->addPendingMessage(msg);
- }
-
- return boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer>(new SessionState::IncompleteIngressMsgXfer(*this));
+ SequenceNumber id = msg->getCommandId();
+ boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg));
+ qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
+ incompleteCmds[id] = cmd;
+ return cmd;
}
-/** Invoked by the asynchronous completer associated with a received
- * msg that is pending Completion. May be invoked by the IO thread
- * (sync == true), or some external thread (!sync).
+/** Invoked by the asynchronous completer associated with
+ * a received msg that is pending Completion. May be invoked
+ * by the SessionState directly (sync == true), or some external
+ * entity (!sync).
*/
void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
{
- if (pending) completerContext->deletePendingMessage(id);
if (!sync) {
/** note well: this path may execute in any thread. It is safe to access
- * the scheduledCompleterContext, since *this has a shared pointer to it.
- * but not session!
+ * the session, as the SessionState destructor will cancel all outstanding
+ * callbacks before getting destroyed (so we'll never get here).
*/
- session = 0;
QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
- completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
- } else {
- // this path runs directly from the ac->end() call in handleContent() above,
- // so *session is definately valid.
- if (session->isAttached()) {
- QPID_LOG(debug, ": receive completed for msg seq=" << id);
- session->completeRcvMsg(id, requiresAccept, requiresSync);
+ if (session->scheduledCompleterContext->scheduleCompletion(id))
+ session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
+ session->scheduledCompleterContext));
+ } else { // command is being completed in IO thread.
+ // this path runs only on the IO thread.
+ qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
+ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
+ cmd = session->incompleteCmds.find(id);
+ if (cmd != session->incompleteCmds.end()) {
+ boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
+ session->incompleteCmds.erase(cmd);
+
+ if (session->isAttached()) {
+ QPID_LOG(debug, ": receive completed for msg seq=" << id);
+ qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock);
+ session->completeRcvMsg(id, requiresAccept, requiresSync);
+ return;
+ }
}
}
- completerContext = boost::intrusive_ptr<AsyncCommandCompleter>();
}
-/** Scheduled from an asynchronous command's completed callback to run on
- * the IO thread.
+/** Scheduled from incomplete command's completed callback, safely completes all
+ * completed commands in the IO Thread. Guaranteed not to be running at the same
+ * time as the message receive code.
*/
-void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
+void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt)
{
ctxt->completeCommands();
}
-/** Track an ingress message that is pending completion */
-void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
- bool unique = pendingMsgs.insert(item).second;
- if (!unique) {
- assert(false);
- }
-}
-
-
-/** pending message has completed */
-void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id)
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- pendingMsgs.erase(id);
-}
-
-
-/** done when an execution.sync arrives */
-void SessionState::AsyncCommandCompleter::flushPendingMessages()
-{
- std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now.
- }
- // drop lock, so it is safe to call "flush()"
- for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
- i != copy.end(); ++i) {
- i->second->flush();
- }
-}
-
-
-/** mark an ingress Message.Transfer command as completed.
- * This method must be thread safe - it may run on any thread.
+/** mark a command (sequence) as completed, return True if caller should
+ * schedule a call to completeCommands()
*/
-void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
- bool requiresAccept,
- bool requiresSync)
+bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd)
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-
- if (session && isAttached) {
- MessageInfo msg(cmd, requiresAccept, requiresSync);
- completedMsgs.push_back(msg);
- if (completedMsgs.size() == 1) {
- session->getConnection().requestIOProcessing(boost::bind(&schedule,
- session->asyncCommandCompleter));
- }
- }
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+
+ completedCmds.push_back(cmd);
+ return (completedCmds.size() == 1);
}
-/** Cause the session to complete all completed commands.
- * Executes on the IO thread.
- */
-void SessionState::AsyncCommandCompleter::completeCommands()
+/** Cause the session to complete all completed commands */
+void SessionState::ScheduledCompleterContext::completeCommands()
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
// when session is destroyed, it clears the session pointer via cancel().
- if (session && session->isAttached()) {
- for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
- msg != completedMsgs.end(); ++msg) {
- session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
+ if (!session) return;
+
+ while (!completedCmds.empty()) {
+ SequenceNumber id = completedCmds.front();
+ completedCmds.pop_front();
+ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock);
+
+ cmd = session->incompleteCmds.find(id);
+ if (cmd !=session->incompleteCmds.end()) {
+ boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
+ {
+ qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock);
+ tmp->do_completion(); // retakes incompleteCmdslock
+ }
+ }
}
}
- completedMsgs.clear();
}
/** cancel any pending calls to scheduleComplete */
-void SessionState::AsyncCommandCompleter::cancel()
+void SessionState::ScheduledCompleterContext::cancel()
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
session = 0;
}
-
-/** inform the completer that the session has attached,
- * allows command completion scheduling from any thread */
-void SessionState::AsyncCommandCompleter::attached()
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- isAttached = true;
-}
-
-
-/** inform the completer that the session has detached,
- * disables command completion scheduling from any thread */
-void SessionState::AsyncCommandCompleter::detached()
-{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- isAttached = false;
-}
-
}} // namespace qpid::broker