diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.h')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 32 |
1 files changed, 28 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 3dcb0a62d4..b33181eee4 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -30,10 +30,11 @@ #include "qmf/org/apache/qpid/broker/Session.h" #include "qpid/broker/SessionAdapter.h" #include "qpid/broker/DeliveryAdapter.h" -#include "qpid/broker/IncompleteMessageList.h" +#include "qpid/broker/AsyncCompletion.h" #include "qpid/broker/MessageBuilder.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/SemanticState.h" +#include "qpid/sys/Monitor.h" #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> @@ -122,11 +123,15 @@ class SessionState : public qpid::SessionState, const SessionId& getSessionId() const { return getId(); } + // Used by ExecutionHandler sync command processing. Notifies + // the SessionState of a received Execution.Sync command. + void addPendingExecutionSync(); + private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); - void enqueued(boost::intrusive_ptr<Message> msg); + void completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg); void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); @@ -152,8 +157,6 @@ class SessionState : public qpid::SessionState, SemanticState semanticState; SessionAdapter adapter; MessageBuilder msgBuilder; - IncompleteMessageList incomplete; - IncompleteMessageList::CompletionListener enqueuedOp; qmf::org::apache::qpid::broker::Session* mgmtObject; qpid::framing::SequenceSet accepted; @@ -162,7 +165,28 @@ class SessionState : public qpid::SessionState, boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol; boost::intrusive_ptr<sys::TimerTask> flowControlTimer; + // sequence numbers for pending received Execution.Sync commands + std::queue<SequenceNumber> pendingExecutionSyncs; + bool currentCommandComplete; + + class IncompleteRcvMsg : public AsyncCompletion::CompletionHandler + { + public: + IncompleteRcvMsg(SessionState& _session, boost::intrusive_ptr<Message> _msg) + : session(&_session), msg(_msg) {} + virtual void operator() (bool sync); + void cancel(); // cancel pending incomplete callback [operator() above]. + + private: + SessionState *session; + boost::intrusive_ptr<Message> msg; + static void scheduledCompleter( boost::shared_ptr<IncompleteRcvMsg> incompleteMsg ); + }; + std::list< boost::shared_ptr<IncompleteRcvMsg> > incompleteRcvMsgs; + qpid::sys::Mutex incompleteRcvMsgsLock; + friend class SessionManager; + friend class IncompleteRcvMsg; }; |