diff options
author | Alan Conway <aconway@apache.org> | 2007-10-31 16:56:57 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-10-31 16:56:57 +0000 |
commit | e7b39d571cf5a805af19c9286ba1a404534b8c93 (patch) | |
tree | 65adede8e68527b7e8aa8b510f6e275d49345ef5 /qpid/cpp/src | |
parent | e7e29f238b9f1678fdf98c53e9d98b6dd77b3128 (diff) | |
download | qpid-python-e7b39d571cf5a805af19c9286ba1a404534b8c93.tar.gz |
Simplify SessionState, preparing for session thread safety fixes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@590751 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionManager.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionCore.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/SessionState.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/SessionState.h | 25 | ||||
-rw-r--r-- | qpid/cpp/src/tests/SessionState.cpp | 5 |
7 files changed, 15 insertions, 53 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 49492ffed1..1d0e67f6ab 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -38,7 +38,8 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch) connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. - ignoring(false) {} + ignoring(false), + resuming(false) {} SessionHandler::~SessionHandler() {} @@ -114,7 +115,8 @@ void SessionHandler::resume(const Uuid& id) { assertClosed("resume"); session = connection.broker.getSessionManager().resume(id); session->attach(*this); - SequenceNumber seq = session->resuming(); + resuming=true; + SequenceNumber seq = session->sendingAck(); peerSession.attached(session->getId(), session->getTimeout()); proxy.getSession().ack(seq, SequenceNumberSet()); } @@ -148,7 +150,7 @@ void SessionHandler::closed(uint16_t replyCode, const string& replyText) { } void SessionHandler::localSuspend() { - if (session.get() && session->getState() == SessionState::ATTACHED) { + if (session.get()) { session->detach(); connection.broker.getSessionManager().suspend(session); } @@ -166,7 +168,8 @@ void SessionHandler::ack(uint32_t cumulativeSeenMark, const SequenceNumberSet& /*seenFrameSet*/) { assertAttached("ack"); - if (session->getState() == SessionState::RESUMING) { + if (resuming) { + resuming=false; session->receivedAck(cumulativeSeenMark); framing::SessionState::Replay replay=session->replay(); std::for_each(replay.begin(), replay.end(), diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index 9a68ddb46f..52f64779d4 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -92,6 +92,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, framing::AMQP_ClientProxy proxy; framing::AMQP_ClientProxy::Session peerSession; bool ignoring; + bool resuming; std::auto_ptr<SessionState> session; }; diff --git a/qpid/cpp/src/qpid/broker/SessionManager.cpp b/qpid/cpp/src/qpid/broker/SessionManager.cpp index f12ebc6db1..5bdc572491 100644 --- a/qpid/cpp/src/qpid/broker/SessionManager.cpp +++ b/qpid/cpp/src/qpid/broker/SessionManager.cpp @@ -56,7 +56,6 @@ std::auto_ptr<SessionState> SessionManager::open( void SessionManager::suspend(std::auto_ptr<SessionState> session) { Mutex::ScopedLock l(lock); active.erase(session->getId()); - session->suspend(); session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); suspended.push_back(session.release()); // In expiry order eraseExpired(); diff --git a/qpid/cpp/src/qpid/client/SessionCore.cpp b/qpid/cpp/src/qpid/client/SessionCore.cpp index f7f0f52dba..30df574716 100644 --- a/qpid/cpp/src/qpid/client/SessionCore.cpp +++ b/qpid/cpp/src/qpid/client/SessionCore.cpp @@ -52,7 +52,6 @@ inline void SessionCore::invariant() const { break; case RESUMING: assert(session); - assert(session->getState() == SessionState::RESUMING); assert(code==REPLY_SUCCESS); assert(connection); assert(channel.get()); @@ -143,7 +142,6 @@ void SessionCore::doSuspend(int code, const std::string& text) { if (state != CLOSED) { invariant(); detach(code, text); - session->suspend(); setState(SUSPENDED); } } @@ -202,7 +200,7 @@ void SessionCore::resume(shared_ptr<ConnectionImpl> c) { if (state==OPEN) doSuspend(REPLY_SUCCESS, OK); check(state==SUSPENDED, COMMAND_INVALID, QPID_MSG("Session cannot be resumed.")); - SequenceNumber sendAck=session->resuming(); + SequenceNumber sendAck=session->sendingAck(); attaching(c); proxy.resume(getId()); waitFor(OPEN); diff --git a/qpid/cpp/src/qpid/framing/SessionState.cpp b/qpid/cpp/src/qpid/framing/SessionState.cpp index 8056f4a523..7e905bdf63 100644 --- a/qpid/cpp/src/qpid/framing/SessionState.cpp +++ b/qpid/cpp/src/qpid/framing/SessionState.cpp @@ -33,7 +33,6 @@ namespace qpid { namespace framing { SessionState::SessionState(uint32_t ack, const Uuid& uuid) : - state(ATTACHED), id(uuid), lastReceived(-1), lastSent(-1), @@ -45,7 +44,6 @@ SessionState::SessionState(uint32_t ack, const Uuid& uuid) : {} SessionState::SessionState(const Uuid& uuid) : - state(ATTACHED), id(uuid), lastReceived(-1), lastSent(-1), @@ -65,10 +63,6 @@ bool isSessionCommand(const AMQFrame& f) { boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) { if (isSessionCommand(f)) return boost::none; - if (state==RESUMING) - throw CommandInvalidException( - QPID_MSG("Invalid frame: Resuming session, expected session-ack")); - assert(state = ATTACHED); ++lastReceived; QPID_LOG(trace, "Recv # "<< lastReceived << " " << id); if (ackInterval && lastReceived == sendAckAt) @@ -85,7 +79,6 @@ bool SessionState::sent(const AMQFrame& f) { ++lastSent; QPID_LOG(trace, "Sent # "<< lastSent << " " << id); return ackInterval && - (state!=RESUMING) && (lastSent == solicitAckAt) && sendingSolicit(); } @@ -97,8 +90,6 @@ SessionState::Replay SessionState::replay() { } void SessionState::receivedAck(SequenceNumber acked) { - if (state==RESUMING) state=ATTACHED; - assert(state==ATTACHED); if (lastSent < acked) throw InvalidArgumentException("Invalid sequence number in ack"); size_t keep = lastSent - acked; @@ -113,22 +104,10 @@ SequenceNumber SessionState::sendingAck() { } bool SessionState::sendingSolicit() { - assert(state == ATTACHED); if (ackSolicited) return false; solicitAckAt = lastSent + ackInterval; return ackInterval != 0; } -SequenceNumber SessionState::resuming() { - if (!resumable) - throw InternalErrorException("Session is not resumable"); - state = RESUMING; - return sendingAck(); -} - -void SessionState::suspend() { - state = SUSPENDED; -} - }} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/SessionState.h b/qpid/cpp/src/qpid/framing/SessionState.h index 361c960db1..5034de4e94 100644 --- a/qpid/cpp/src/qpid/framing/SessionState.h +++ b/qpid/cpp/src/qpid/framing/SessionState.h @@ -35,7 +35,10 @@ namespace framing { /** * Session state common to client and broker. - * Stores replay frames, implements session ack/resume protcools. + * + * Stores data needed to resume a session: replay frames, implements + * session ack/resume protcools. Stores handler chains for the session, + * handlers may themselves store state. * * A SessionState is always associated with an _open_ session (attached or * suspended) it is destroyed when the session is closed. @@ -46,13 +49,6 @@ class SessionState public: typedef std::vector<AMQFrame> Replay; - /** States of a session. */ - enum State { - SUSPENDED, ///< Suspended, detached from any channel. - RESUMING, ///< Resuming: waiting for initial ack from peer. - ATTACHED ///< Attached to channel and operating normally. - }; - /** *Create a newly opened active session. *@param ackInterval send/solicit an ack whenever N unacked frames @@ -60,7 +56,8 @@ class SessionState * * N=0 disables voluntary send/solict ack. */ - SessionState(uint32_t ackInterval, const framing::Uuid& id=framing::Uuid(true)); + SessionState(uint32_t ackInterval, + const framing::Uuid& id=framing::Uuid(true)); /** * Create a non-resumable session. Does not store session frames, @@ -69,7 +66,6 @@ class SessionState SessionState(const framing::Uuid& id=framing::Uuid(true)); const framing::Uuid& getId() const { return id; } - State getState() const { return state; } /** Received incoming L3 frame. * @return SequenceNumber if an ack should be sent, empty otherwise. @@ -92,13 +88,6 @@ class SessionState */ Replay replay(); - /** Suspend the session. */ - void suspend(); - - /** Start resume protocol for the session. - *@returns sequence number to ack immediately. */ - SequenceNumber resuming(); - /** About to send an unscheduled ack, e.g. to respond to a solicit-ack. * * Note: when received() returns a sequence number this function @@ -115,9 +104,7 @@ class SessionState bool sendingSolicit(); - State state; framing::Uuid id; - Unacked unackedOut; SequenceNumber lastReceived; SequenceNumber lastSent; diff --git a/qpid/cpp/src/tests/SessionState.cpp b/qpid/cpp/src/tests/SessionState.cpp index c8d912801e..19a146c759 100644 --- a/qpid/cpp/src/tests/SessionState.cpp +++ b/qpid/cpp/src/tests/SessionState.cpp @@ -97,21 +97,16 @@ BOOST_AUTO_TEST_CASE(testReplay) { // Replay of all frames. SessionState session(100); sent(session, "abc"); - session.suspend(); session.resuming(); session.receivedAck(-1); BOOST_CHECK_EQUAL(replayChars(session.replay()), "abc"); // Replay with acks session.receivedAck(0); // ack a. - session.suspend(); - session.resuming(); session.receivedAck(1); // ack b. BOOST_CHECK_EQUAL(replayChars(session.replay()), "c"); // Replay after further frames. sent(session, "def"); - session.suspend(); - session.resuming(); session.receivedAck(3); BOOST_CHECK_EQUAL(replayChars(session.replay()), "ef"); |