summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/SessionState.cpp20
-rw-r--r--cpp/src/qpid/SessionState.h16
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.cpp8
-rw-r--r--cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--cpp/src/tests/SessionState.cpp37
5 files changed, 72 insertions, 11 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index 4b4e79801f..ce8f45d085 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -90,6 +90,10 @@ bool SessionPoint::operator==(const SessionPoint& x) const {
}
+SessionState::SendState::SendState() : unflushedSize(), replaySize(), bytesSinceKnownCompleted() {}
+
+SessionState::ReceiveState::ReceiveState() {}
+
SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; }
SequenceSet SessionState::senderGetIncomplete() const { return sender.incomplete; }
SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; }
@@ -112,6 +116,7 @@ void SessionState::senderRecord(const AMQFrame& f) {
stateful = true;
if (timeout) sender.replayList.push_back(f);
sender.unflushedSize += f.size();
+ sender.bytesSinceKnownCompleted += f.size();
sender.replaySize += f.size();
sender.incomplete += sender.sendPoint.command;
sender.sendPoint.advance(f);
@@ -129,6 +134,18 @@ void SessionState::senderRecordFlush() {
sender.unflushedSize = 0;
}
+bool SessionState::senderNeedKnownCompleted() const {
+ // FIXME aconway 2008-06-04: this is unpleasant - replayFlushLimit == 0
+ // means never send spontaneous flush, but sends a knownCompleted for
+ // every completed. Need separate configuration?
+ //
+ return sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
+}
+
+void SessionState::senderRecordKnownCompleted() {
+ sender.bytesSinceKnownCompleted = 0;
+}
+
void SessionState::senderConfirmed(const SessionPoint& confirmed) {
if (confirmed > sender.sendPoint)
throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands not yet sent."));
@@ -213,7 +230,8 @@ SequenceNumber SessionState::receiverGetCurrent() const {
SessionState::Configuration::Configuration(size_t flush, size_t hard) :
replayFlushLimit(flush), replayHardLimit(hard) {}
-SessionState::SessionState(const SessionId& i, const Configuration& c) : id(i), timeout(), config(c), stateful()
+SessionState::SessionState(const SessionId& i, const Configuration& c)
+ : id(i), timeout(), config(c), stateful()
{
QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
}
diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h
index 40462e56e7..80d218b8bb 100644
--- a/cpp/src/qpid/SessionState.h
+++ b/cpp/src/qpid/SessionState.h
@@ -78,7 +78,7 @@ class SessionState {
typedef boost::iterator_range<ReplayList::iterator> ReplayRange;
struct Configuration {
- Configuration(size_t flush=0, size_t hard=0);
+ Configuration(size_t flush=1024*1024, size_t hard=0);
size_t replayFlushLimit; // Flush when the replay list >= N bytes. 0 disables.
size_t replayHardLimit; // Kill session if replay list > N bytes. 0 disables.
};
@@ -108,6 +108,12 @@ class SessionState {
/** Called when flush for confirmed and completed commands is sent to peer. */
virtual void senderRecordFlush();
+ /** True if we should reply to the next incoming completed command */
+ virtual bool senderNeedKnownCompleted() const;
+
+ /** Called when knownCompleted is sent to peer. */
+ virtual void senderRecordKnownCompleted();
+
/** Called when the peer confirms up to comfirmed. */
virtual void senderConfirmed(const SessionPoint& confirmed);
@@ -128,7 +134,6 @@ class SessionState {
*/
virtual ReplayRange senderExpected(const SessionPoint& expected);
-
// ==== Functions for receiver state
/** Set the command point. */
@@ -161,7 +166,7 @@ class SessionState {
private:
struct SendState {
- SendState() : unflushedSize(), replaySize() {}
+ SendState();
// invariant: replayPoint <= flushPoint <= sendPoint
SessionPoint replayPoint; // Can replay from this point
SessionPoint flushPoint; // Point of last flush
@@ -170,16 +175,15 @@ class SessionState {
size_t unflushedSize; // Un-flushed bytes in replay list.
size_t replaySize; // Total bytes in replay list.
SequenceSet incomplete; // Commands sent and not yet completed.
+ size_t bytesSinceKnownCompleted; // Bytes sent since we last issued a knownCompleted.
} sender;
struct ReceiveState {
- ReceiveState() {}
+ ReceiveState();
SessionPoint expected; // Expected from here
SessionPoint received; // Received to here. Invariant: expected <= received.
SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer.
SequenceSet incomplete; // Incomplete received commands.
- int segmentType;
-
} receiver;
SessionId id;
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 0d1cc57072..53f847a3cf 100644
--- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -196,11 +196,13 @@ void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragm
getState()->senderConfirmed(commands.rangesBegin()->last());
}
-void SessionHandler::completed(const SequenceSet& commands, bool /*timelyReply*/) {
+void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) {
checkAttached();
getState()->senderCompleted(commands);
- if (!commands.empty())
- peer.knownCompleted(commands); // Always send a timely reply
+ if (getState()->senderNeedKnownCompleted() || timelyReply) {
+ peer.knownCompleted(commands);
+ getState()->senderRecordKnownCompleted();
+ }
}
void SessionHandler::knownCompleted(const SequenceSet& commands) {
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index fc789e967a..d590b0358f 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -84,7 +84,7 @@ Broker::Options::Options(const std::string& name) :
enableMgmt(1),
mgmtPubInterval(10),
auth(AUTH_DEFAULT),
- replayFlushLimit(64),
+ replayFlushLimit(1024),
replayHardLimit(0)
{
int c = sys::SystemInfo::concurrency();
diff --git a/cpp/src/tests/SessionState.cpp b/cpp/src/tests/SessionState.cpp
index 51259bac2b..ba966da9b1 100644
--- a/cpp/src/tests/SessionState.cpp
+++ b/cpp/src/tests/SessionState.cpp
@@ -260,4 +260,41 @@ QPID_AUTO_TEST_CASE(testCompleted) {
// TODO aconway 2008-04-30: missing tests for known-completed.
}
+QPID_AUTO_TEST_CASE(testNeedKnownCompleted) {
+ size_t flushInterval= 2*(transferFrameSize()+contentFrameSize())+1;
+ qpid::SessionState::Configuration c(flushInterval);
+ qpid::SessionState s(qpid::SessionId(), c);
+ s.senderGetCommandPoint();
+ transfers(s, "a");
+ SequenceSet set(SequenceSet() + 0);
+ s.senderCompleted(set);
+ BOOST_CHECK(!s.senderNeedKnownCompleted());
+
+ transfers(s, "b");
+ set += 1;
+ s.senderCompleted(set);
+ BOOST_CHECK(!s.senderNeedKnownCompleted());
+
+ transfers(s, "c");
+ set += 2;
+ s.senderCompleted(set);
+ BOOST_CHECK(s.senderNeedKnownCompleted());
+ s.senderRecordKnownCompleted();
+ BOOST_CHECK(!s.senderNeedKnownCompleted());
+
+ transfers(s, "de");
+ set += 3;
+ set += 4;
+ s.senderCompleted(set);
+ BOOST_CHECK(!s.senderNeedKnownCompleted());
+
+ transfers(s, "f");
+ set += 2;
+ s.senderCompleted(set);
+ BOOST_CHECK(s.senderNeedKnownCompleted());
+ s.senderRecordKnownCompleted();
+ BOOST_CHECK(!s.senderNeedKnownCompleted());
+}
+
+
QPID_AUTO_TEST_SUITE_END()