diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueuePolicy.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.cpp | 180 |
1 files changed, 135 insertions, 45 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index f311ea8321..0cc0c514a7 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -29,33 +29,114 @@ using namespace qpid::broker; using namespace qpid::framing; -QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) { - QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize); +namespace { + /** ensure that the configured flow control stop and resume values are + * valid with respect to the maximum queue capacity, and each other + */ + template <typename T> + void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue) + { + if (resume > stop) { + throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type + << "=" << resume + << " must be less than qpid.flow_stop_" << type + << "=" << stop)); + } + if (resume == 0) resume = stop; + if (max != 0 && (max < stop)) { + throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type + << "=" << stop + << " must be less than qpid.max_" << type + << "=" << max)); + } + } + + /** extract a capacity value as passing in an argument map + */ + uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue) + { + FieldTable::ValuePtr v = settings.get(key); + + int64_t result = 0; + + if (!v) return defaultValue; + if (v->getType() == 0x23) { + QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>()); + } else if (v->getType() == 0x33) { + QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>()); + } else if (v->convertsTo<int64_t>()) { + result = v->get<int64_t>(); + QPID_LOG(debug, "Got integer value for " << key << ": " << result); + if (result >= 0) return result; + } else if (v->convertsTo<string>()) { + string s(v->get<string>()); + QPID_LOG(debug, "Got string value for " << key << ": " << s); + std::istringstream convert(s); + if (convert >> result && result >= 0) return result; + } + + QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")"); + return defaultValue; + } + + +} + +QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type, + uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize, uint64_t _flowResumeSize) + : maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), + flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), + flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), + flowStopped(false), name(_name) +{ + validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", name ); + validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", name ); + QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize + << "; flowStopCount=" << flowStopCount << "; flowResumeCount=" << flowResumeCount + << "; flowStopSize=" << flowStopSize << "; flowResumeSize=" << flowResumeSize ); } void QueuePolicy::enqueued(uint64_t _size) { - if (maxCount) ++count; - if (maxSize) size += _size; + if (maxCount || flowStopCount) { + ++count; + if (flowStopCount && !flowStopped && count > flowStopCount) { + flowStopped = true; + QPID_LOG(info, "Queue \"" << name << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." ); + } + } + + if (maxSize || flowStopSize) { + size += _size; + if (flowStopSize && !flowStopped && size > flowStopSize) { + flowStopped = true; + QPID_LOG(info, "Queue \"" << name << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." ); + } + } } void QueuePolicy::dequeued(uint64_t _size) { - if (maxCount) { + if (maxCount || flowStopCount) { if (count > 0) { --count; } else { throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this)); } } - if (maxSize) { + if (maxSize || flowStopSize) { if (_size > size) { throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this)); } else { size -= _size; } } + if (flowStopped && + (flowResumeSize == 0 || size < flowResumeSize) && + (flowResumeCount == 0 || count < flowResumeCount)) { + flowStopped = false; + QPID_LOG(info, "Queue \"" << name << "\": has drained below the flow control resume level. Producer flow control deactivated." ); + } } bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) @@ -116,32 +197,6 @@ void QueuePolicy::update(FieldTable& settings) settings.setString(typeKey, type); } -uint32_t QueuePolicy::getCapacity(const FieldTable& settings, const std::string& key, uint32_t defaultValue) -{ - FieldTable::ValuePtr v = settings.get(key); - - int32_t result = 0; - - if (!v) return defaultValue; - if (v->getType() == 0x23) { - QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>()); - } else if (v->getType() == 0x33) { - QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>()); - } else if (v->convertsTo<int>()) { - result = v->get<int>(); - QPID_LOG(debug, "Got integer value for " << key << ": " << result); - if (result >= 0) return result; - } else if (v->convertsTo<string>()) { - string s(v->get<string>()); - QPID_LOG(debug, "Got string value for " << key << ": " << s); - std::istringstream convert(s); - if (convert >> result && result >= 0) return result; - } - - QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")"); - return defaultValue; -} - std::string QueuePolicy::getType(const FieldTable& settings) { FieldTable::ValuePtr v = settings.get(typeKey); @@ -169,6 +224,10 @@ void QueuePolicy::encode(Buffer& buffer) const buffer.putLongLong(maxSize); buffer.putLong(count); buffer.putLongLong(size); + buffer.putLong(flowStopCount); + buffer.putLong(flowResumeCount); + buffer.putLongLong(flowStopSize); + buffer.putLongLong(flowResumeSize); } void QueuePolicy::decode ( Buffer& buffer ) @@ -177,6 +236,10 @@ void QueuePolicy::decode ( Buffer& buffer ) maxSize = buffer.getLongLong(); count = buffer.getLong(); size = buffer.getLongLong(); + flowStopCount = buffer.getLong(); + flowResumeCount = buffer.getLong(); + flowStopSize = buffer.getLongLong(); + flowResumeSize = buffer.getLongLong(); } @@ -184,7 +247,11 @@ uint32_t QueuePolicy::encodedSize() const { return sizeof(uint32_t) + // maxCount sizeof(uint64_t) + // maxSize sizeof(uint32_t) + // count - sizeof(uint64_t); // size + sizeof(uint64_t) + // size + sizeof(uint32_t) + // flowStopCount + sizeof(uint32_t) + // flowResumecount + sizeof(uint64_t) + // flowStopSize + sizeof(uint64_t); // flowResumeSize } @@ -192,14 +259,21 @@ uint32_t QueuePolicy::encodedSize() const { const std::string QueuePolicy::maxCountKey("qpid.max_count"); const std::string QueuePolicy::maxSizeKey("qpid.max_size"); const std::string QueuePolicy::typeKey("qpid.policy_type"); +const std::string QueuePolicy::flowStopCountKey("qpid.flow_stop_count"); +const std::string QueuePolicy::flowResumeCountKey("qpid.flow_resume_count"); +const std::string QueuePolicy::flowStopSizeKey("qpid.flow_stop_size"); +const std::string QueuePolicy::flowResumeSizeKey("qpid.flow_resume_size"); const std::string QueuePolicy::REJECT("reject"); const std::string QueuePolicy::FLOW_TO_DISK("flow_to_disk"); const std::string QueuePolicy::RING("ring"); const std::string QueuePolicy::RING_STRICT("ring_strict"); uint64_t QueuePolicy::defaultMaxSize(0); -FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) : - QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {} +FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, + uint32_t _flowStopCount, uint32_t _flowResumeCount, + uint64_t _flowStopSize, uint64_t _flowResumeSize) + : QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK, + _flowStopCount, _flowResumeCount, _flowStopSize, _flowResumeSize) {} bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m) { @@ -208,8 +282,11 @@ bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m) } RingQueuePolicy::RingQueuePolicy(const std::string& _name, - uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} + uint32_t _maxCount, uint64_t _maxSize, const std::string& _type, + uint32_t _flowStopCount, uint32_t _flowResumeCount, + uint64_t _flowStopSize, uint64_t _flowResumeSize) + : QueuePolicy(_name, _maxCount, _maxSize, _type, _flowStopCount, _flowResumeCount, + _flowStopSize, _flowResumeSize), strict(_type == RING_STRICT) {} bool before(const QueuedMessage& a, const QueuedMessage& b) { @@ -317,23 +394,34 @@ std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::F std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings) { uint32_t maxCount = getCapacity(settings, maxCountKey, 0); - uint32_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize); - if (maxCount || maxSize) { - return createQueuePolicy(name, maxCount, maxSize, getType(settings)); + uint64_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize); + uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0); + uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0); + uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0); + uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0); + + if (maxCount || maxSize || flowStopCount || flowResumeCount || flowStopSize || flowResumeSize) { + return createQueuePolicy(name, maxCount, maxSize, getType(settings), + flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); } else { return std::auto_ptr<QueuePolicy>(); } } std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, - uint32_t maxCount, uint64_t maxSize, const std::string& type) + uint32_t maxCount, uint64_t maxSize, const std::string& type, + uint32_t flowStopCount, uint32_t flowResumeCount, + uint64_t flowStopSize, uint64_t flowResumeSize) { if (type == RING || type == RING_STRICT) { - return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type)); + return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type, + flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)); } else if (type == FLOW_TO_DISK) { - return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize)); + return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize, + flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)); } else { - return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type)); + return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type, + flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)); } } @@ -349,6 +437,8 @@ std::ostream& operator<<(std::ostream& out, const QueuePolicy& p) if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count; else out << "count: unlimited"; out << "; type=" << p.type; + if (p.flowStopCount) out << "; flowStopCount=" << p.flowStopCount << ", flowResumeCount=" << p.flowResumeCount; + if (p.flowStopSize) out << "; flowStopSize=" << p.flowStopSize << ", flowResumeSize=" << p.flowResumeSize; return out; } |