summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/framing/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/framing/SessionState.cpp')
-rw-r--r--cpp/src/qpid/framing/SessionState.cpp30
1 files changed, 22 insertions, 8 deletions
diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp
index 045a0ae115..8056f4a523 100644
--- a/cpp/src/qpid/framing/SessionState.cpp
+++ b/cpp/src/qpid/framing/SessionState.cpp
@@ -40,11 +40,22 @@ SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
ackInterval(ack),
sendAckAt(lastReceived+ackInterval),
solicitAckAt(lastSent+ackInterval),
- ackSolicited(false)
+ ackSolicited(false),
+ resumable(true)
+{}
+
+SessionState::SessionState(const Uuid& uuid) :
+ state(ATTACHED),
+ id(uuid),
+ lastReceived(-1),
+ lastSent(-1),
+ ackInterval(0),
+ sendAckAt(0),
+ solicitAckAt(0),
+ ackSolicited(false),
+ resumable(false)
{
- assert(ackInterval > 0);
}
-
namespace {
bool isSessionCommand(const AMQFrame& f) {
return f.getMethod() && f.getMethod()->amqpClassId() == SESSION_CLASS_ID;
@@ -58,10 +69,9 @@ boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) {
throw CommandInvalidException(
QPID_MSG("Invalid frame: Resuming session, expected session-ack"));
assert(state = ATTACHED);
- assert(lastReceived<sendAckAt);
++lastReceived;
QPID_LOG(trace, "Recv # "<< lastReceived << " " << id);
- if (lastReceived == sendAckAt)
+ if (ackInterval && lastReceived == sendAckAt)
return sendingAck();
else
return boost::none;
@@ -70,10 +80,12 @@ boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) {
bool SessionState::sent(const AMQFrame& f) {
if (isSessionCommand(f))
return false;
- unackedOut.push_back(f);
+ if (resumable)
+ unackedOut.push_back(f);
++lastSent;
QPID_LOG(trace, "Sent # "<< lastSent << " " << id);
- return (state!=RESUMING) &&
+ return ackInterval &&
+ (state!=RESUMING) &&
(lastSent == solicitAckAt) &&
sendingSolicit();
}
@@ -105,10 +117,12 @@ bool SessionState::sendingSolicit() {
if (ackSolicited)
return false;
solicitAckAt = lastSent + ackInterval;
- return true;
+ return ackInterval != 0;
}
SequenceNumber SessionState::resuming() {
+ if (!resumable)
+ throw InternalErrorException("Session is not resumable");
state = RESUMING;
return sendingAck();
}