summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-10-31 16:56:57 +0000
committerAlan Conway <aconway@apache.org>2007-10-31 16:56:57 +0000
commite7b39d571cf5a805af19c9286ba1a404534b8c93 (patch)
tree65adede8e68527b7e8aa8b510f6e275d49345ef5 /qpid/cpp/src
parente7e29f238b9f1678fdf98c53e9d98b6dd77b3128 (diff)
downloadqpid-python-e7b39d571cf5a805af19c9286ba1a404534b8c93.tar.gz
Simplify SessionState, preparing for session thread safety fixes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@590751 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h1
-rw-r--r--qpid/cpp/src/qpid/broker/SessionManager.cpp1
-rw-r--r--qpid/cpp/src/qpid/client/SessionCore.cpp4
-rw-r--r--qpid/cpp/src/qpid/framing/SessionState.cpp21
-rw-r--r--qpid/cpp/src/qpid/framing/SessionState.h25
-rw-r--r--qpid/cpp/src/tests/SessionState.cpp5
7 files changed, 15 insertions, 53 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 49492ffed1..1d0e67f6ab 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -38,7 +38,8 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch)
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
- ignoring(false) {}
+ ignoring(false),
+ resuming(false) {}
SessionHandler::~SessionHandler() {}
@@ -114,7 +115,8 @@ void SessionHandler::resume(const Uuid& id) {
assertClosed("resume");
session = connection.broker.getSessionManager().resume(id);
session->attach(*this);
- SequenceNumber seq = session->resuming();
+ resuming=true;
+ SequenceNumber seq = session->sendingAck();
peerSession.attached(session->getId(), session->getTimeout());
proxy.getSession().ack(seq, SequenceNumberSet());
}
@@ -148,7 +150,7 @@ void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
}
void SessionHandler::localSuspend() {
- if (session.get() && session->getState() == SessionState::ATTACHED) {
+ if (session.get()) {
session->detach();
connection.broker.getSessionManager().suspend(session);
}
@@ -166,7 +168,8 @@ void SessionHandler::ack(uint32_t cumulativeSeenMark,
const SequenceNumberSet& /*seenFrameSet*/)
{
assertAttached("ack");
- if (session->getState() == SessionState::RESUMING) {
+ if (resuming) {
+ resuming=false;
session->receivedAck(cumulativeSeenMark);
framing::SessionState::Replay replay=session->replay();
std::for_each(replay.begin(), replay.end(),
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index 9a68ddb46f..52f64779d4 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -92,6 +92,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
framing::AMQP_ClientProxy proxy;
framing::AMQP_ClientProxy::Session peerSession;
bool ignoring;
+ bool resuming;
std::auto_ptr<SessionState> session;
};
diff --git a/qpid/cpp/src/qpid/broker/SessionManager.cpp b/qpid/cpp/src/qpid/broker/SessionManager.cpp
index f12ebc6db1..5bdc572491 100644
--- a/qpid/cpp/src/qpid/broker/SessionManager.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionManager.cpp
@@ -56,7 +56,6 @@ std::auto_ptr<SessionState> SessionManager::open(
void SessionManager::suspend(std::auto_ptr<SessionState> session) {
Mutex::ScopedLock l(lock);
active.erase(session->getId());
- session->suspend();
session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
suspended.push_back(session.release()); // In expiry order
eraseExpired();
diff --git a/qpid/cpp/src/qpid/client/SessionCore.cpp b/qpid/cpp/src/qpid/client/SessionCore.cpp
index f7f0f52dba..30df574716 100644
--- a/qpid/cpp/src/qpid/client/SessionCore.cpp
+++ b/qpid/cpp/src/qpid/client/SessionCore.cpp
@@ -52,7 +52,6 @@ inline void SessionCore::invariant() const {
break;
case RESUMING:
assert(session);
- assert(session->getState() == SessionState::RESUMING);
assert(code==REPLY_SUCCESS);
assert(connection);
assert(channel.get());
@@ -143,7 +142,6 @@ void SessionCore::doSuspend(int code, const std::string& text) {
if (state != CLOSED) {
invariant();
detach(code, text);
- session->suspend();
setState(SUSPENDED);
}
}
@@ -202,7 +200,7 @@ void SessionCore::resume(shared_ptr<ConnectionImpl> c) {
if (state==OPEN)
doSuspend(REPLY_SUCCESS, OK);
check(state==SUSPENDED, COMMAND_INVALID, QPID_MSG("Session cannot be resumed."));
- SequenceNumber sendAck=session->resuming();
+ SequenceNumber sendAck=session->sendingAck();
attaching(c);
proxy.resume(getId());
waitFor(OPEN);
diff --git a/qpid/cpp/src/qpid/framing/SessionState.cpp b/qpid/cpp/src/qpid/framing/SessionState.cpp
index 8056f4a523..7e905bdf63 100644
--- a/qpid/cpp/src/qpid/framing/SessionState.cpp
+++ b/qpid/cpp/src/qpid/framing/SessionState.cpp
@@ -33,7 +33,6 @@ namespace qpid {
namespace framing {
SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
- state(ATTACHED),
id(uuid),
lastReceived(-1),
lastSent(-1),
@@ -45,7 +44,6 @@ SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
{}
SessionState::SessionState(const Uuid& uuid) :
- state(ATTACHED),
id(uuid),
lastReceived(-1),
lastSent(-1),
@@ -65,10 +63,6 @@ bool isSessionCommand(const AMQFrame& f) {
boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) {
if (isSessionCommand(f))
return boost::none;
- if (state==RESUMING)
- throw CommandInvalidException(
- QPID_MSG("Invalid frame: Resuming session, expected session-ack"));
- assert(state = ATTACHED);
++lastReceived;
QPID_LOG(trace, "Recv # "<< lastReceived << " " << id);
if (ackInterval && lastReceived == sendAckAt)
@@ -85,7 +79,6 @@ bool SessionState::sent(const AMQFrame& f) {
++lastSent;
QPID_LOG(trace, "Sent # "<< lastSent << " " << id);
return ackInterval &&
- (state!=RESUMING) &&
(lastSent == solicitAckAt) &&
sendingSolicit();
}
@@ -97,8 +90,6 @@ SessionState::Replay SessionState::replay() {
}
void SessionState::receivedAck(SequenceNumber acked) {
- if (state==RESUMING) state=ATTACHED;
- assert(state==ATTACHED);
if (lastSent < acked)
throw InvalidArgumentException("Invalid sequence number in ack");
size_t keep = lastSent - acked;
@@ -113,22 +104,10 @@ SequenceNumber SessionState::sendingAck() {
}
bool SessionState::sendingSolicit() {
- assert(state == ATTACHED);
if (ackSolicited)
return false;
solicitAckAt = lastSent + ackInterval;
return ackInterval != 0;
}
-SequenceNumber SessionState::resuming() {
- if (!resumable)
- throw InternalErrorException("Session is not resumable");
- state = RESUMING;
- return sendingAck();
-}
-
-void SessionState::suspend() {
- state = SUSPENDED;
-}
-
}} // namespace qpid::framing
diff --git a/qpid/cpp/src/qpid/framing/SessionState.h b/qpid/cpp/src/qpid/framing/SessionState.h
index 361c960db1..5034de4e94 100644
--- a/qpid/cpp/src/qpid/framing/SessionState.h
+++ b/qpid/cpp/src/qpid/framing/SessionState.h
@@ -35,7 +35,10 @@ namespace framing {
/**
* Session state common to client and broker.
- * Stores replay frames, implements session ack/resume protcools.
+ *
+ * Stores data needed to resume a session: replay frames, implements
+ * session ack/resume protcools. Stores handler chains for the session,
+ * handlers may themselves store state.
*
* A SessionState is always associated with an _open_ session (attached or
* suspended) it is destroyed when the session is closed.
@@ -46,13 +49,6 @@ class SessionState
public:
typedef std::vector<AMQFrame> Replay;
- /** States of a session. */
- enum State {
- SUSPENDED, ///< Suspended, detached from any channel.
- RESUMING, ///< Resuming: waiting for initial ack from peer.
- ATTACHED ///< Attached to channel and operating normally.
- };
-
/**
*Create a newly opened active session.
*@param ackInterval send/solicit an ack whenever N unacked frames
@@ -60,7 +56,8 @@ class SessionState
*
* N=0 disables voluntary send/solict ack.
*/
- SessionState(uint32_t ackInterval, const framing::Uuid& id=framing::Uuid(true));
+ SessionState(uint32_t ackInterval,
+ const framing::Uuid& id=framing::Uuid(true));
/**
* Create a non-resumable session. Does not store session frames,
@@ -69,7 +66,6 @@ class SessionState
SessionState(const framing::Uuid& id=framing::Uuid(true));
const framing::Uuid& getId() const { return id; }
- State getState() const { return state; }
/** Received incoming L3 frame.
* @return SequenceNumber if an ack should be sent, empty otherwise.
@@ -92,13 +88,6 @@ class SessionState
*/
Replay replay();
- /** Suspend the session. */
- void suspend();
-
- /** Start resume protocol for the session.
- *@returns sequence number to ack immediately. */
- SequenceNumber resuming();
-
/** About to send an unscheduled ack, e.g. to respond to a solicit-ack.
*
* Note: when received() returns a sequence number this function
@@ -115,9 +104,7 @@ class SessionState
bool sendingSolicit();
- State state;
framing::Uuid id;
-
Unacked unackedOut;
SequenceNumber lastReceived;
SequenceNumber lastSent;
diff --git a/qpid/cpp/src/tests/SessionState.cpp b/qpid/cpp/src/tests/SessionState.cpp
index c8d912801e..19a146c759 100644
--- a/qpid/cpp/src/tests/SessionState.cpp
+++ b/qpid/cpp/src/tests/SessionState.cpp
@@ -97,21 +97,16 @@ BOOST_AUTO_TEST_CASE(testReplay) {
// Replay of all frames.
SessionState session(100);
sent(session, "abc");
- session.suspend(); session.resuming();
session.receivedAck(-1);
BOOST_CHECK_EQUAL(replayChars(session.replay()), "abc");
// Replay with acks
session.receivedAck(0); // ack a.
- session.suspend();
- session.resuming();
session.receivedAck(1); // ack b.
BOOST_CHECK_EQUAL(replayChars(session.replay()), "c");
// Replay after further frames.
sent(session, "def");
- session.suspend();
- session.resuming();
session.receivedAck(3);
BOOST_CHECK_EQUAL(replayChars(session.replay()), "ef");