summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-10-29 16:50:45 +0000
committerAlan Conway <aconway@apache.org>2007-10-29 16:50:45 +0000
commitbcf9c194c3d777fff7f776af25379d8a44f012aa (patch)
treee9e1afe41970e8f295d91b773379844d9258cc65 /cpp/src
parent0276d542a50b079a94132b2532b5cd016853006b (diff)
downloadqpid-python-bcf9c194c3d777fff7f776af25379d8a44f012aa.tar.gz
##-*-text-*-
Added qpidd --ack option to set ack/solicit-ack interval. 0 disabled acks. Sessions with 0 timeout never ack and don't store replay frames. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@589731 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--cpp/src/qpid/framing/SessionState.cpp30
-rw-r--r--cpp/src/qpid/framing/SessionState.h17
3 files changed, 39 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index b88f1c6c6a..051c872e77 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -65,7 +65,7 @@ Broker::Options::Options(const std::string& name) :
storeAsync(false),
enableMgmt(0),
mgmtPubInterval(10),
- ack(100)
+ ack(100)
{
addOptions()
("port,p", optValue(port,"PORT"),
@@ -87,7 +87,9 @@ Broker::Options::Options(const std::string& name) :
("mgmt,m", optValue(enableMgmt,"yes|no"),
"Enable Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
- "Management Publish Interval");
+ "Management Publish Interval")
+ ("ack", optValue(ack, "N"),
+ "Send ack/solicit-ack at least every N frames. 0 disables voluntary acks/solitict-ack");
}
const std::string empty;
diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp
index 045a0ae115..8056f4a523 100644
--- a/cpp/src/qpid/framing/SessionState.cpp
+++ b/cpp/src/qpid/framing/SessionState.cpp
@@ -40,11 +40,22 @@ SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
ackInterval(ack),
sendAckAt(lastReceived+ackInterval),
solicitAckAt(lastSent+ackInterval),
- ackSolicited(false)
+ ackSolicited(false),
+ resumable(true)
+{}
+
+SessionState::SessionState(const Uuid& uuid) :
+ state(ATTACHED),
+ id(uuid),
+ lastReceived(-1),
+ lastSent(-1),
+ ackInterval(0),
+ sendAckAt(0),
+ solicitAckAt(0),
+ ackSolicited(false),
+ resumable(false)
{
- assert(ackInterval > 0);
}
-
namespace {
bool isSessionCommand(const AMQFrame& f) {
return f.getMethod() && f.getMethod()->amqpClassId() == SESSION_CLASS_ID;
@@ -58,10 +69,9 @@ boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) {
throw CommandInvalidException(
QPID_MSG("Invalid frame: Resuming session, expected session-ack"));
assert(state = ATTACHED);
- assert(lastReceived<sendAckAt);
++lastReceived;
QPID_LOG(trace, "Recv # "<< lastReceived << " " << id);
- if (lastReceived == sendAckAt)
+ if (ackInterval && lastReceived == sendAckAt)
return sendingAck();
else
return boost::none;
@@ -70,10 +80,12 @@ boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) {
bool SessionState::sent(const AMQFrame& f) {
if (isSessionCommand(f))
return false;
- unackedOut.push_back(f);
+ if (resumable)
+ unackedOut.push_back(f);
++lastSent;
QPID_LOG(trace, "Sent # "<< lastSent << " " << id);
- return (state!=RESUMING) &&
+ return ackInterval &&
+ (state!=RESUMING) &&
(lastSent == solicitAckAt) &&
sendingSolicit();
}
@@ -105,10 +117,12 @@ bool SessionState::sendingSolicit() {
if (ackSolicited)
return false;
solicitAckAt = lastSent + ackInterval;
- return true;
+ return ackInterval != 0;
}
SequenceNumber SessionState::resuming() {
+ if (!resumable)
+ throw InternalErrorException("Session is not resumable");
state = RESUMING;
return sendingAck();
}
diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h
index 66fc083d3f..361c960db1 100644
--- a/cpp/src/qpid/framing/SessionState.h
+++ b/cpp/src/qpid/framing/SessionState.h
@@ -35,12 +35,11 @@ namespace framing {
/**
* Session state common to client and broker.
- * Implements session ack/resume protcools.
+ * Stores replay frames, implements session ack/resume protcools.
*
* A SessionState is always associated with an _open_ session (attached or
* suspended) it is destroyed when the session is closed.
*
- * A template to make it protocol independent and easy to test.
*/
class SessionState
{
@@ -58,9 +57,16 @@ class SessionState
*Create a newly opened active session.
*@param ackInterval send/solicit an ack whenever N unacked frames
* have been received/sent.
- *@pre ackInterval > 0
+ *
+ * N=0 disables voluntary send/solict ack.
+ */
+ SessionState(uint32_t ackInterval, const framing::Uuid& id=framing::Uuid(true));
+
+ /**
+ * Create a non-resumable session. Does not store session frames,
+ * never volunteers ack or solicit-ack.
*/
- SessionState(uint32_t ackInterval=1, const framing::Uuid& id=framing::Uuid(true));
+ SessionState(const framing::Uuid& id=framing::Uuid(true));
const framing::Uuid& getId() const { return id; }
State getState() const { return state; }
@@ -103,6 +109,7 @@ class SessionState
SequenceNumber getLastSent() const { return lastSent; }
SequenceNumber getLastReceived() const { return lastReceived; }
+
private:
typedef std::deque<AMQFrame> Unacked;
@@ -110,6 +117,7 @@ class SessionState
State state;
framing::Uuid id;
+
Unacked unackedOut;
SequenceNumber lastReceived;
SequenceNumber lastSent;
@@ -118,6 +126,7 @@ class SessionState
SequenceNumber solicitAckAt;
bool ackSolicited;
bool suspending;
+ bool resumable;
};