diff options
| author | Gordon Sim <gsim@apache.org> | 2008-10-24 12:34:29 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-10-24 12:34:29 +0000 |
| commit | 2effbc58544688cfeabc4d7d6dcbd59805233de0 (patch) | |
| tree | 8a3dd46362058dc64dd897e2aa32428b18b748e1 /cpp/src/qpid/broker/TxAccept.cpp | |
| parent | 93b372e4d8d51a3db161a95a8b1884ce6af117fd (diff) | |
| download | qpid-python-2effbc58544688cfeabc4d7d6dcbd59805233de0.tar.gz | |
Revised transactional options to perftest as they could not be used on older boost versions due to ambiguity.
Refactored TxAccept to avoid excessive testing and searching for delivery records.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707615 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/TxAccept.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/TxAccept.cpp | 61 |
1 files changed, 49 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp index 82acf61cd1..6d307bf735 100644 --- a/cpp/src/qpid/broker/TxAccept.cpp +++ b/cpp/src/qpid/broker/TxAccept.cpp @@ -26,19 +26,60 @@ using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; using qpid::framing::SequenceSet; +using qpid::framing::SequenceNumber; + +TxAccept::RangeOp::RangeOp(const AckRange& r) : range(r) {} + +void TxAccept::RangeOp::prepare(TransactionContext* ctxt) +{ + for_each(range.start, range.end, bind(&DeliveryRecord::dequeue, _1, ctxt)); +} + +void TxAccept::RangeOp::commit() +{ + for_each(range.start, range.end, bind(&DeliveryRecord::setEnded, _1)); +} + +TxAccept::RangeOps::RangeOps(DeliveryRecords& u) : unacked(u) {} + +void TxAccept::RangeOps::operator()(SequenceNumber start, SequenceNumber end) +{ + ranges.push_back(RangeOp(DeliveryRecord::findRange(unacked, start, end))); +} + +void TxAccept::RangeOps::prepare(TransactionContext* ctxt) +{ + for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt)); +} + +void TxAccept::RangeOps::commit() +{ + for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1)); + //now remove if isRedundant(): + if (!ranges.empty()) { + ack_iterator i = ranges.front().range.start; + ack_iterator end = ranges.back().range.end; + while (i != end) { + if (i->isRedundant()) { + i = unacked.erase(i); + } else { + i++; + } + } + } +} TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) : - acked(_acked), unacked(_unacked) {} + acked(_acked), unacked(_unacked), ops(unacked) +{ + //populate the ops + acked.for_each(ops); +} bool TxAccept::prepare(TransactionContext* ctxt) throw() { try{ - //dequeue messages from their respective queues: - for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { - if (i->coveredBy(&acked)) { - i->dequeue(ctxt); - } - } + ops.prepare(ctxt); return true; }catch(const std::exception& e){ QPID_LOG(error, "Failed to prepare: " << e.what()); @@ -51,11 +92,7 @@ bool TxAccept::prepare(TransactionContext* ctxt) throw() void TxAccept::commit() throw() { - for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { - if (i->coveredBy(&acked)) i->setEnded(); - } - - unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); + ops.commit(); } void TxAccept::rollback() throw() {} |
