summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/SessionState.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/SessionState.h')
-rw-r--r--cpp/src/qpid/SessionState.h163
1 files changed, 92 insertions, 71 deletions
diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h
index b836534ee7..7957825dd3 100644
--- a/cpp/src/qpid/SessionState.h
+++ b/cpp/src/qpid/SessionState.h
@@ -22,9 +22,11 @@
*
*/
+#include <qpid/SessionId.h>
#include <qpid/framing/SequenceNumber.h>
#include <qpid/framing/SequenceSet.h>
#include <qpid/framing/AMQFrame.h>
+#include <qpid/framing/FrameHandler.h>
#include <boost/operators.hpp>
#include <vector>
#include <iosfwd>
@@ -49,118 +51,124 @@ struct SessionPoint : boost::totally_ordered1<SessionPoint> {
std::ostream& operator<<(std::ostream&, const SessionPoint&);
-/** The sending half of session state */
+/**
+ * Support for session idempotence barrier and resume as defined in
+ * AMQP 0-10.
+ *
+ * We only issue/use contiguous confirmations, out-of-order confirmation
+ * is ignored. Out of order completion is fully supported.
+ *
+ * Raises NotImplemented if the command point is set greater than the
+ * max currently received command data, either explicitly via
+ * session.command-point or implicitly via session.gap.
+ *
+ * Partial replay is not supported, replay always begins on a command
+ * boundary, and we never confirm partial commands.
+ *
+ * The SessionPoint data structure does store offsets so this class
+ * could be extended to support partial replay without
+ * source-incompatbile API changes.
+ */
+class SessionState {
+ public:
+
+ /** State for commands sent. Records commands for replay,
+ * tracks confirmation and completion of sent commands.
+ */
class SendState {
public:
typedef std::vector<framing::AMQFrame> ReplayList;
/** Record frame f for replay. Should not be called during replay. */
- void send(const framing::AMQFrame& f);
+ void record(const framing::AMQFrame& f);
/** @return true if we should send flush for confirmed and completed commands. */
bool needFlush() const;
/** Called when flush for confirmed and completed commands is sent to peer. */
- void sendFlush();
+ void recordFlush();
- /** Called when the peer confirms up to commands. */
- void peerConfirmed(const SessionPoint& confirmed);
+ /** Called when the peer confirms up to comfirmed. */
+ void confirmed(const SessionPoint& confirmed);
/** Called when the peer indicates commands completed */
- void peerCompleted(const SequenceSet& commands);
+ void completed(const SequenceSet& commands);
- /** Get the replay list. @see getReplayPoint. */
- const ReplayList& getReplayList() const { return replayList; }
-
- /**
- * The replay point is the point up to which all data has been
- * confirmed. Partial replay is not supported, it will always
- * have offset==0.
- */
+ /** Point from which we can replay. All data < replayPoint is confirmed. */
const SessionPoint& getReplayPoint() const { return replayPoint; }
- const SessionPoint& getSendPoint() const { return sendPoint; }
- const SequenceSet& getCompleted() const { return sentCompleted; }
+ /** Get the replay list, starting from getReplayPoint() */
+ // TODO aconway 2008-04-30: should be const, but FrameHandler takes non-const AMQFrame&.
+ ReplayList& getReplayList() { return replayList; }
- protected:
- SendState(size_t replaySyncSize, size_t replayKillSize);
+ /** Point from which the next data will be sent. */
+ const SessionPoint& getCommandPoint();
+
+ /** Set of outstanding incomplete commands */
+ const SequenceSet& getIncomplete() const { return incomplete; }
+
+ /** Peer expecting commands from this point.
+ *@return true if replay is required, sets replayPoint.
+ */
+ bool expected(const SessionPoint& expected);
private:
- size_t replaySyncSize, replayKillSize; // @see SessionState::Configuration.
+ SendState(SessionState& s);
+
+ SessionState* session;
// invariant: replayPoint <= flushPoint <= sendPoint
SessionPoint replayPoint; // Can replay from this point
- SessionPoint sendPoint; // Send from this point
SessionPoint flushPoint; // Point of last flush
+ SessionPoint sendPoint; // Send from this point
ReplayList replayList; // Starts from replayPoint.
size_t unflushedSize; // Un-flushed bytes in replay list.
- SequenceSet sentCompleted; // Commands sent and acknowledged as completed.
+ SequenceSet incomplete; // Commands sent and not yet completed.
+
+ friend class SessionState;
};
-/** Receiving half of SessionState */
+ /** State for commands received.
+ * Idempotence barrier for duplicate commands, tracks completion
+ * and of received commands.
+ */
class ReceiveState {
public:
- bool hasState();
-
/** Set the command point. */
- void setExpecting(const SessionPoint& point);
+ void setCommandPoint(const SessionPoint& point);
/** Returns true if frame should be be processed, false if it is a duplicate. */
- bool receive(const framing::AMQFrame& f);
+ bool record(const framing::AMQFrame& f);
/** Command completed locally */
- void localCompleted(SequenceNumber command);
+ void completed(SequenceNumber command, bool cumulative=false);
/** Peer has indicated commands are known completed */
- void peerKnownComplete(const SequenceSet& commands);
+ void knownCompleted(const SequenceSet& commands);
+
+ /** Get the incoming command point */
+ const SessionPoint& getExpected() const { return expected; }
- /** Recieved, completed and possibly not known by peer to be completed */
- const SequenceSet& getReceivedCompleted() const { return receivedCompleted; }
- const SessionPoint& getExpecting() const { return expecting; }
+ /** Get the received high-water-mark, may be > getExpected() during replay */
const SessionPoint& getReceived() const { return received; }
- protected:
- ReceiveState();
+ /** Completed commands that the peer may not know about */
+ const SequenceSet& getUnknownComplete() const { return unknownCompleted; }
+
+ /** ID of the command currently being handled. */
+ SequenceNumber getCurrent() const;
private:
- bool stateful; // True if session has state.
- SessionPoint expecting; // Expecting from here
- SessionPoint received; // Received to here. Invariant: expecting <= received.
- SequenceSet receivedCompleted; // Received & completed, may not be not known-completed by peer
-};
+ ReceiveState(SessionState&);
-/** Identifier for a session */
-class SessionId : boost::totally_ordered1<SessionId> {
- std::string userId;
- std::string name;
- public:
- SessionId(const std::string& userId=std::string(), const std::string& name=std::string());
- std::string getUserId() const { return userId; }
- std::string getName() const { return name; }
- bool operator<(const SessionId&) const ;
- bool operator==(const SessionId& id) const;
-};
+ SessionState* session;
+ SessionPoint expected; // Expected from here
+ SessionPoint received; // Received to here. Invariant: expected <= received.
+ SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer.
+ SequenceNumber firstIncomplete; // First incomplete command.
+ friend class SessionState;
+ };
-/**
- * Support for session idempotence barrier and resume as defined in
- * AMQP 0-10.
- *
- * We only issue/use contiguous confirmations, out-of-order confirmation
- * is ignored. Out of order completion is fully supported.
- *
- * Raises NotImplemented if the command point is set greater than the
- * max currently received command data, either explicitly via
- * session.command-point or implicitly via session.gap.
- *
- * Partial replay is not supported, replay always begins on a command
- * boundary, and we never confirm partial commands.
- *
- * The SessionPoint data structure does store offsets so this class
- * could be extended to support partial replay without
- * source-incompatbile API changes.
- */
-class SessionState : public SendState, public ReceiveState {
- public:
struct Configuration {
Configuration();
size_t replaySyncSize; // Issue a sync when the replay list holds >= N bytes
@@ -169,19 +177,32 @@ class SessionState : public SendState, public ReceiveState {
SessionState(const SessionId& =SessionId(), const Configuration& =Configuration());
+ virtual ~SessionState();
+
const SessionId& getId() const { return id; }
uint32_t getTimeout() const { return timeout; }
void setTimeout(uint32_t seconds) { timeout = seconds; }
- /** Clear all state except Id. */
- void clear();
+ bool operator==(const SessionId& other) const { return id == other; }
+ bool operator==(const SessionState& other) const { return id == other.id; }
+
+ SendState sender; ///< State for commands sent
+ ReceiveState receiver; ///< State for commands received
+
+ bool hasState() const;
private:
SessionId id;
uint32_t timeout;
Configuration config;
+ bool stateful;
+
+ friend class SendState;
+ friend class ReceiveState;
};
+inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; }
+
} // namespace qpid