summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp168
1 files changed, 102 insertions, 66 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 957d5bd4d2..d84256b61b 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -62,7 +62,7 @@ SessionState::SessionState(
msgBuilder(&broker.getStore()),
mgmtObject(0),
rateFlowcontrol(0),
- asyncCommandCompleter(new AsyncCommandCompleter(this))
+ asyncCommandManager(new AsyncCommandManager(this))
{
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
@@ -95,7 +95,7 @@ void SessionState::addManagementObject() {
}
SessionState::~SessionState() {
- asyncCommandCompleter->cancel();
+ asyncCommandManager->cancel();
semanticState.closed();
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
@@ -126,7 +126,7 @@ bool SessionState::isLocal(const ConnectionToken* t) const
void SessionState::detach() {
QPID_LOG(debug, getId() << ": detached on broker.");
- asyncCommandCompleter->detached();
+ asyncCommandManager->detached();
disableOutput();
handler = 0;
if (mgmtObject != 0)
@@ -147,7 +147,7 @@ void SessionState::attach(SessionHandler& h) {
mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
mgmtObject->set_channelId (h.getChannel());
}
- asyncCommandCompleter->attached();
+ asyncCommandManager->attached();
}
void SessionState::abort() {
@@ -204,22 +204,22 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
return status;
}
-void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
+void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id)
+{
currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks).
+ syncCurrentCommand = method->isSync();
+ acceptRequired = false;
Invoker::Result invocation = invoke(adapter, *method);
- if (currentCommandComplete) receiverCompleted(id);
-
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
- } else if (invocation.hasResult()) {
- getProxy().getExecution().result(id, invocation.getResult());
}
- if (method->isSync() && currentCommandComplete) {
- sendAcceptAndCompletion();
+ if (currentCommandComplete) {
+ completeCommand(id, invocation, false, syncCurrentCommand);
}
}
+
struct ScheduledCreditTask : public sys::TimerTask {
sys::Timer& timer;
SessionState& sessionState;
@@ -260,6 +260,9 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
}
msg->setPublisher(&getConnection());
msg->getIngressCompletion().begin();
+ currentCommandComplete = true; // assumed
+ syncCurrentCommand = msg->getFrames().getMethod()->isSync();
+ acceptRequired = msg->requiresAccept();
semanticState.handle(msg);
msgBuilder.end();
IncompleteIngressMsgXfer xfer(this, msg);
@@ -313,17 +316,19 @@ void SessionState::sendAcceptAndCompletion()
sendCompletion();
}
-/** Invoked when the given inbound message is finished being processed
- * by all interested parties (eg. it is done being enqueued to all queues,
- * its credit has been accounted for, etc). At this point, msg is considered
- * by this receiver as 'completed' (as defined by AMQP 0_10)
- */
-void SessionState::completeRcvMsg(SequenceNumber id,
- bool requiresAccept,
- bool requiresSync)
+/** Complete a received command */
+void SessionState::completeCommand(const SequenceNumber& id,
+ const framing::Invoker::Result& results,
+ bool requiresAccept,
+ bool syncBitSet)
{
bool callSendCompletion = false;
receiverCompleted(id);
+
+ if (results.hasResult()) {
+ getProxy().getExecution().result(id, results.getResult());
+ }
+
if (requiresAccept)
// will cause msg's seq to appear in the next message.accept we send.
accepted.add(id);
@@ -340,7 +345,7 @@ void SessionState::completeRcvMsg(SequenceNumber id,
}
// if the sender has requested immediate notification of the completion...
- if (requiresSync) {
+ if (syncBitSet) {
sendAcceptAndCompletion();
} else if (callSendCompletion) {
sendCompletion();
@@ -427,12 +432,25 @@ void SessionState::addPendingExecutionSync()
if (receiverGetIncomplete().front() < syncCommandId) {
currentCommandComplete = false;
pendingExecutionSyncs.push(syncCommandId);
- asyncCommandCompleter->flushPendingMessages();
+ asyncCommandManager->flushPendingCommands();
QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
}
}
+void SessionState::registerAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>& aCmd)
+{
+ /** @todo KAG: ensure this is invoked during handleCommand() context! */
+ currentCommandComplete = false;
+ asyncCommandManager->addPendingCommand( aCmd, receiverGetCurrent(), acceptRequired, syncCurrentCommand );
+}
+
+
+void SessionState::cancelAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>& aCmd)
+{
+ asyncCommandManager->cancelPendingCommand(aCmd);
+}
+
/** factory for creating a reference-counted IncompleteIngressMsgXfer object
* which will be attached to a message that will be completed asynchronously.
*/
@@ -441,15 +459,14 @@ SessionState::IncompleteIngressMsgXfer::clone()
{
boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg));
- // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
- // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
- if (requiresSync)
+ // this routine is *only* invoked when the message needs to be asynchronously completed. Otherwise, ::completed()
+ // will be invoked directly.
+ pending = true;
+ boost::intrusive_ptr<SessionContext::AsyncCommandContext>ctxt(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cb));
+ session->registerAsyncCommand(ctxt);
+ if (ctxt->getSyncBitSet()) {
+ // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
msg->flush();
- else {
- // otherwise, we need to track this message in order to flush it if an execution.sync arrives
- // before it has been completed (see flushPendingMessages())
- pending = true;
- completerContext->addPendingMessage(msg);
}
return cb;
}
@@ -461,110 +478,129 @@ SessionState::IncompleteIngressMsgXfer::clone()
*/
void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
{
- if (pending) completerContext->deletePendingMessage(id);
if (!sync) {
/** note well: this path may execute in any thread. It is safe to access
* the scheduledCompleterContext, since *this has a shared pointer to it.
* but not session!
*/
session = 0;
- QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
- completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
+ QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << getId());
+ completed(framing::Invoker::Result());
} else {
// this path runs directly from the ac->end() call in handleContent() above,
// so *session is definately valid.
if (session->isAttached()) {
- QPID_LOG(debug, ": receive completed for msg seq=" << id);
- session->completeRcvMsg(id, requiresAccept, requiresSync);
+ QPID_LOG(debug, ": receive completed for msg seq=" << getId());
+ session->completeCommand(getId(), framing::Invoker::Result(), getRequiresAccept(), getSyncBitSet());
+ }
+ if (pending) {
+ boost::intrusive_ptr<AsyncCommandContext> p(this);
+ session->cancelAsyncCommand(p);
}
}
- completerContext = boost::intrusive_ptr<AsyncCommandCompleter>();
+}
+
+
+void SessionState::IncompleteIngressMsgXfer::flush()
+{
+ msg->flush();
}
/** Scheduled from an asynchronous command's completed callback to run on
* the IO thread.
*/
-void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
+void SessionState::AsyncCommandManager::schedule(boost::intrusive_ptr<AsyncCommandManager> ctxt)
{
- ctxt->completeCommands();
+ ctxt->processCompletedCommands();
}
-/** Track an ingress message that is pending completion */
-void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
+void SessionState::AsyncCommandManager::addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd,
+ framing::SequenceNumber seq,
+ bool acceptRequired, bool syncBitSet)
{
+ cmd->setId(seq);
+ cmd->setRequiresAccept(acceptRequired);
+ cmd->setSyncBitSet(syncBitSet);
+ cmd->setManager(this);
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
- bool unique = pendingMsgs.insert(item).second;
- assert(unique);
+ std::pair<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> > item(cmd->getId(), cmd);
+ bool unique = pendingCommands.insert(item).second;
+ if (!unique) assert(false);
}
-/** pending message has completed */
-void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id)
+void SessionState::AsyncCommandManager::cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- pendingMsgs.erase(id);
+ pendingCommands.erase(cmd->getId());
+ cmd->setManager(0);
}
+
/** done when an execution.sync arrives */
-void SessionState::AsyncCommandCompleter::flushPendingMessages()
+void SessionState::AsyncCommandManager::flushPendingCommands()
{
- std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
+ std::map<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> > copy;
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now.
+ copy = pendingCommands;
}
// drop lock, so it is safe to call "flush()"
- for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
+ for (std::map<SequenceNumber, boost::intrusive_ptr<AsyncCommandContext> >::iterator i = copy.begin();
i != copy.end(); ++i) {
i->second->flush();
}
}
-/** mark an ingress Message.Transfer command as completed.
+/** mark a pending command as completed.
* This method must be thread safe - it may run on any thread.
*/
-void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
- bool requiresAccept,
- bool requiresSync)
+void SessionState::AsyncCommandManager::completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd,
+ const framing::Invoker::Result& result)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-
if (session && isAttached) {
- MessageInfo msg(cmd, requiresAccept, requiresSync);
- completedMsgs.push_back(msg);
- if (completedMsgs.size() == 1) {
+ CommandInfo status(cmd->getId(),
+ result,
+ cmd->getRequiresAccept(),
+ cmd->getSyncBitSet());
+ completedCommands.push_back(status);
+ if (completedCommands.size() == 1) {
session->getConnection().requestIOProcessing(boost::bind(&schedule,
- session->asyncCommandCompleter));
+ session->asyncCommandManager));
}
}
+ pendingCommands.erase(cmd->getId());
}
/** Cause the session to complete all completed commands.
* Executes on the IO thread.
*/
-void SessionState::AsyncCommandCompleter::completeCommands()
+void SessionState::AsyncCommandManager::processCompletedCommands()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
// when session is destroyed, it clears the session pointer via cancel().
if (session && session->isAttached()) {
- for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
- msg != completedMsgs.end(); ++msg) {
- session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
+ for (std::vector<CommandInfo>::iterator cmd = completedCommands.begin();
+ cmd != completedCommands.end(); ++cmd) {
+ session->completeCommand(cmd->id,
+ cmd->results,
+ cmd->requiresAccept,
+ cmd->syncBitSet);
}
}
- completedMsgs.clear();
+ completedCommands.clear();
}
/** cancel any pending calls to scheduleComplete */
-void SessionState::AsyncCommandCompleter::cancel()
+void SessionState::AsyncCommandManager::cancel()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
session = 0;
@@ -573,7 +609,7 @@ void SessionState::AsyncCommandCompleter::cancel()
/** inform the completer that the session has attached,
* allows command completion scheduling from any thread */
-void SessionState::AsyncCommandCompleter::attached()
+void SessionState::AsyncCommandManager::attached()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
isAttached = true;
@@ -582,7 +618,7 @@ void SessionState::AsyncCommandCompleter::attached()
/** inform the completer that the session has detached,
* disables command completion scheduling from any thread */
-void SessionState::AsyncCommandCompleter::detached()
+void SessionState::AsyncCommandManager::detached()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
isAttached = false;