summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-05-06 17:58:50 +0000
committerAlan Conway <aconway@apache.org>2009-05-06 17:58:50 +0000
commitd37792b0cd112986236addbcde01ee55067c946b (patch)
treebe815e2111c6e7dfc629e2a8bd1742dceb9f6e81 /cpp/src
parentf4a17848cf8a9129466369eb153511dfd6879380 (diff)
downloadqpid-python-d37792b0cd112986236addbcde01ee55067c946b.tar.gz
DeliveryRecord optimizations.
Replace linear search with binary search. Collapse multi-pass mark-then-erase to a signle pass. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@772384 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp46
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h43
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp20
-rw-r--r--cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--cpp/src/qpid/broker/TxAccept.cpp11
-rw-r--r--cpp/src/tests/DeliveryRecordTest.cpp2
6 files changed, 46 insertions, 78 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 930a65a522..4db32bd96d 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -48,29 +48,13 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
credit(msg.payload ? msg.payload->getRequiredCredit() : _credit)
{}
-void DeliveryRecord::setEnded()
+bool DeliveryRecord::setEnded()
{
ended = true;
//reset msg pointer, don't need to hold on to it anymore
msg.payload = boost::intrusive_ptr<Message>();
-
QPID_LOG(debug, "DeliveryRecord::setEnded() id=" << id);
-}
-
-bool DeliveryRecord::matches(DeliveryId tag) const{
- return id == tag;
-}
-
-bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{
- return matches(tag) || after(tag);
-}
-
-bool DeliveryRecord::after(DeliveryId tag) const{
- return id > tag;
-}
-
-bool DeliveryRecord::coveredBy(const framing::SequenceSet* const range) const{
- return range->contains(id);
+ return isRedundant();
}
void DeliveryRecord::redeliver(SemanticState* const session) {
@@ -120,17 +104,17 @@ void DeliveryRecord::release(bool setRedelivered)
}
}
-void DeliveryRecord::complete()
-{
+void DeliveryRecord::complete() {
completed = true;
}
-void DeliveryRecord::accept(TransactionContext* ctxt) {
+bool DeliveryRecord::accept(TransactionContext* ctxt) {
if (acquired && !ended) {
queue->dequeue(ctxt, msg);
setEnded();
QPID_LOG(debug, "Accepted " << id);
}
+ return isRedundant();
}
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
@@ -179,18 +163,10 @@ void DeliveryRecord::cancel(const std::string& cancelledTag)
AckRange DeliveryRecord::findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last)
{
- DeliveryRecords::iterator start = find_if(records.begin(), records.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first));
- DeliveryRecords::iterator end = start;
-
- if (start != records.end()) {
- if (first == last) {
- //just acked single element (move end past it)
- ++end;
- } else {
- //need to find end (position it just after the last record in range)
- end = find_if(start, records.end(), boost::bind(&DeliveryRecord::after, _1, last));
- }
- }
+ DeliveryRecords::iterator start = lower_bound(records.begin(), records.end(), first);
+ // Find end - position it just after the last record in range
+ DeliveryRecords::iterator end = lower_bound(records.begin(), records.end(), last);
+ if (end != records.end() && end->getId() == last) ++end;
return AckRange(start, end);
}
@@ -206,9 +182,5 @@ std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r)
return out;
}
-bool operator<(const DeliveryRecord& a, const DeliveryRecord& b)
-{
- return a.id < b.id;
-}
}}
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index dc93542092..970d04ed49 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -1,3 +1,6 @@
+#ifndef QPID_BROKER_DELIVERYRECORD_H
+#define QPID_BROKER_DELIVERYRECORD_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,11 +21,9 @@
* under the License.
*
*/
-#ifndef _DeliveryRecord_
-#define _DeliveryRecord_
#include <algorithm>
-#include <list>
+#include <deque>
#include <vector>
#include <ostream>
#include "qpid/framing/SequenceSet.h"
@@ -44,15 +45,14 @@ class DeliveryRecord
{
QueuedMessage msg;
mutable Queue::shared_ptr queue;
- const std::string tag;
+ std::string tag;
DeliveryId id;
- bool acquired;
- bool acceptExpected;
- bool cancelled;
-
- bool completed;
- bool ended;
- const bool windowing;
+ bool acquired : 1;
+ bool acceptExpected : 1;
+ bool cancelled : 1;
+ bool completed : 1;
+ bool ended : 1;
+ bool windowing : 1;
/**
* Record required credit on construction as the pointer to the
@@ -61,7 +61,7 @@ class DeliveryRecord
* to reallocate credit when it is completed (which could happen
* after that).
*/
- const uint32_t credit;
+ uint32_t credit;
public:
QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
@@ -73,10 +73,7 @@ class DeliveryRecord
uint32_t credit=0 // Only used if msg is empty.
);
- QPID_BROKER_EXTERN bool matches(DeliveryId tag) const;
- bool matchOrAfter(DeliveryId tag) const;
- bool after(DeliveryId tag) const;
- bool coveredBy(const framing::SequenceSet* const range) const;
+ bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); }
void dequeue(TransactionContext* ctxt = 0) const;
void requeue() const;
@@ -86,8 +83,8 @@ class DeliveryRecord
void redeliver(SemanticState* const);
void acquire(DeliveryIds& results);
void complete();
- void accept(TransactionContext* ctxt);
- void setEnded();
+ bool accept(TransactionContext* ctxt); // Returns isRedundant()
+ bool setEnded(); // Returns isRedundant()
void committed() const;
bool isAcquired() const { return acquired; }
@@ -104,15 +101,19 @@ class DeliveryRecord
void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize);
void setId(DeliveryId _id) { id = _id; }
- typedef std::list<DeliveryRecord> DeliveryRecords;
+ typedef std::deque<DeliveryRecord> DeliveryRecords;
static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last);
const QueuedMessage& getMessage() const { return msg; }
framing::SequenceNumber getId() const { return id; }
Queue::shared_ptr getQueue() const { return queue; }
- friend QPID_BROKER_EXTERN bool operator<(const DeliveryRecord&, const DeliveryRecord&);
+
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
+inline bool operator<(const DeliveryRecord& a, const DeliveryRecord& b) { return a.getId() < b.getId(); }
+inline bool operator<(const framing::SequenceNumber& a, const DeliveryRecord& b) { return a < b.getId(); }
+inline bool operator<(const DeliveryRecord& a, const framing::SequenceNumber& b) { return a.getId() < b; }
+
struct AcquireFunctor
{
DeliveryIds& results;
@@ -138,4 +139,4 @@ struct AckRange
}
-#endif
+#endif /*!QPID_BROKER_DELIVERYRECORD_H*/
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 5e41fa3302..3ba76f656e 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -408,12 +408,13 @@ void SemanticState::requestDispatch(ConsumerImpl& c)
outputTasks.activateOutput();
}
-void SemanticState::complete(DeliveryRecord& delivery)
+bool SemanticState::complete(DeliveryRecord& delivery)
{
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
i->second->complete(delivery);
}
+ return delivery.isRedundant();
}
void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
@@ -440,7 +441,7 @@ void SemanticState::recover(bool requeue)
//unconfirmed messages re redelivered and therefore have their
//id adjusted, confirmed messages are not and so the ordering
//w.r.t id is lost
- unacked.sort();
+ sort(unacked.begin(), unacked.end());
}
}
@@ -638,24 +639,23 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last)
dtxBuffer->enlist(txAck);
//mark the relevant messages as 'ended' in unacked
- for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
-
//if the messages are already completed, they can be
//removed from the record
- unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
-
+ DeliveryRecords::iterator removed = remove_if(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
+ unacked.erase(removed, range.end);
}
} else {
- for_each(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0));
- unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+ DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0));
+ unacked.erase(removed, range.end);
}
}
void SemanticState::completed(DeliveryId first, DeliveryId last)
{
AckRange range = findRange(first, last);
- for_each(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
- unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+
+ DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
+ unacked.erase(removed, range.end);
requestDispatch();
}
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 9dd7cc914b..35f8b4392f 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -151,7 +151,7 @@ class SemanticState : public sys::OutputTask,
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
void checkDtxTimeout();
- void complete(DeliveryRecord&);
+ bool complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
void requestDispatch(ConsumerImpl&);
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp
index ca128b1975..15a1222a74 100644
--- a/cpp/src/qpid/broker/TxAccept.cpp
+++ b/cpp/src/qpid/broker/TxAccept.cpp
@@ -58,15 +58,10 @@ void TxAccept::RangeOps::commit()
std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
//now remove if isRedundant():
if (!ranges.empty()) {
- DeliveryRecords::iterator i = ranges.front().range.start;
+ DeliveryRecords::iterator begin = ranges.front().range.start;
DeliveryRecords::iterator end = ranges.back().range.end;
- while (i != end) {
- if (i->isRedundant()) {
- i = unacked.erase(i);
- } else {
- i++;
- }
- }
+ DeliveryRecords::iterator removed = remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant));
+ unacked.erase(removed, end);
}
}
diff --git a/cpp/src/tests/DeliveryRecordTest.cpp b/cpp/src/tests/DeliveryRecordTest.cpp
index 47c7157749..8ff7ad3584 100644
--- a/cpp/src/tests/DeliveryRecordTest.cpp
+++ b/cpp/src/tests/DeliveryRecordTest.cpp
@@ -53,7 +53,7 @@ QPID_AUTO_TEST_CASE(testSort)
SequenceNumber expected(0);
for (list<DeliveryRecord>::iterator i = records.begin(); i != records.end(); i++) {
- BOOST_CHECK(i->matches(++expected));
+ BOOST_CHECK(i->getId() == ++expected);
}
}