diff options
author | Alan Conway <aconway@apache.org> | 2009-05-06 17:58:50 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-05-06 17:58:50 +0000 |
commit | d37792b0cd112986236addbcde01ee55067c946b (patch) | |
tree | be815e2111c6e7dfc629e2a8bd1742dceb9f6e81 /cpp/src | |
parent | f4a17848cf8a9129466369eb153511dfd6879380 (diff) | |
download | qpid-python-d37792b0cd112986236addbcde01ee55067c946b.tar.gz |
DeliveryRecord optimizations.
Replace linear search with binary search.
Collapse multi-pass mark-then-erase to a signle pass.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@772384 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 46 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 43 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxAccept.cpp | 11 | ||||
-rw-r--r-- | cpp/src/tests/DeliveryRecordTest.cpp | 2 |
6 files changed, 46 insertions, 78 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 930a65a522..4db32bd96d 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -48,29 +48,13 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, credit(msg.payload ? msg.payload->getRequiredCredit() : _credit) {} -void DeliveryRecord::setEnded() +bool DeliveryRecord::setEnded() { ended = true; //reset msg pointer, don't need to hold on to it anymore msg.payload = boost::intrusive_ptr<Message>(); - QPID_LOG(debug, "DeliveryRecord::setEnded() id=" << id); -} - -bool DeliveryRecord::matches(DeliveryId tag) const{ - return id == tag; -} - -bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{ - return matches(tag) || after(tag); -} - -bool DeliveryRecord::after(DeliveryId tag) const{ - return id > tag; -} - -bool DeliveryRecord::coveredBy(const framing::SequenceSet* const range) const{ - return range->contains(id); + return isRedundant(); } void DeliveryRecord::redeliver(SemanticState* const session) { @@ -120,17 +104,17 @@ void DeliveryRecord::release(bool setRedelivered) } } -void DeliveryRecord::complete() -{ +void DeliveryRecord::complete() { completed = true; } -void DeliveryRecord::accept(TransactionContext* ctxt) { +bool DeliveryRecord::accept(TransactionContext* ctxt) { if (acquired && !ended) { queue->dequeue(ctxt, msg); setEnded(); QPID_LOG(debug, "Accepted " << id); } + return isRedundant(); } void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ @@ -179,18 +163,10 @@ void DeliveryRecord::cancel(const std::string& cancelledTag) AckRange DeliveryRecord::findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last) { - DeliveryRecords::iterator start = find_if(records.begin(), records.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first)); - DeliveryRecords::iterator end = start; - - if (start != records.end()) { - if (first == last) { - //just acked single element (move end past it) - ++end; - } else { - //need to find end (position it just after the last record in range) - end = find_if(start, records.end(), boost::bind(&DeliveryRecord::after, _1, last)); - } - } + DeliveryRecords::iterator start = lower_bound(records.begin(), records.end(), first); + // Find end - position it just after the last record in range + DeliveryRecords::iterator end = lower_bound(records.begin(), records.end(), last); + if (end != records.end() && end->getId() == last) ++end; return AckRange(start, end); } @@ -206,9 +182,5 @@ std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) return out; } -bool operator<(const DeliveryRecord& a, const DeliveryRecord& b) -{ - return a.id < b.id; -} }} diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index dc93542092..970d04ed49 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -1,3 +1,6 @@ +#ifndef QPID_BROKER_DELIVERYRECORD_H +#define QPID_BROKER_DELIVERYRECORD_H + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,11 +21,9 @@ * under the License. * */ -#ifndef _DeliveryRecord_ -#define _DeliveryRecord_ #include <algorithm> -#include <list> +#include <deque> #include <vector> #include <ostream> #include "qpid/framing/SequenceSet.h" @@ -44,15 +45,14 @@ class DeliveryRecord { QueuedMessage msg; mutable Queue::shared_ptr queue; - const std::string tag; + std::string tag; DeliveryId id; - bool acquired; - bool acceptExpected; - bool cancelled; - - bool completed; - bool ended; - const bool windowing; + bool acquired : 1; + bool acceptExpected : 1; + bool cancelled : 1; + bool completed : 1; + bool ended : 1; + bool windowing : 1; /** * Record required credit on construction as the pointer to the @@ -61,7 +61,7 @@ class DeliveryRecord * to reallocate credit when it is completed (which could happen * after that). */ - const uint32_t credit; + uint32_t credit; public: QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg, @@ -73,10 +73,7 @@ class DeliveryRecord uint32_t credit=0 // Only used if msg is empty. ); - QPID_BROKER_EXTERN bool matches(DeliveryId tag) const; - bool matchOrAfter(DeliveryId tag) const; - bool after(DeliveryId tag) const; - bool coveredBy(const framing::SequenceSet* const range) const; + bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); } void dequeue(TransactionContext* ctxt = 0) const; void requeue() const; @@ -86,8 +83,8 @@ class DeliveryRecord void redeliver(SemanticState* const); void acquire(DeliveryIds& results); void complete(); - void accept(TransactionContext* ctxt); - void setEnded(); + bool accept(TransactionContext* ctxt); // Returns isRedundant() + bool setEnded(); // Returns isRedundant() void committed() const; bool isAcquired() const { return acquired; } @@ -104,15 +101,19 @@ class DeliveryRecord void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize); void setId(DeliveryId _id) { id = _id; } - typedef std::list<DeliveryRecord> DeliveryRecords; + typedef std::deque<DeliveryRecord> DeliveryRecords; static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last); const QueuedMessage& getMessage() const { return msg; } framing::SequenceNumber getId() const { return id; } Queue::shared_ptr getQueue() const { return queue; } - friend QPID_BROKER_EXTERN bool operator<(const DeliveryRecord&, const DeliveryRecord&); + friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; +inline bool operator<(const DeliveryRecord& a, const DeliveryRecord& b) { return a.getId() < b.getId(); } +inline bool operator<(const framing::SequenceNumber& a, const DeliveryRecord& b) { return a < b.getId(); } +inline bool operator<(const DeliveryRecord& a, const framing::SequenceNumber& b) { return a.getId() < b; } + struct AcquireFunctor { DeliveryIds& results; @@ -138,4 +139,4 @@ struct AckRange } -#endif +#endif /*!QPID_BROKER_DELIVERYRECORD_H*/ diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 5e41fa3302..3ba76f656e 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -408,12 +408,13 @@ void SemanticState::requestDispatch(ConsumerImpl& c) outputTasks.activateOutput(); } -void SemanticState::complete(DeliveryRecord& delivery) +bool SemanticState::complete(DeliveryRecord& delivery) { ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { i->second->complete(delivery); } + return delivery.isRedundant(); } void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) @@ -440,7 +441,7 @@ void SemanticState::recover(bool requeue) //unconfirmed messages re redelivered and therefore have their //id adjusted, confirmed messages are not and so the ordering //w.r.t id is lost - unacked.sort(); + sort(unacked.begin(), unacked.end()); } } @@ -638,24 +639,23 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last) dtxBuffer->enlist(txAck); //mark the relevant messages as 'ended' in unacked - for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded)); - //if the messages are already completed, they can be //removed from the record - unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); - + DeliveryRecords::iterator removed = remove_if(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded)); + unacked.erase(removed, range.end); } } else { - for_each(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0)); - unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); + DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0)); + unacked.erase(removed, range.end); } } void SemanticState::completed(DeliveryId first, DeliveryId last) { AckRange range = findRange(first, last); - for_each(range.start, range.end, boost::bind(&SemanticState::complete, this, _1)); - unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); + + DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&SemanticState::complete, this, _1)); + unacked.erase(removed, range.end); requestDispatch(); } diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 9dd7cc914b..35f8b4392f 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -151,7 +151,7 @@ class SemanticState : public sys::OutputTask, void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy); void checkDtxTimeout(); - void complete(DeliveryRecord&); + bool complete(DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); void requestDispatch(ConsumerImpl&); diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp index ca128b1975..15a1222a74 100644 --- a/cpp/src/qpid/broker/TxAccept.cpp +++ b/cpp/src/qpid/broker/TxAccept.cpp @@ -58,15 +58,10 @@ void TxAccept::RangeOps::commit() std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1)); //now remove if isRedundant(): if (!ranges.empty()) { - DeliveryRecords::iterator i = ranges.front().range.start; + DeliveryRecords::iterator begin = ranges.front().range.start; DeliveryRecords::iterator end = ranges.back().range.end; - while (i != end) { - if (i->isRedundant()) { - i = unacked.erase(i); - } else { - i++; - } - } + DeliveryRecords::iterator removed = remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant)); + unacked.erase(removed, end); } } diff --git a/cpp/src/tests/DeliveryRecordTest.cpp b/cpp/src/tests/DeliveryRecordTest.cpp index 47c7157749..8ff7ad3584 100644 --- a/cpp/src/tests/DeliveryRecordTest.cpp +++ b/cpp/src/tests/DeliveryRecordTest.cpp @@ -53,7 +53,7 @@ QPID_AUTO_TEST_CASE(testSort) SequenceNumber expected(0); for (list<DeliveryRecord>::iterator i = records.begin(); i != records.end(); i++) { - BOOST_CHECK(i->matches(++expected)); + BOOST_CHECK(i->getId() == ++expected); } } |