summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-06-13 17:36:23 +0000
committerAlan Conway <aconway@apache.org>2008-06-13 17:36:23 +0000
commit0a99f79e0d90f0d1c0836fbef124bfe269677840 (patch)
tree1a6ee2ce409947d5a6956411483e9182e0f76a0c
parent34cdb55ef0a755be5f0bbd965418b11e08e86031 (diff)
downloadqpid-python-0a99f79e0d90f0d1c0836fbef124bfe269677840.tar.gz
Fix for broker wraparound problem.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@667603 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/SessionState.cpp11
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.cpp15
-rw-r--r--cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--cpp/src/qpid/framing/SequenceNumber.cpp2
-rw-r--r--cpp/src/qpid/log/Logger.cpp3
5 files changed, 25 insertions, 12 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index 9ef2f5f8fb..1be0111489 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -124,18 +124,20 @@ void SessionState::senderRecord(const AMQFrame& f) {
throw ResourceLimitExceededException("Replay buffer exceeeded hard limit");
}
+static const uint32_t SPONTANEOUS_REQUEST_INTERVAL = 65536;
+
bool SessionState::senderNeedFlush() const {
- return config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit;
+ return (sender.sendPoint.command % SPONTANEOUS_REQUEST_INTERVAL == 0) ||
+ (config.replayFlushLimit && sender.unflushedSize >= config.replayFlushLimit);
}
void SessionState::senderRecordFlush() {
- assert(sender.flushPoint <= sender.sendPoint);
sender.flushPoint = sender.sendPoint;
sender.unflushedSize = 0;
}
bool SessionState::senderNeedKnownCompleted() const {
- return sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
+ return config.replayFlushLimit && sender.bytesSinceKnownCompleted >= config.replayFlushLimit;
}
void SessionState::senderRecordKnownCompleted() {
@@ -214,7 +216,8 @@ void SessionState::receiverKnownCompleted(const SequenceSet& commands) {
}
bool SessionState::receiverNeedKnownCompleted() const {
- return receiver.bytesSinceKnownCompleted >= config.replayFlushLimit;
+ return (receiver.expected.command % SPONTANEOUS_REQUEST_INTERVAL == 0) ||
+ (config.replayFlushLimit && receiver.bytesSinceKnownCompleted >= config.replayFlushLimit);
}
const SessionPoint& SessionState::receiverGetExpected() const { return receiver.expected; }
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index fa05cee1b3..35587940e5 100644
--- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -75,6 +75,8 @@ void SessionHandler::handleIn(AMQFrame& f) {
throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to receive data"));
if (!getState()->receiverRecord(f))
return; // Ignore duplicates.
+ if (getState()->receiverNeedKnownCompleted())
+ sendCompletion();
getInHandler()->handle(f);
}
}
@@ -94,13 +96,22 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
}
+namespace {
+bool isControl(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->type() == framing::CONTROL;
+}
+bool isCommand(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->type() == framing::COMMAND;
+}
+} // namespace
+
void SessionHandler::handleOut(AMQFrame& f) {
checkAttached();
if (!sendReady)
throw IllegalStateException(QPID_MSG(getState()->getId() << ": Not ready to send data"));
getState()->senderRecord(f);
- if (getState()->senderNeedFlush()) {
- peer.flush(false, true, true);
+ if (isCommand(f) && getState()->senderNeedFlush()) {
+ peer.flush(false, false, true);
getState()->senderRecordFlush();
}
channel.handle(f);
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 606ca1bb59..f008eb23f7 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -85,7 +85,7 @@ Broker::Options::Options(const std::string& name) :
mgmtPubInterval(10),
auth(AUTH_DEFAULT),
realm("QPID"),
- replayFlushLimit(1024),
+ replayFlushLimit(0),
replayHardLimit(0)
{
int c = sys::SystemInfo::concurrency();
@@ -109,9 +109,7 @@ Broker::Options::Options(const std::string& name) :
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted")
- ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication")
- ("replay-flush-limit", optValue(replayFlushLimit, "KB"), "Send flush request when the replay buffer reaches this limit. 0 means no limit.")
- ("replay-hard-limit", optValue(replayHardLimit, "KB"), "Kill a session if its replay buffer exceeds this limit. 0 means no limit.");
+ ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication");
}
const std::string empty;
diff --git a/cpp/src/qpid/framing/SequenceNumber.cpp b/cpp/src/qpid/framing/SequenceNumber.cpp
index 9e682d89e4..7caaf5440b 100644
--- a/cpp/src/qpid/framing/SequenceNumber.cpp
+++ b/cpp/src/qpid/framing/SequenceNumber.cpp
@@ -26,7 +26,7 @@
using qpid::framing::SequenceNumber;
using qpid::framing::Buffer;
-SequenceNumber::SequenceNumber() : value(0 - 1) {}
+SequenceNumber::SequenceNumber() : value(0) {}
SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {}
diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp
index 84096f7e58..30cec2f0f7 100644
--- a/cpp/src/qpid/log/Logger.cpp
+++ b/cpp/src/qpid/log/Logger.cpp
@@ -48,7 +48,8 @@ struct OstreamOutput : public Logger::Output {
OstreamOutput(std::ostream& o) : out(&o) {}
OstreamOutput(const string& file)
- : out(new ofstream(file.c_str())), mine(out)
+ : out(new ofstream(file.c_str(), ios_base::out | ios_base::app)),
+ mine(out)
{
if (!out->good())
throw std::runtime_error("Can't open log file: "+file);