diff options
author | Alan Conway <aconway@apache.org> | 2008-06-13 17:36:23 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-06-13 17:36:23 +0000 |
commit | 0a99f79e0d90f0d1c0836fbef124bfe269677840 (patch) | |
tree | 1a6ee2ce409947d5a6956411483e9182e0f76a0c | |
parent | 34cdb55ef0a755be5f0bbd965418b11e08e86031 (diff) | |
download | qpid-python-0a99f79e0d90f0d1c0836fbef124bfe269677840.tar.gz |
Fix for broker wraparound problem.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@667603 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/SessionState.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/framing/SequenceNumber.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/log/Logger.cpp | 3 |
5 files changed, 25 insertions, 12 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index 9ef2f5f8fb..1be0111489 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -124,18 +124,20 @@ void SessionState::senderRecord(const AMQFrame& f) { throw ResourceLimitExceededException("Replay buffer exceeeded hard limit"); } +static const uint32_t SPONTANEOUS_REQUEST_INTERVAL = 65536; + bool SessionState::senderNeedFlush() const { - return config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit; + return (sender.sendPoint.command % SPONTANEOUS_REQUEST_INTERVAL == 0) || + (config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit); } void SessionState::senderRecordFlush() { - assert(sender.flushPoint <= sender.sendPoint); sender.flushPoint = sender.sendPoint; sender.unflushedSize = 0; } bool SessionState::senderNeedKnownCompleted() const { - return sender.bytesSinceKnownCompleted >= config.replayFlushLimit; + return config.replayFlushLimit && sender.bytesSinceKnownCompleted >= config.replayFlushLimit; } void SessionState::senderRecordKnownCompleted() { @@ -214,7 +216,8 @@ void SessionState::receiverKnownCompleted(const SequenceSet& commands) { } bool SessionState::receiverNeedKnownCompleted() const { - return receiver.bytesSinceKnownCompleted >= config.replayFlushLimit; + return (receiver.expected.command % SPONTANEOUS_REQUEST_INTERVAL == 0) || + (config.replayFlushLimit && receiver.bytesSinceKnownCompleted >= config.replayFlushLimit); } const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; } diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index fa05cee1b3..35587940e5 100644 --- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -75,6 +75,8 @@ void SessionHandler::handleIn(AMQFrame& f) { throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data")); if (!getState()->receiverRecord(f)) return; // Ignore duplicates. + if (getState()->receiverNeedKnownCompleted()) + sendCompletion(); getInHandler()->handle(f); } } @@ -94,13 +96,22 @@ void SessionHandler::handleIn(AMQFrame& f) { } } +namespace { +bool isControl(const AMQFrame& f) { + return f.getMethod() && f.getMethod()->type() == framing::CONTROL; +} +bool isCommand(const AMQFrame& f) { + return f.getMethod() && f.getMethod()->type() == framing::COMMAND; +} +} // namespace + void SessionHandler::handleOut(AMQFrame& f) { checkAttached(); if (!sendReady) throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data")); getState()->senderRecord(f); - if (getState()->senderNeedFlush()) { - peer.flush(false, true, true); + if (isCommand(f) && getState()->senderNeedFlush()) { + peer.flush(false, false, true); getState()->senderRecordFlush(); } channel.handle(f); diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 606ca1bb59..f008eb23f7 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -85,7 +85,7 @@ Broker::Options::Options(const std::string& name) : mgmtPubInterval(10), auth(AUTH_DEFAULT), realm("QPID"), - replayFlushLimit(1024), + replayFlushLimit(0), replayHardLimit(0) { int c = sys::SystemInfo::concurrency(); @@ -109,9 +109,7 @@ Broker::Options::Options(const std::string& name) : ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted") - ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication") - ("replay-flush-limit", optValue(replayFlushLimit, "KB"), "Send flush request when the replay buffer reaches this limit. 0 means no limit.") - ("replay-hard-limit", optValue(replayHardLimit, "KB"), "Kill a session if its replay buffer exceeds this limit. 0 means no limit."); + ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication"); } const std::string empty; diff --git a/cpp/src/qpid/framing/SequenceNumber.cpp b/cpp/src/qpid/framing/SequenceNumber.cpp index 9e682d89e4..7caaf5440b 100644 --- a/cpp/src/qpid/framing/SequenceNumber.cpp +++ b/cpp/src/qpid/framing/SequenceNumber.cpp @@ -26,7 +26,7 @@ using qpid::framing::SequenceNumber; using qpid::framing::Buffer; -SequenceNumber::SequenceNumber() : value(0 - 1) {} +SequenceNumber::SequenceNumber() : value(0) {} SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {} diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp index 84096f7e58..30cec2f0f7 100644 --- a/cpp/src/qpid/log/Logger.cpp +++ b/cpp/src/qpid/log/Logger.cpp @@ -48,7 +48,8 @@ struct OstreamOutput : public Logger::Output { OstreamOutput(std::ostream& o) : out(&o) {} OstreamOutput(const string& file) - : out(new ofstream(file.c_str())), mine(out) + : out(new ofstream(file.c_str(), ios_base::out | ios_base::app)), + mine(out) { if (!out->good()) throw std::runtime_error("Can't open log file: "+file); |