summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.h')
-rw-r--r--cpp/src/qpid/broker/SessionState.h119
1 files changed, 4 insertions, 115 deletions
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 506af85c47..be79eb0eab 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -30,15 +30,13 @@
#include "qmf/org/apache/qpid/broker/Session.h"
#include "qpid/broker/SessionAdapter.h"
#include "qpid/broker/DeliveryAdapter.h"
-#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/broker/IncompleteMessageList.h"
#include "qpid/broker/MessageBuilder.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SemanticState.h"
-#include "qpid/sys/Monitor.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
-#include <boost/intrusive_ptr.hpp>
#include <set>
#include <vector>
@@ -125,10 +123,6 @@ class SessionState : public qpid::SessionState,
const SessionId& getSessionId() const { return getId(); }
- // Used by ExecutionHandler sync command processing. Notifies
- // the SessionState of a received Execution.Sync command.
- void addPendingExecutionSync();
-
// Used to delay creation of management object for sessions
// belonging to inter-broker bridges
void addManagementObject();
@@ -136,10 +130,7 @@ class SessionState : public qpid::SessionState,
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
-
- // indicate that the given ingress msg has been completely received by the
- // broker, and the msg's message.transfer command can be considered completed.
- void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync);
+ void enqueued(boost::intrusive_ptr<Message> msg);
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
@@ -165,6 +156,8 @@ class SessionState : public qpid::SessionState,
SemanticState semanticState;
SessionAdapter adapter;
MessageBuilder msgBuilder;
+ IncompleteMessageList incomplete;
+ IncompleteMessageList::CompletionListener enqueuedOp;
qmf::org::apache::qpid::broker::Session* mgmtObject;
qpid::framing::SequenceSet accepted;
@@ -173,110 +166,6 @@ class SessionState : public qpid::SessionState,
boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
- // sequence numbers for pending received Execution.Sync commands
- std::queue<SequenceNumber> pendingExecutionSyncs;
- bool currentCommandComplete;
-
- /** This class provides a context for completing asynchronous commands in a thread
- * safe manner. Asynchronous commands save their completion state in this class.
- * This class then schedules the completeCommands() method in the IO thread.
- * While running in the IO thread, completeCommands() may safely complete all
- * saved commands without the risk of colliding with other operations on this
- * SessionState.
- */
- class AsyncCommandCompleter : public RefCounted {
- private:
- SessionState *session;
- bool isAttached;
- qpid::sys::Mutex completerLock;
-
- // special-case message.transfer commands for optimization
- struct MessageInfo {
- SequenceNumber cmd; // message.transfer command id
- bool requiresAccept;
- bool requiresSync;
- MessageInfo(SequenceNumber c, bool a, bool s)
- : cmd(c), requiresAccept(a), requiresSync(s) {}
- };
- std::vector<MessageInfo> completedMsgs;
- // If an ingress message does not require a Sync, we need to
- // hold a reference to it in case an Execution.Sync command is received and we
- // have to manually flush the message.
- std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
-
- /** complete all pending commands, runs in IO thread */
- void completeCommands();
-
- /** for scheduling a run of "completeCommands()" on the IO thread */
- static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
-
- public:
- AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
- ~AsyncCommandCompleter() {};
-
- /** track a message pending ingress completion */
- void addPendingMessage(boost::intrusive_ptr<Message> m);
- void deletePendingMessage(SequenceNumber id);
- void flushPendingMessages();
- /** schedule the processing of a completed ingress message.transfer command */
- void scheduleMsgCompletion(SequenceNumber cmd,
- bool requiresAccept,
- bool requiresSync);
- void cancel(); // called by SessionState destructor.
- void attached(); // called by SessionState on attach()
- void detached(); // called by SessionState on detach()
- };
- boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
-
- /** Abstract class that represents a single asynchronous command that is
- * pending completion.
- */
- class AsyncCommandContext : public AsyncCompletion::Callback
- {
- public:
- AsyncCommandContext( SessionState *ss, SequenceNumber _id )
- : id(_id), completerContext(ss->asyncCommandCompleter) {}
- virtual ~AsyncCommandContext() {}
-
- protected:
- SequenceNumber id;
- boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
- };
-
- /** incomplete Message.transfer commands - inbound to broker from client
- */
- class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
- {
- public:
- IncompleteIngressMsgXfer( SessionState *ss,
- boost::intrusive_ptr<Message> m )
- : AsyncCommandContext(ss, m->getCommandId()),
- session(ss),
- msg(m),
- requiresAccept(m->requiresAccept()),
- requiresSync(m->getFrames().getMethod()->isSync()),
- pending(false) {}
- IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x )
- : AsyncCommandContext(x.session, x.msg->getCommandId()),
- session(x.session),
- msg(x.msg),
- requiresAccept(x.requiresAccept),
- requiresSync(x.requiresSync),
- pending(x.pending) {}
-
- virtual ~IncompleteIngressMsgXfer() {};
-
- virtual void completed(bool);
- virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
-
- private:
- SessionState *session; // only valid if sync flag in callback is true
- boost::intrusive_ptr<Message> msg;
- bool requiresAccept;
- bool requiresSync;
- bool pending; // true if msg saved on pending list...
- };
-
friend class SessionManager;
};