summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueuePolicy.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.cpp180
1 files changed, 135 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index f311ea8321..0cc0c514a7 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -29,33 +29,114 @@
using namespace qpid::broker;
using namespace qpid::framing;
-QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
- maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {
- QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize);
+namespace {
+ /** ensure that the configured flow control stop and resume values are
+ * valid with respect to the maximum queue capacity, and each other
+ */
+ template <typename T>
+ void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue)
+ {
+ if (resume > stop) {
+ throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type
+ << "=" << resume
+ << " must be less than qpid.flow_stop_" << type
+ << "=" << stop));
+ }
+ if (resume == 0) resume = stop;
+ if (max != 0 && (max < stop)) {
+ throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type
+ << "=" << stop
+ << " must be less than qpid.max_" << type
+ << "=" << max));
+ }
+ }
+
+ /** extract a capacity value as passing in an argument map
+ */
+ uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue)
+ {
+ FieldTable::ValuePtr v = settings.get(key);
+
+ int64_t result = 0;
+
+ if (!v) return defaultValue;
+ if (v->getType() == 0x23) {
+ QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
+ } else if (v->getType() == 0x33) {
+ QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
+ } else if (v->convertsTo<int64_t>()) {
+ result = v->get<int64_t>();
+ QPID_LOG(debug, "Got integer value for " << key << ": " << result);
+ if (result >= 0) return result;
+ } else if (v->convertsTo<string>()) {
+ string s(v->get<string>());
+ QPID_LOG(debug, "Got string value for " << key << ": " << s);
+ std::istringstream convert(s);
+ if (convert >> result && result >= 0) return result;
+ }
+
+ QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
+ return defaultValue;
+ }
+
+
+}
+
+QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type,
+ uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize, uint64_t _flowResumeSize)
+ : maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false),
+ flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
+ flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
+ flowStopped(false), name(_name)
+{
+ validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", name );
+ validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", name );
+ QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize
+ << "; flowStopCount=" << flowStopCount << "; flowResumeCount=" << flowResumeCount
+ << "; flowStopSize=" << flowStopSize << "; flowResumeSize=" << flowResumeSize );
}
void QueuePolicy::enqueued(uint64_t _size)
{
- if (maxCount) ++count;
- if (maxSize) size += _size;
+ if (maxCount || flowStopCount) {
+ ++count;
+ if (flowStopCount && !flowStopped && count > flowStopCount) {
+ flowStopped = true;
+ QPID_LOG(info, "Queue \"" << name << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." );
+ }
+ }
+
+ if (maxSize || flowStopSize) {
+ size += _size;
+ if (flowStopSize && !flowStopped && size > flowStopSize) {
+ flowStopped = true;
+ QPID_LOG(info, "Queue \"" << name << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." );
+ }
+ }
}
void QueuePolicy::dequeued(uint64_t _size)
{
- if (maxCount) {
+ if (maxCount || flowStopCount) {
if (count > 0) {
--count;
} else {
throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this));
}
}
- if (maxSize) {
+ if (maxSize || flowStopSize) {
if (_size > size) {
throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this));
} else {
size -= _size;
}
}
+ if (flowStopped &&
+ (flowResumeSize == 0 || size < flowResumeSize) &&
+ (flowResumeCount == 0 || count < flowResumeCount)) {
+ flowStopped = false;
+ QPID_LOG(info, "Queue \"" << name << "\": has drained below the flow control resume level. Producer flow control deactivated." );
+ }
}
bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
@@ -116,32 +197,6 @@ void QueuePolicy::update(FieldTable& settings)
settings.setString(typeKey, type);
}
-uint32_t QueuePolicy::getCapacity(const FieldTable& settings, const std::string& key, uint32_t defaultValue)
-{
- FieldTable::ValuePtr v = settings.get(key);
-
- int32_t result = 0;
-
- if (!v) return defaultValue;
- if (v->getType() == 0x23) {
- QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
- } else if (v->getType() == 0x33) {
- QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
- } else if (v->convertsTo<int>()) {
- result = v->get<int>();
- QPID_LOG(debug, "Got integer value for " << key << ": " << result);
- if (result >= 0) return result;
- } else if (v->convertsTo<string>()) {
- string s(v->get<string>());
- QPID_LOG(debug, "Got string value for " << key << ": " << s);
- std::istringstream convert(s);
- if (convert >> result && result >= 0) return result;
- }
-
- QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")");
- return defaultValue;
-}
-
std::string QueuePolicy::getType(const FieldTable& settings)
{
FieldTable::ValuePtr v = settings.get(typeKey);
@@ -169,6 +224,10 @@ void QueuePolicy::encode(Buffer& buffer) const
buffer.putLongLong(maxSize);
buffer.putLong(count);
buffer.putLongLong(size);
+ buffer.putLong(flowStopCount);
+ buffer.putLong(flowResumeCount);
+ buffer.putLongLong(flowStopSize);
+ buffer.putLongLong(flowResumeSize);
}
void QueuePolicy::decode ( Buffer& buffer )
@@ -177,6 +236,10 @@ void QueuePolicy::decode ( Buffer& buffer )
maxSize = buffer.getLongLong();
count = buffer.getLong();
size = buffer.getLongLong();
+ flowStopCount = buffer.getLong();
+ flowResumeCount = buffer.getLong();
+ flowStopSize = buffer.getLongLong();
+ flowResumeSize = buffer.getLongLong();
}
@@ -184,7 +247,11 @@ uint32_t QueuePolicy::encodedSize() const {
return sizeof(uint32_t) + // maxCount
sizeof(uint64_t) + // maxSize
sizeof(uint32_t) + // count
- sizeof(uint64_t); // size
+ sizeof(uint64_t) + // size
+ sizeof(uint32_t) + // flowStopCount
+ sizeof(uint32_t) + // flowResumecount
+ sizeof(uint64_t) + // flowStopSize
+ sizeof(uint64_t); // flowResumeSize
}
@@ -192,14 +259,21 @@ uint32_t QueuePolicy::encodedSize() const {
const std::string QueuePolicy::maxCountKey("qpid.max_count");
const std::string QueuePolicy::maxSizeKey("qpid.max_size");
const std::string QueuePolicy::typeKey("qpid.policy_type");
+const std::string QueuePolicy::flowStopCountKey("qpid.flow_stop_count");
+const std::string QueuePolicy::flowResumeCountKey("qpid.flow_resume_count");
+const std::string QueuePolicy::flowStopSizeKey("qpid.flow_stop_size");
+const std::string QueuePolicy::flowResumeSizeKey("qpid.flow_resume_size");
const std::string QueuePolicy::REJECT("reject");
const std::string QueuePolicy::FLOW_TO_DISK("flow_to_disk");
const std::string QueuePolicy::RING("ring");
const std::string QueuePolicy::RING_STRICT("ring_strict");
uint64_t QueuePolicy::defaultMaxSize(0);
-FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) :
- QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {}
+FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize,
+ uint32_t _flowStopCount, uint32_t _flowResumeCount,
+ uint64_t _flowStopSize, uint64_t _flowResumeSize)
+ : QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK,
+ _flowStopCount, _flowResumeCount, _flowStopSize, _flowResumeSize) {}
bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
@@ -208,8 +282,11 @@ bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m)
}
RingQueuePolicy::RingQueuePolicy(const std::string& _name,
- uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
- QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
+ uint32_t _maxCount, uint64_t _maxSize, const std::string& _type,
+ uint32_t _flowStopCount, uint32_t _flowResumeCount,
+ uint64_t _flowStopSize, uint64_t _flowResumeSize)
+ : QueuePolicy(_name, _maxCount, _maxSize, _type, _flowStopCount, _flowResumeCount,
+ _flowStopSize, _flowResumeSize), strict(_type == RING_STRICT) {}
bool before(const QueuedMessage& a, const QueuedMessage& b)
{
@@ -317,23 +394,34 @@ std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::F
std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
{
uint32_t maxCount = getCapacity(settings, maxCountKey, 0);
- uint32_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize);
- if (maxCount || maxSize) {
- return createQueuePolicy(name, maxCount, maxSize, getType(settings));
+ uint64_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize);
+ uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
+ uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
+ uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
+ uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
+
+ if (maxCount || maxSize || flowStopCount || flowResumeCount || flowStopSize || flowResumeSize) {
+ return createQueuePolicy(name, maxCount, maxSize, getType(settings),
+ flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
} else {
return std::auto_ptr<QueuePolicy>();
}
}
std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name,
- uint32_t maxCount, uint64_t maxSize, const std::string& type)
+ uint32_t maxCount, uint64_t maxSize, const std::string& type,
+ uint32_t flowStopCount, uint32_t flowResumeCount,
+ uint64_t flowStopSize, uint64_t flowResumeSize)
{
if (type == RING || type == RING_STRICT) {
- return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type));
+ return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type,
+ flowStopCount, flowResumeCount, flowStopSize, flowResumeSize));
} else if (type == FLOW_TO_DISK) {
- return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize));
+ return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize,
+ flowStopCount, flowResumeCount, flowStopSize, flowResumeSize));
} else {
- return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type));
+ return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type,
+ flowStopCount, flowResumeCount, flowStopSize, flowResumeSize));
}
}
@@ -349,6 +437,8 @@ std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
else out << "count: unlimited";
out << "; type=" << p.type;
+ if (p.flowStopCount) out << "; flowStopCount=" << p.flowStopCount << ", flowResumeCount=" << p.flowResumeCount;
+ if (p.flowStopSize) out << "; flowStopSize=" << p.flowStopSize << ", flowResumeSize=" << p.flowResumeSize;
return out;
}