diff options
author | Alan Conway <aconway@apache.org> | 2007-10-29 16:50:45 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-10-29 16:50:45 +0000 |
commit | bcf9c194c3d777fff7f776af25379d8a44f012aa (patch) | |
tree | e9e1afe41970e8f295d91b773379844d9258cc65 /cpp/src | |
parent | 0276d542a50b079a94132b2532b5cd016853006b (diff) | |
download | qpid-python-bcf9c194c3d777fff7f776af25379d8a44f012aa.tar.gz |
##-*-text-*-
Added qpidd --ack option to set ack/solicit-ack interval. 0 disabled acks.
Sessions with 0 timeout never ack and don't store replay frames.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@589731 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SessionState.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SessionState.h | 17 |
3 files changed, 39 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index b88f1c6c6a..051c872e77 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -65,7 +65,7 @@ Broker::Options::Options(const std::string& name) : storeAsync(false), enableMgmt(0), mgmtPubInterval(10), - ack(100) + ack(100) { addOptions() ("port,p", optValue(port,"PORT"), @@ -87,7 +87,9 @@ Broker::Options::Options(const std::string& name) : ("mgmt,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), - "Management Publish Interval"); + "Management Publish Interval") + ("ack", optValue(ack, "N"), + "Send ack/solicit-ack at least every N frames. 0 disables voluntary acks/solitict-ack"); } const std::string empty; diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp index 045a0ae115..8056f4a523 100644 --- a/cpp/src/qpid/framing/SessionState.cpp +++ b/cpp/src/qpid/framing/SessionState.cpp @@ -40,11 +40,22 @@ SessionState::SessionState(uint32_t ack, const Uuid& uuid) : ackInterval(ack), sendAckAt(lastReceived+ackInterval), solicitAckAt(lastSent+ackInterval), - ackSolicited(false) + ackSolicited(false), + resumable(true) +{} + +SessionState::SessionState(const Uuid& uuid) : + state(ATTACHED), + id(uuid), + lastReceived(-1), + lastSent(-1), + ackInterval(0), + sendAckAt(0), + solicitAckAt(0), + ackSolicited(false), + resumable(false) { - assert(ackInterval > 0); } - namespace { bool isSessionCommand(const AMQFrame& f) { return f.getMethod() && f.getMethod()->amqpClassId() == SESSION_CLASS_ID; @@ -58,10 +69,9 @@ boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) { throw CommandInvalidException( QPID_MSG("Invalid frame: Resuming session, expected session-ack")); assert(state = ATTACHED); - assert(lastReceived<sendAckAt); ++lastReceived; QPID_LOG(trace, "Recv # "<< lastReceived << " " << id); - if (lastReceived == sendAckAt) + if (ackInterval && lastReceived == sendAckAt) return sendingAck(); else return boost::none; @@ -70,10 +80,12 @@ boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) { bool SessionState::sent(const AMQFrame& f) { if (isSessionCommand(f)) return false; - unackedOut.push_back(f); + if (resumable) + unackedOut.push_back(f); ++lastSent; QPID_LOG(trace, "Sent # "<< lastSent << " " << id); - return (state!=RESUMING) && + return ackInterval && + (state!=RESUMING) && (lastSent == solicitAckAt) && sendingSolicit(); } @@ -105,10 +117,12 @@ bool SessionState::sendingSolicit() { if (ackSolicited) return false; solicitAckAt = lastSent + ackInterval; - return true; + return ackInterval != 0; } SequenceNumber SessionState::resuming() { + if (!resumable) + throw InternalErrorException("Session is not resumable"); state = RESUMING; return sendingAck(); } diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h index 66fc083d3f..361c960db1 100644 --- a/cpp/src/qpid/framing/SessionState.h +++ b/cpp/src/qpid/framing/SessionState.h @@ -35,12 +35,11 @@ namespace framing { /** * Session state common to client and broker. - * Implements session ack/resume protcools. + * Stores replay frames, implements session ack/resume protcools. * * A SessionState is always associated with an _open_ session (attached or * suspended) it is destroyed when the session is closed. * - * A template to make it protocol independent and easy to test. */ class SessionState { @@ -58,9 +57,16 @@ class SessionState *Create a newly opened active session. *@param ackInterval send/solicit an ack whenever N unacked frames * have been received/sent. - *@pre ackInterval > 0 + * + * N=0 disables voluntary send/solict ack. + */ + SessionState(uint32_t ackInterval, const framing::Uuid& id=framing::Uuid(true)); + + /** + * Create a non-resumable session. Does not store session frames, + * never volunteers ack or solicit-ack. */ - SessionState(uint32_t ackInterval=1, const framing::Uuid& id=framing::Uuid(true)); + SessionState(const framing::Uuid& id=framing::Uuid(true)); const framing::Uuid& getId() const { return id; } State getState() const { return state; } @@ -103,6 +109,7 @@ class SessionState SequenceNumber getLastSent() const { return lastSent; } SequenceNumber getLastReceived() const { return lastReceived; } + private: typedef std::deque<AMQFrame> Unacked; @@ -110,6 +117,7 @@ class SessionState State state; framing::Uuid id; + Unacked unackedOut; SequenceNumber lastReceived; SequenceNumber lastSent; @@ -118,6 +126,7 @@ class SessionState SequenceNumber solicitAckAt; bool ackSolicited; bool suspending; + bool resumable; }; |