diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/SessionState.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/SessionState.h | 16 | ||||
-rw-r--r-- | cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/SessionState.cpp | 37 |
5 files changed, 72 insertions, 11 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index 4b4e79801f..ce8f45d085 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -90,6 +90,10 @@ bool SessionPoint::operator==(const SessionPoint& x) const { } +SessionState::SendState::SendState() : unflushedSize(), replaySize(), bytesSinceKnownCompleted() {} + +SessionState::ReceiveState::ReceiveState() {} + SessionPoint SessionState::senderGetCommandPoint() { return sender.sendPoint; } SequenceSet SessionState::senderGetIncomplete() const { return sender.incomplete; } SessionPoint SessionState::senderGetReplayPoint() const { return sender.replayPoint; } @@ -112,6 +116,7 @@ void SessionState::senderRecord(const AMQFrame& f) { stateful = true; if (timeout) sender.replayList.push_back(f); sender.unflushedSize += f.size(); + sender.bytesSinceKnownCompleted += f.size(); sender.replaySize += f.size(); sender.incomplete += sender.sendPoint.command; sender.sendPoint.advance(f); @@ -129,6 +134,18 @@ void SessionState::senderRecordFlush() { sender.unflushedSize = 0; } +bool SessionState::senderNeedKnownCompleted() const { + // FIXME aconway 2008-06-04: this is unpleasant - replayFlushLimit == 0 + // means never send spontaneous flush, but sends a knownCompleted for + // every completed. Need separate configuration? + // + return sender.bytesSinceKnownCompleted >= config.replayFlushLimit; +} + +void SessionState::senderRecordKnownCompleted() { + sender.bytesSinceKnownCompleted = 0; +} + void SessionState::senderConfirmed(const SessionPoint& confirmed) { if (confirmed > sender.sendPoint) throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands not yet sent.")); @@ -213,7 +230,8 @@ SequenceNumber SessionState::receiverGetCurrent() const { SessionState::Configuration::Configuration(size_t flush, size_t hard) : replayFlushLimit(flush), replayHardLimit(hard) {} -SessionState::SessionState(const SessionId& i, const Configuration& c) : id(i), timeout(), config(c), stateful() +SessionState::SessionState(const SessionId& i, const Configuration& c) + : id(i), timeout(), config(c), stateful() { QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this); } diff --git a/cpp/src/qpid/SessionState.h b/cpp/src/qpid/SessionState.h index 40462e56e7..80d218b8bb 100644 --- a/cpp/src/qpid/SessionState.h +++ b/cpp/src/qpid/SessionState.h @@ -78,7 +78,7 @@ class SessionState { typedef boost::iterator_range<ReplayList::iterator> ReplayRange; struct Configuration { - Configuration(size_t flush=0, size_t hard=0); + Configuration(size_t flush=1024*1024, size_t hard=0); size_t replayFlushLimit; // Flush when the replay list >= N bytes. 0 disables. size_t replayHardLimit; // Kill session if replay list > N bytes. 0 disables. }; @@ -108,6 +108,12 @@ class SessionState { /** Called when flush for confirmed and completed commands is sent to peer. */ virtual void senderRecordFlush(); + /** True if we should reply to the next incoming completed command */ + virtual bool senderNeedKnownCompleted() const; + + /** Called when knownCompleted is sent to peer. */ + virtual void senderRecordKnownCompleted(); + /** Called when the peer confirms up to comfirmed. */ virtual void senderConfirmed(const SessionPoint& confirmed); @@ -128,7 +134,6 @@ class SessionState { */ virtual ReplayRange senderExpected(const SessionPoint& expected); - // ==== Functions for receiver state /** Set the command point. */ @@ -161,7 +166,7 @@ class SessionState { private: struct SendState { - SendState() : unflushedSize(), replaySize() {} + SendState(); // invariant: replayPoint <= flushPoint <= sendPoint SessionPoint replayPoint; // Can replay from this point SessionPoint flushPoint; // Point of last flush @@ -170,16 +175,15 @@ class SessionState { size_t unflushedSize; // Un-flushed bytes in replay list. size_t replaySize; // Total bytes in replay list. SequenceSet incomplete; // Commands sent and not yet completed. + size_t bytesSinceKnownCompleted; // Bytes sent since we last issued a knownCompleted. } sender; struct ReceiveState { - ReceiveState() {} + ReceiveState(); SessionPoint expected; // Expected from here SessionPoint received; // Received to here. Invariant: expected <= received. SequenceSet unknownCompleted; // Received & completed, may not not known-complete by peer. SequenceSet incomplete; // Incomplete received commands. - int segmentType; - } receiver; SessionId id; diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 0d1cc57072..53f847a3cf 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -196,11 +196,13 @@ void SessionHandler::confirmed(const SequenceSet& commands, const Array& /*fragm getState()->senderConfirmed(commands.rangesBegin()->last()); } -void SessionHandler::completed(const SequenceSet& commands, bool /*timelyReply*/) { +void SessionHandler::completed(const SequenceSet& commands, bool timelyReply) { checkAttached(); getState()->senderCompleted(commands); - if (!commands.empty()) - peer.knownCompleted(commands); // Always send a timely reply + if (getState()->senderNeedKnownCompleted() || timelyReply) { + peer.knownCompleted(commands); + getState()->senderRecordKnownCompleted(); + } } void SessionHandler::knownCompleted(const SequenceSet& commands) { diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index fc789e967a..d590b0358f 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -84,7 +84,7 @@ Broker::Options::Options(const std::string& name) : enableMgmt(1), mgmtPubInterval(10), auth(AUTH_DEFAULT), - replayFlushLimit(64), + replayFlushLimit(1024), replayHardLimit(0) { int c = sys::SystemInfo::concurrency(); diff --git a/cpp/src/tests/SessionState.cpp b/cpp/src/tests/SessionState.cpp index 51259bac2b..ba966da9b1 100644 --- a/cpp/src/tests/SessionState.cpp +++ b/cpp/src/tests/SessionState.cpp @@ -260,4 +260,41 @@ QPID_AUTO_TEST_CASE(testCompleted) { // TODO aconway 2008-04-30: missing tests for known-completed. } +QPID_AUTO_TEST_CASE(testNeedKnownCompleted) { + size_t flushInterval= 2*(transferFrameSize()+contentFrameSize())+1; + qpid::SessionState::Configuration c(flushInterval); + qpid::SessionState s(qpid::SessionId(), c); + s.senderGetCommandPoint(); + transfers(s, "a"); + SequenceSet set(SequenceSet() + 0); + s.senderCompleted(set); + BOOST_CHECK(!s.senderNeedKnownCompleted()); + + transfers(s, "b"); + set += 1; + s.senderCompleted(set); + BOOST_CHECK(!s.senderNeedKnownCompleted()); + + transfers(s, "c"); + set += 2; + s.senderCompleted(set); + BOOST_CHECK(s.senderNeedKnownCompleted()); + s.senderRecordKnownCompleted(); + BOOST_CHECK(!s.senderNeedKnownCompleted()); + + transfers(s, "de"); + set += 3; + set += 4; + s.senderCompleted(set); + BOOST_CHECK(!s.senderNeedKnownCompleted()); + + transfers(s, "f"); + set += 2; + s.senderCompleted(set); + BOOST_CHECK(s.senderNeedKnownCompleted()); + s.senderRecordKnownCompleted(); + BOOST_CHECK(!s.senderNeedKnownCompleted()); +} + + QPID_AUTO_TEST_SUITE_END() |