summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-01 20:26:34 +0000
committerAlan Conway <aconway@apache.org>2013-08-01 20:26:34 +0000
commitcda5959968dd695ec9eb3537e75fc45c1187b4b1 (patch)
tree06999be6651d230539cefbf7bf8607dc3aee15ae
parent7ea6c4ac9aba3dad352e5956e580058ff4a2090f (diff)
downloadqpid-python-cda5959968dd695ec9eb3537e75fc45c1187b4b1.tar.gz
QPID-4327: Refactor to simplify TxAccept.
Removed un-necessary RangeOps layers. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1509419 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/TxAccept.cpp64
-rw-r--r--cpp/src/qpid/broker/TxAccept.h82
2 files changed, 54 insertions, 92 deletions
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp
index 928ac12c10..fc0c96f467 100644
--- a/cpp/src/qpid/broker/TxAccept.cpp
+++ b/cpp/src/qpid/broker/TxAccept.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,54 +28,23 @@ using namespace qpid::broker;
using qpid::framing::SequenceSet;
using qpid::framing::SequenceNumber;
-TxAccept::RangeOp::RangeOp(const AckRange& r) : range(r) {}
-
-void TxAccept::RangeOp::prepare(TransactionContext* ctxt)
-{
- for_each(range.start, range.end, bind(&DeliveryRecord::dequeue, _1, ctxt));
-}
-
-void TxAccept::RangeOp::commit()
-{
- for_each(range.start, range.end, bind(&DeliveryRecord::committed, _1));
- for_each(range.start, range.end, bind(&DeliveryRecord::setEnded, _1));
-}
-
-TxAccept::RangeOps::RangeOps(DeliveryRecords& u) : unacked(u) {}
-void TxAccept::RangeOps::operator()(SequenceNumber start, SequenceNumber end)
+TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) :
+ acked(_acked), unacked(_unacked)
{
- ranges.push_back(RangeOp(DeliveryRecord::findRange(unacked, start, end)));
+ for(SequenceSet::RangeIterator i = acked.rangesBegin(); i != acked.rangesEnd(); ++i)
+ ranges.push_back(DeliveryRecord::findRange(unacked, i->first(), i->last()));
}
-void TxAccept::RangeOps::prepare(TransactionContext* ctxt)
-{
- std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt));
-}
-
-void TxAccept::RangeOps::commit()
-{
- std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
- //now remove if isRedundant():
- if (!ranges.empty()) {
- DeliveryRecords::iterator begin = ranges.front().range.start;
- DeliveryRecords::iterator end = ranges.back().range.end;
- DeliveryRecords::iterator removed = remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant));
- unacked.erase(removed, end);
- }
-}
-
-TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) :
- acked(_acked), unacked(_unacked), ops(unacked)
-{
- //populate the ops
- acked.for_each(ops);
+void TxAccept::each(boost::function<void(DeliveryRecord&)> f) {
+ for(AckRanges::iterator i = ranges.begin(); i != ranges.end(); ++i)
+ for_each(i->start, i->end, f);
}
bool TxAccept::prepare(TransactionContext* ctxt) throw()
{
try{
- ops.prepare(ctxt);
+ each(bind(&DeliveryRecord::dequeue, _1, ctxt));
return true;
}catch(const std::exception& e){
QPID_LOG(error, "Failed to prepare: " << e.what());
@@ -86,10 +55,19 @@ bool TxAccept::prepare(TransactionContext* ctxt) throw()
}
}
-void TxAccept::commit() throw()
+void TxAccept::commit() throw()
{
try {
- ops.commit();
+ each(bind(&DeliveryRecord::committed, _1));
+ each(bind(&DeliveryRecord::setEnded, _1));
+ //now remove if isRedundant():
+ if (!ranges.empty()) {
+ DeliveryRecords::iterator begin = ranges.front().start;
+ DeliveryRecords::iterator end = ranges.back().end;
+ DeliveryRecords::iterator removed =
+ remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant));
+ unacked.erase(removed, end);
+ }
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to commit: " << e.what());
} catch(...) {
diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h
index daf192285a..3a8b663039 100644
--- a/cpp/src/qpid/broker/TxAccept.h
+++ b/cpp/src/qpid/broker/TxAccept.h
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,58 +21,42 @@
#ifndef _TxAccept_
#define _TxAccept_
-#include <algorithm>
-#include <functional>
-#include <list>
#include "qpid/framing/SequenceSet.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/TxOp.h"
+#include <boost/function.hpp>
+#include <algorithm>
+#include <functional>
+#include <list>
namespace qpid {
- namespace broker {
- /**
- * Defines the transactional behaviour for accepts received by
- * a transactional channel.
- */
- class TxAccept : public TxOp {
- struct RangeOp
- {
- AckRange range;
-
- RangeOp(const AckRange& r);
- void prepare(TransactionContext* ctxt);
- void commit();
- };
-
- struct RangeOps
- {
- std::vector<RangeOp> ranges;
- DeliveryRecords& unacked;
-
- RangeOps(DeliveryRecords& u);
-
- void operator()(framing::SequenceNumber start, framing::SequenceNumber end);
- void prepare(TransactionContext* ctxt);
- void commit();
- };
-
- framing::SequenceSet acked;
- DeliveryRecords& unacked;
- RangeOps ops;
-
- public:
- /**
- * @param acked a representation of the accumulation of
- * acks received
- * @param unacked the record of delivered messages
- */
- TxAccept(const framing::SequenceSet& acked, DeliveryRecords& unacked);
- virtual bool prepare(TransactionContext* ctxt) throw();
- virtual void commit() throw();
- virtual void rollback() throw();
- virtual ~TxAccept(){}
- };
- }
+namespace broker {
+/**
+ * Defines the transactional behaviour for accepts received by
+ * a transactional channel.
+ */
+class TxAccept : public TxOp {
+ typedef std::vector<AckRange> AckRanges;
+
+ void each(boost::function<void(DeliveryRecord&)>);
+
+ framing::SequenceSet acked;
+ DeliveryRecords& unacked;
+ AckRanges ranges;
+
+ public:
+ /**
+ * @param acked a representation of the accumulation of
+ * acks received
+ * @param unacked the record of delivered messages
+ */
+ TxAccept(const framing::SequenceSet& acked, DeliveryRecords& unacked);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~TxAccept(){}
+};
+}
}