summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SessionState.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.h')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h135
1 files changed, 54 insertions, 81 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 506af85c47..5e162e6475 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -38,7 +38,6 @@
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
-#include <boost/intrusive_ptr.hpp>
#include <set>
#include <vector>
@@ -177,105 +176,79 @@ class SessionState : public qpid::SessionState,
std::queue<SequenceNumber> pendingExecutionSyncs;
bool currentCommandComplete;
- /** This class provides a context for completing asynchronous commands in a thread
- * safe manner. Asynchronous commands save their completion state in this class.
- * This class then schedules the completeCommands() method in the IO thread.
- * While running in the IO thread, completeCommands() may safely complete all
- * saved commands without the risk of colliding with other operations on this
- * SessionState.
+ /** Abstract class that represents a command that is pending
+ * completion.
*/
- class AsyncCommandCompleter : public RefCounted {
- private:
- SessionState *session;
- bool isAttached;
- qpid::sys::Mutex completerLock;
-
- // special-case message.transfer commands for optimization
- struct MessageInfo {
- SequenceNumber cmd; // message.transfer command id
- bool requiresAccept;
- bool requiresSync;
- MessageInfo(SequenceNumber c, bool a, bool s)
- : cmd(c), requiresAccept(a), requiresSync(s) {}
- };
- std::vector<MessageInfo> completedMsgs;
- // If an ingress message does not require a Sync, we need to
- // hold a reference to it in case an Execution.Sync command is received and we
- // have to manually flush the message.
- std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
-
- /** complete all pending commands, runs in IO thread */
- void completeCommands();
-
- /** for scheduling a run of "completeCommands()" on the IO thread */
- static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
-
- public:
- AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
- ~AsyncCommandCompleter() {};
-
- /** track a message pending ingress completion */
- void addPendingMessage(boost::intrusive_ptr<Message> m);
- void deletePendingMessage(SequenceNumber id);
- void flushPendingMessages();
- /** schedule the processing of a completed ingress message.transfer command */
- void scheduleMsgCompletion(SequenceNumber cmd,
- bool requiresAccept,
- bool requiresSync);
- void cancel(); // called by SessionState destructor.
- void attached(); // called by SessionState on attach()
- void detached(); // called by SessionState on detach()
- };
- boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
-
- /** Abstract class that represents a single asynchronous command that is
- * pending completion.
- */
- class AsyncCommandContext : public AsyncCompletion::Callback
+ class IncompleteCommandContext : public AsyncCompletion
{
public:
- AsyncCommandContext( SessionState *ss, SequenceNumber _id )
- : id(_id), completerContext(ss->asyncCommandCompleter) {}
- virtual ~AsyncCommandContext() {}
+ IncompleteCommandContext( SessionState *ss, SequenceNumber _id )
+ : id(_id), session(ss) {}
+ virtual ~IncompleteCommandContext() {}
+
+ /* allows manual invokation of completion, used by IO thread to
+ * complete a command that was originally finished on a different
+ * thread.
+ */
+ void do_completion() { completed(true); }
protected:
SequenceNumber id;
- boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
+ SessionState *session;
};
/** incomplete Message.transfer commands - inbound to broker from client
*/
- class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
+ class IncompleteIngressMsgXfer : public SessionState::IncompleteCommandContext
{
public:
IncompleteIngressMsgXfer( SessionState *ss,
- boost::intrusive_ptr<Message> m )
- : AsyncCommandContext(ss, m->getCommandId()),
- session(ss),
- msg(m),
- requiresAccept(m->requiresAccept()),
- requiresSync(m->getFrames().getMethod()->isSync()),
- pending(false) {}
- IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x )
- : AsyncCommandContext(x.session, x.msg->getCommandId()),
- session(x.session),
- msg(x.msg),
- requiresAccept(x.requiresAccept),
- requiresSync(x.requiresSync),
- pending(x.pending) {}
-
- virtual ~IncompleteIngressMsgXfer() {};
+ SequenceNumber _id,
+ boost::intrusive_ptr<Message> msg )
+ : IncompleteCommandContext(ss, _id),
+ requiresAccept(msg->requiresAccept()),
+ requiresSync(msg->getFrames().getMethod()->isSync()) {};
+ virtual ~IncompleteIngressMsgXfer() {};
+ protected:
virtual void completed(bool);
- virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
private:
- SessionState *session; // only valid if sync flag in callback is true
- boost::intrusive_ptr<Message> msg;
+ /** meta-info required to complete the message */
bool requiresAccept;
- bool requiresSync;
- bool pending; // true if msg saved on pending list...
+ bool requiresSync; // method's isSync() flag
};
+ /** creates a command context suitable for use as an AsyncCompletion in a message */
+ boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> createIngressMsgXferContext( boost::intrusive_ptr<Message> msg);
+
+ /* A list of commands that are pending completion. These commands are
+ * awaiting some set of asynchronous operations to finish (eg: store,
+ * flow-control, etc). before the command can be completed to the client
+ */
+ std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > incompleteCmds;
+ qpid::sys::Mutex incompleteCmdsLock; // locks above container
+
+ /** This context is shared between the SessionState and scheduledCompleter,
+ * holds the sequence numbers of all commands that have completed asynchronously.
+ */
+ class ScheduledCompleterContext {
+ private:
+ std::list<SequenceNumber> completedCmds;
+ // ordering: take this lock first, then incompleteCmdsLock
+ qpid::sys::Mutex completedCmdsLock;
+ SessionState *session;
+ public:
+ ScheduledCompleterContext(SessionState *s) : session(s) {};
+ bool scheduleCompletion(SequenceNumber cmd);
+ void completeCommands();
+ void cancel();
+ };
+ boost::shared_ptr<ScheduledCompleterContext> scheduledCompleterContext;
+
+ /** The following method runs the in IO thread and completes commands that
+ * where finished asynchronously.
+ */
+ static void scheduledCompleter(boost::shared_ptr<ScheduledCompleterContext>);
friend class SessionManager;
};