diff options
author | Alan Conway <aconway@apache.org> | 2013-08-01 20:26:34 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-08-01 20:26:34 +0000 |
commit | cda5959968dd695ec9eb3537e75fc45c1187b4b1 (patch) | |
tree | 06999be6651d230539cefbf7bf8607dc3aee15ae | |
parent | 7ea6c4ac9aba3dad352e5956e580058ff4a2090f (diff) | |
download | qpid-python-cda5959968dd695ec9eb3537e75fc45c1187b4b1.tar.gz |
QPID-4327: Refactor to simplify TxAccept.
Removed un-necessary RangeOps layers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1509419 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/TxAccept.cpp | 64 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAccept.h | 82 |
2 files changed, 54 insertions, 92 deletions
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp index 928ac12c10..fc0c96f467 100644 --- a/cpp/src/qpid/broker/TxAccept.cpp +++ b/cpp/src/qpid/broker/TxAccept.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,54 +28,23 @@ using namespace qpid::broker; using qpid::framing::SequenceSet; using qpid::framing::SequenceNumber; -TxAccept::RangeOp::RangeOp(const AckRange& r) : range(r) {} - -void TxAccept::RangeOp::prepare(TransactionContext* ctxt) -{ - for_each(range.start, range.end, bind(&DeliveryRecord::dequeue, _1, ctxt)); -} - -void TxAccept::RangeOp::commit() -{ - for_each(range.start, range.end, bind(&DeliveryRecord::committed, _1)); - for_each(range.start, range.end, bind(&DeliveryRecord::setEnded, _1)); -} - -TxAccept::RangeOps::RangeOps(DeliveryRecords& u) : unacked(u) {} -void TxAccept::RangeOps::operator()(SequenceNumber start, SequenceNumber end) +TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) : + acked(_acked), unacked(_unacked) { - ranges.push_back(RangeOp(DeliveryRecord::findRange(unacked, start, end))); + for(SequenceSet::RangeIterator i = acked.rangesBegin(); i != acked.rangesEnd(); ++i) + ranges.push_back(DeliveryRecord::findRange(unacked, i->first(), i->last())); } -void TxAccept::RangeOps::prepare(TransactionContext* ctxt) -{ - std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt)); -} - -void TxAccept::RangeOps::commit() -{ - std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1)); - //now remove if isRedundant(): - if (!ranges.empty()) { - DeliveryRecords::iterator begin = ranges.front().range.start; - DeliveryRecords::iterator end = ranges.back().range.end; - DeliveryRecords::iterator removed = remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant)); - unacked.erase(removed, end); - } -} - -TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) : - acked(_acked), unacked(_unacked), ops(unacked) -{ - //populate the ops - acked.for_each(ops); +void TxAccept::each(boost::function<void(DeliveryRecord&)> f) { + for(AckRanges::iterator i = ranges.begin(); i != ranges.end(); ++i) + for_each(i->start, i->end, f); } bool TxAccept::prepare(TransactionContext* ctxt) throw() { try{ - ops.prepare(ctxt); + each(bind(&DeliveryRecord::dequeue, _1, ctxt)); return true; }catch(const std::exception& e){ QPID_LOG(error, "Failed to prepare: " << e.what()); @@ -86,10 +55,19 @@ bool TxAccept::prepare(TransactionContext* ctxt) throw() } } -void TxAccept::commit() throw() +void TxAccept::commit() throw() { try { - ops.commit(); + each(bind(&DeliveryRecord::committed, _1)); + each(bind(&DeliveryRecord::setEnded, _1)); + //now remove if isRedundant(): + if (!ranges.empty()) { + DeliveryRecords::iterator begin = ranges.front().start; + DeliveryRecords::iterator end = ranges.back().end; + DeliveryRecords::iterator removed = + remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant)); + unacked.erase(removed, end); + } } catch (const std::exception& e) { QPID_LOG(error, "Failed to commit: " << e.what()); } catch(...) { diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h index daf192285a..3a8b663039 100644 --- a/cpp/src/qpid/broker/TxAccept.h +++ b/cpp/src/qpid/broker/TxAccept.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,58 +21,42 @@ #ifndef _TxAccept_ #define _TxAccept_ -#include <algorithm> -#include <functional> -#include <list> #include "qpid/framing/SequenceSet.h" #include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/TxOp.h" +#include <boost/function.hpp> +#include <algorithm> +#include <functional> +#include <list> namespace qpid { - namespace broker { - /** - * Defines the transactional behaviour for accepts received by - * a transactional channel. - */ - class TxAccept : public TxOp { - struct RangeOp - { - AckRange range; - - RangeOp(const AckRange& r); - void prepare(TransactionContext* ctxt); - void commit(); - }; - - struct RangeOps - { - std::vector<RangeOp> ranges; - DeliveryRecords& unacked; - - RangeOps(DeliveryRecords& u); - - void operator()(framing::SequenceNumber start, framing::SequenceNumber end); - void prepare(TransactionContext* ctxt); - void commit(); - }; - - framing::SequenceSet acked; - DeliveryRecords& unacked; - RangeOps ops; - - public: - /** - * @param acked a representation of the accumulation of - * acks received - * @param unacked the record of delivered messages - */ - TxAccept(const framing::SequenceSet& acked, DeliveryRecords& unacked); - virtual bool prepare(TransactionContext* ctxt) throw(); - virtual void commit() throw(); - virtual void rollback() throw(); - virtual ~TxAccept(){} - }; - } +namespace broker { +/** + * Defines the transactional behaviour for accepts received by + * a transactional channel. + */ +class TxAccept : public TxOp { + typedef std::vector<AckRange> AckRanges; + + void each(boost::function<void(DeliveryRecord&)>); + + framing::SequenceSet acked; + DeliveryRecords& unacked; + AckRanges ranges; + + public: + /** + * @param acked a representation of the accumulation of + * acks received + * @param unacked the record of delivered messages + */ + TxAccept(const framing::SequenceSet& acked, DeliveryRecords& unacked); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~TxAccept(){} +}; +} } |