summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/QueueFlowLimit.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/QueueFlowLimit.cpp')
-rw-r--r--cpp/src/qpid/broker/QueueFlowLimit.cpp87
1 files changed, 5 insertions, 82 deletions
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 11b9cbae63..9b2e31c925 100644
--- a/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -29,7 +29,6 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/Mutex.h"
#include "qpid/broker/SessionState.h"
-#include "qpid/sys/ClusterSafe.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
@@ -66,10 +65,10 @@ namespace {
QueueFlowLimit::QueueFlowLimit(Queue *_queue,
uint32_t _flowStopCount, uint32_t _flowResumeCount,
uint64_t _flowStopSize, uint64_t _flowResumeSize)
- : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"),
+ : queue(_queue), queueName("<unknown>"),
flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
- flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0)
+ flowStopped(false), count(0), size(0), broker(0)
{
uint32_t maxCount(0);
uint64_t maxSize(0);
@@ -79,7 +78,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount();
if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize();
broker = queue->getBroker();
- queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject());
+ queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject());
if (queueMgmtObj) {
queueMgmtObj->set_flowStopped(isFlowControlActive());
}
@@ -130,11 +129,6 @@ void QueueFlowLimit::enqueued(const Message& msg)
}
if (flowStopped || !index.empty()) {
- // ignore flow control if we are populating the queue due to cluster replication:
- if (broker && broker->isClusterUpdatee()) {
- QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.getSequence());
- return;
- }
QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.getSequence());
msg.getPersistentContext()->getIngressCompletion().startCompleter(); // don't complete until flow resumes
bool unique;
@@ -297,79 +291,8 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& s
return 0;
}
-/* Cluster replication */
-
-namespace {
- /** pack a set of sequence number ranges into a framing::Array */
- void buildSeqRangeArray(qpid::framing::Array *seqs,
- const qpid::framing::SequenceNumber& first,
- const qpid::framing::SequenceNumber& last)
- {
- seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first)));
- seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last)));
- }
-}
-
-/** Runs on UPDATER to snapshot current state */
-void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const
-{
- sys::Mutex::ScopedLock l(indexLock);
- state.clear();
-
- framing::SequenceSet ss;
- if (!index.empty()) {
- /* replicate the set of messages pending flow control */
- for (std::map<framing::SequenceNumber, Message >::const_iterator itr = index.begin();
- itr != index.end(); ++itr) {
- ss.add(itr->first);
- }
- framing::Array seqs(TYPE_CODE_UINT32);
- typedef boost::function<void(framing::SequenceNumber, framing::SequenceNumber)> arrayBuilder;
- ss.for_each((arrayBuilder)boost::bind(&buildSeqRangeArray, &seqs, _1, _2));
- state.setArray("pendingMsgSeqs", seqs);
- }
- QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss);
-}
-
-
-/** called on UPDATEE to set state from snapshot */
-void QueueFlowLimit::setState(const qpid::framing::FieldTable& state)
-{
- sys::Mutex::ScopedLock l(indexLock);
- index.clear();
-
- framing::SequenceSet fcmsg;
- framing::Array seqArray(TYPE_CODE_UINT32);
- if (state.getArray("pendingMsgSeqs", seqArray)) {
- assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges
- framing::Array::const_iterator i = seqArray.begin();
- while (i != seqArray.end()) {
- framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 4>());
- ++i;
- framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>());
- ++i;
- fcmsg.add(first, last);
- for (SequenceNumber seq = first; seq <= last; ++seq) {
- Message msg;
- queue->find(seq, msg); // fyi: may not be found if msg is acquired & unacked
- bool unique;
- unique = index.insert(std::pair<framing::SequenceNumber, Message >(seq, msg)).second;
- // Like this to avoid tripping up unused variable warning when NDEBUG set
- if (!unique) assert(unique);
- }
- }
- }
-
- flowStopped = index.size() != 0;
- if (queueMgmtObj) {
- queueMgmtObj->set_flowStopped(isFlowControlActive());
- }
- QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg)
-}
-
-
namespace qpid {
- namespace broker {
+namespace broker {
std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f)
{
@@ -378,6 +301,6 @@ std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f)
return out;
}
- }
+}
}