summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SessionState.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.h')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h32
1 files changed, 28 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 3dcb0a62d4..b33181eee4 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -30,10 +30,11 @@
#include "qmf/org/apache/qpid/broker/Session.h"
#include "qpid/broker/SessionAdapter.h"
#include "qpid/broker/DeliveryAdapter.h"
-#include "qpid/broker/IncompleteMessageList.h"
+#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/sys/Monitor.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
@@ -122,11 +123,15 @@ class SessionState : public qpid::SessionState,
const SessionId& getSessionId() const { return getId(); }
+ // Used by ExecutionHandler sync command processing. Notifies
+ // the SessionState of a received Execution.Sync command.
+ void addPendingExecutionSync();
+
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
- void enqueued(boost::intrusive_ptr<Message> msg);
+ void completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg);
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
@@ -152,8 +157,6 @@ class SessionState : public qpid::SessionState,
SemanticState semanticState;
SessionAdapter adapter;
MessageBuilder msgBuilder;
- IncompleteMessageList incomplete;
- IncompleteMessageList::CompletionListener enqueuedOp;
qmf::org::apache::qpid::broker::Session* mgmtObject;
qpid::framing::SequenceSet accepted;
@@ -162,7 +165,28 @@ class SessionState : public qpid::SessionState,
boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
+ // sequence numbers for pending received Execution.Sync commands
+ std::queue<SequenceNumber> pendingExecutionSyncs;
+ bool currentCommandComplete;
+
+ class IncompleteRcvMsg : public AsyncCompletion::CompletionHandler
+ {
+ public:
+ IncompleteRcvMsg(SessionState& _session, boost::intrusive_ptr<Message> _msg)
+ : session(&_session), msg(_msg) {}
+ virtual void operator() (bool sync);
+ void cancel(); // cancel pending incomplete callback [operator() above].
+
+ private:
+ SessionState *session;
+ boost::intrusive_ptr<Message> msg;
+ static void scheduledCompleter( boost::shared_ptr<IncompleteRcvMsg> incompleteMsg );
+ };
+ std::list< boost::shared_ptr<IncompleteRcvMsg> > incompleteRcvMsgs;
+ qpid::sys::Mutex incompleteRcvMsgsLock;
+
friend class SessionManager;
+ friend class IncompleteRcvMsg;
};