diff options
| author | Alan Conway <aconway@apache.org> | 2012-12-19 21:23:31 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-12-19 21:23:31 +0000 |
| commit | f058abdb749c6f3d261989ff35252abe20807dc3 (patch) | |
| tree | 8f4845f7da4803558e3b4265c87bcea035ff4178 /cpp/src/qpid/broker/QueueFlowLimit.cpp | |
| parent | 9354a06998e6031e433b07579296c35a8aee6b5c (diff) | |
| download | qpid-python-f058abdb749c6f3d261989ff35252abe20807dc3.tar.gz | |
QPID-4514: Remove obsolete cluster code: QueueFlowLimit
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1424131 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/QueueFlowLimit.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/QueueFlowLimit.cpp | 75 |
1 files changed, 2 insertions, 73 deletions
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp index b887364d51..1ebaca2dae 100644 --- a/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -291,79 +291,8 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& s return 0; } -/* Cluster replication */ - -namespace { - /** pack a set of sequence number ranges into a framing::Array */ - void buildSeqRangeArray(qpid::framing::Array *seqs, - const qpid::framing::SequenceNumber& first, - const qpid::framing::SequenceNumber& last) - { - seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first))); - seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last))); - } -} - -/** Runs on UPDATER to snapshot current state */ -void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const -{ - sys::Mutex::ScopedLock l(indexLock); - state.clear(); - - framing::SequenceSet ss; - if (!index.empty()) { - /* replicate the set of messages pending flow control */ - for (std::map<framing::SequenceNumber, Message >::const_iterator itr = index.begin(); - itr != index.end(); ++itr) { - ss.add(itr->first); - } - framing::Array seqs(TYPE_CODE_UINT32); - typedef boost::function<void(framing::SequenceNumber, framing::SequenceNumber)> arrayBuilder; - ss.for_each((arrayBuilder)boost::bind(&buildSeqRangeArray, &seqs, _1, _2)); - state.setArray("pendingMsgSeqs", seqs); - } - QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss); -} - - -/** called on UPDATEE to set state from snapshot */ -void QueueFlowLimit::setState(const qpid::framing::FieldTable& state) -{ - sys::Mutex::ScopedLock l(indexLock); - index.clear(); - - framing::SequenceSet fcmsg; - framing::Array seqArray(TYPE_CODE_UINT32); - if (state.getArray("pendingMsgSeqs", seqArray)) { - assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges - framing::Array::const_iterator i = seqArray.begin(); - while (i != seqArray.end()) { - framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 4>()); - ++i; - framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>()); - ++i; - fcmsg.add(first, last); - for (SequenceNumber seq = first; seq <= last; ++seq) { - Message msg; - queue->find(seq, msg); // fyi: may not be found if msg is acquired & unacked - bool unique; - unique = index.insert(std::pair<framing::SequenceNumber, Message >(seq, msg)).second; - // Like this to avoid tripping up unused variable warning when NDEBUG set - if (!unique) assert(unique); - } - } - } - - flowStopped = index.size() != 0; - if (queueMgmtObj) { - queueMgmtObj->set_flowStopped(isFlowControlActive()); - } - QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg) -} - - namespace qpid { - namespace broker { +namespace broker { std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f) { @@ -372,6 +301,6 @@ std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f) return out; } - } +} } |
