summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/TxAccept.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-10-24 12:34:29 +0000
committerGordon Sim <gsim@apache.org>2008-10-24 12:34:29 +0000
commit2effbc58544688cfeabc4d7d6dcbd59805233de0 (patch)
tree8a3dd46362058dc64dd897e2aa32428b18b748e1 /cpp/src/qpid/broker/TxAccept.cpp
parent93b372e4d8d51a3db161a95a8b1884ce6af117fd (diff)
downloadqpid-python-2effbc58544688cfeabc4d7d6dcbd59805233de0.tar.gz
Revised transactional options to perftest as they could not be used on older boost versions due to ambiguity.
Refactored TxAccept to avoid excessive testing and searching for delivery records. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707615 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/TxAccept.cpp')
-rw-r--r--cpp/src/qpid/broker/TxAccept.cpp61
1 files changed, 49 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp
index 82acf61cd1..6d307bf735 100644
--- a/cpp/src/qpid/broker/TxAccept.cpp
+++ b/cpp/src/qpid/broker/TxAccept.cpp
@@ -26,19 +26,60 @@ using std::bind2nd;
using std::mem_fun_ref;
using namespace qpid::broker;
using qpid::framing::SequenceSet;
+using qpid::framing::SequenceNumber;
+
+TxAccept::RangeOp::RangeOp(const AckRange& r) : range(r) {}
+
+void TxAccept::RangeOp::prepare(TransactionContext* ctxt)
+{
+ for_each(range.start, range.end, bind(&DeliveryRecord::dequeue, _1, ctxt));
+}
+
+void TxAccept::RangeOp::commit()
+{
+ for_each(range.start, range.end, bind(&DeliveryRecord::setEnded, _1));
+}
+
+TxAccept::RangeOps::RangeOps(DeliveryRecords& u) : unacked(u) {}
+
+void TxAccept::RangeOps::operator()(SequenceNumber start, SequenceNumber end)
+{
+ ranges.push_back(RangeOp(DeliveryRecord::findRange(unacked, start, end)));
+}
+
+void TxAccept::RangeOps::prepare(TransactionContext* ctxt)
+{
+ for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt));
+}
+
+void TxAccept::RangeOps::commit()
+{
+ for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
+ //now remove if isRedundant():
+ if (!ranges.empty()) {
+ ack_iterator i = ranges.front().range.start;
+ ack_iterator end = ranges.back().range.end;
+ while (i != end) {
+ if (i->isRedundant()) {
+ i = unacked.erase(i);
+ } else {
+ i++;
+ }
+ }
+ }
+}
TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) :
- acked(_acked), unacked(_unacked) {}
+ acked(_acked), unacked(_unacked), ops(unacked)
+{
+ //populate the ops
+ acked.for_each(ops);
+}
bool TxAccept::prepare(TransactionContext* ctxt) throw()
{
try{
- //dequeue messages from their respective queues:
- for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
- if (i->coveredBy(&acked)) {
- i->dequeue(ctxt);
- }
- }
+ ops.prepare(ctxt);
return true;
}catch(const std::exception& e){
QPID_LOG(error, "Failed to prepare: " << e.what());
@@ -51,11 +92,7 @@ bool TxAccept::prepare(TransactionContext* ctxt) throw()
void TxAccept::commit() throw()
{
- for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
- if (i->coveredBy(&acked)) i->setEnded();
- }
-
- unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+ ops.commit();
}
void TxAccept::rollback() throw() {}