diff options
Diffstat (limited to 'cpp/src/qpid/SessionState.h')
-rw-r--r-- | cpp/src/qpid/SessionState.h | 163 |
1 files changed, 92 insertions, 71 deletions
diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h index b836534ee7..7957825dd3 100644 --- a/cpp/src/qpid/SessionState.h +++ b/cpp/src/qpid/SessionState.h @@ -22,9 +22,11 @@ * */ +#include <qpid/SessionId.h> #include <qpid/framing/SequenceNumber.h> #include <qpid/framing/SequenceSet.h> #include <qpid/framing/AMQFrame.h> +#include <qpid/framing/FrameHandler.h> #include <boost/operators.hpp> #include <vector> #include <iosfwd> @@ -49,118 +51,124 @@ struct SessionPoint : boost::totally_ordered1<SessionPoint> { std::ostream& operator<<(std::ostream&, const SessionPoint&); -/** The sending half of session state */ +/** + * Support for session idempotence barrier and resume as defined in + * AMQP 0-10. + * + * We only issue/use contiguous confirmations, out-of-order confirmation + * is ignored. Out of order completion is fully supported. + * + * Raises NotImplemented if the command point is set greater than the + * max currently received command data, either explicitly via + * session.command-point or implicitly via session.gap. + * + * Partial replay is not supported, replay always begins on a command + * boundary, and we never confirm partial commands. + * + * The SessionPoint data structure does store offsets so this class + * could be extended to support partial replay without + * source-incompatbile API changes. + */ +class SessionState { + public: + + /** State for commands sent. Records commands for replay, + * tracks confirmation and completion of sent commands. + */ class SendState { public: typedef std::vector<framing::AMQFrame> ReplayList; /** Record frame f for replay. Should not be called during replay. */ - void send(const framing::AMQFrame& f); + void record(const framing::AMQFrame& f); /** @return true if we should send flush for confirmed and completed commands. */ bool needFlush() const; /** Called when flush for confirmed and completed commands is sent to peer. */ - void sendFlush(); + void recordFlush(); - /** Called when the peer confirms up to commands. */ - void peerConfirmed(const SessionPoint& confirmed); + /** Called when the peer confirms up to comfirmed. */ + void confirmed(const SessionPoint& confirmed); /** Called when the peer indicates commands completed */ - void peerCompleted(const SequenceSet& commands); + void completed(const SequenceSet& commands); - /** Get the replay list. @see getReplayPoint. */ - const ReplayList& getReplayList() const { return replayList; } - - /** - * The replay point is the point up to which all data has been - * confirmed. Partial replay is not supported, it will always - * have offset==0. - */ + /** Point from which we can replay. All data < replayPoint is confirmed. */ const SessionPoint& getReplayPoint() const { return replayPoint; } - const SessionPoint& getSendPoint() const { return sendPoint; } - const SequenceSet& getCompleted() const { return sentCompleted; } + /** Get the replay list, starting from getReplayPoint() */ + // TODO aconway 2008-04-30: should be const, but FrameHandler takes non-const AMQFrame&. + ReplayList& getReplayList() { return replayList; } - protected: - SendState(size_t replaySyncSize, size_t replayKillSize); + /** Point from which the next data will be sent. */ + const SessionPoint& getCommandPoint(); + + /** Set of outstanding incomplete commands */ + const SequenceSet& getIncomplete() const { return incomplete; } + + /** Peer expecting commands from this point. + *@return true if replay is required, sets replayPoint. + */ + bool expected(const SessionPoint& expected); private: - size_t replaySyncSize, replayKillSize; // @see SessionState::Configuration. + SendState(SessionState& s); + + SessionState* session; // invariant: replayPoint <= flushPoint <= sendPoint SessionPoint replayPoint; // Can replay from this point - SessionPoint sendPoint; // Send from this point SessionPoint flushPoint; // Point of last flush + SessionPoint sendPoint; // Send from this point ReplayList replayList; // Starts from replayPoint. size_t unflushedSize; // Un-flushed bytes in replay list. - SequenceSet sentCompleted; // Commands sent and acknowledged as completed. + SequenceSet incomplete; // Commands sent and not yet completed. + + friend class SessionState; }; -/** Receiving half of SessionState */ + /** State for commands received. + * Idempotence barrier for duplicate commands, tracks completion + * and of received commands. + */ class ReceiveState { public: - bool hasState(); - /** Set the command point. */ - void setExpecting(const SessionPoint& point); + void setCommandPoint(const SessionPoint& point); /** Returns true if frame should be be processed, false if it is a duplicate. */ - bool receive(const framing::AMQFrame& f); + bool record(const framing::AMQFrame& f); /** Command completed locally */ - void localCompleted(SequenceNumber command); + void completed(SequenceNumber command, bool cumulative=false); /** Peer has indicated commands are known completed */ - void peerKnownComplete(const SequenceSet& commands); + void knownCompleted(const SequenceSet& commands); + + /** Get the incoming command point */ + const SessionPoint& getExpected() const { return expected; } - /** Recieved, completed and possibly not known by peer to be completed */ - const SequenceSet& getReceivedCompleted() const { return receivedCompleted; } - const SessionPoint& getExpecting() const { return expecting; } + /** Get the received high-water-mark, may be > getExpected() during replay */ const SessionPoint& getReceived() const { return received; } - protected: - ReceiveState(); + /** Completed commands that the peer may not know about */ + const SequenceSet& getUnknownComplete() const { return unknownCompleted; } + + /** ID of the command currently being handled. */ + SequenceNumber getCurrent() const; private: - bool stateful; // True if session has state. - SessionPoint expecting; // Expecting from here - SessionPoint received; // Received to here. Invariant: expecting <= received. - SequenceSet receivedCompleted; // Received & completed, may not be not known-completed by peer -}; + ReceiveState(SessionState&); -/** Identifier for a session */ -class SessionId : boost::totally_ordered1<SessionId> { - std::string userId; - std::string name; - public: - SessionId(const std::string& userId=std::string(), const std::string& name=std::string()); - std::string getUserId() const { return userId; } - std::string getName() const { return name; } - bool operator<(const SessionId&) const ; - bool operator==(const SessionId& id) const; -}; + SessionState* session; + SessionPoint expected; // Expected from here + SessionPoint received; // Received to here. Invariant: expected <= received. + SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer. + SequenceNumber firstIncomplete; // First incomplete command. + friend class SessionState; + }; -/** - * Support for session idempotence barrier and resume as defined in - * AMQP 0-10. - * - * We only issue/use contiguous confirmations, out-of-order confirmation - * is ignored. Out of order completion is fully supported. - * - * Raises NotImplemented if the command point is set greater than the - * max currently received command data, either explicitly via - * session.command-point or implicitly via session.gap. - * - * Partial replay is not supported, replay always begins on a command - * boundary, and we never confirm partial commands. - * - * The SessionPoint data structure does store offsets so this class - * could be extended to support partial replay without - * source-incompatbile API changes. - */ -class SessionState : public SendState, public ReceiveState { - public: struct Configuration { Configuration(); size_t replaySyncSize; // Issue a sync when the replay list holds >= N bytes @@ -169,19 +177,32 @@ class SessionState : public SendState, public ReceiveState { SessionState(const SessionId& =SessionId(), const Configuration& =Configuration()); + virtual ~SessionState(); + const SessionId& getId() const { return id; } uint32_t getTimeout() const { return timeout; } void setTimeout(uint32_t seconds) { timeout = seconds; } - /** Clear all state except Id. */ - void clear(); + bool operator==(const SessionId& other) const { return id == other; } + bool operator==(const SessionState& other) const { return id == other.id; } + + SendState sender; ///< State for commands sent + ReceiveState receiver; ///< State for commands received + + bool hasState() const; private: SessionId id; uint32_t timeout; Configuration config; + bool stateful; + + friend class SendState; + friend class ReceiveState; }; +inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; } + } // namespace qpid |