diff options
author | Alan Conway <aconway@apache.org> | 2009-08-12 18:29:03 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-08-12 18:29:03 +0000 |
commit | 98ce81ce2db861c73dedfab85f4c5bb8832efc20 (patch) | |
tree | f3f81947732e7a1641f5a91ef0fe2b05a566adb5 /qpid | |
parent | b567927bda5350a2e85b2087fbc43b098df25431 (diff) | |
download | qpid-python-98ce81ce2db861c73dedfab85f4c5bb8832efc20.tar.gz |
Optimized handling of accepts and completions.
SemanticState::accept/completed now make a single pass through the
SequenceSet of commands and the unacked DeliveryRecord list in
parallel, rather than doing a pass through unacked for every range in
the SequenceSet.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@803655 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 54 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/IsInSequenceSet.h | 51 |
5 files changed, 95 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 37a9e9b4af..bdd5f33601 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -31,6 +31,8 @@ #include "qpid/broker/TxPublish.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/framing/IsInSequenceSet.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" #include "qpid/broker/AclModule.h" @@ -49,8 +51,9 @@ namespace qpid { namespace broker { -using std::mem_fun_ref; +using namespace std; using boost::intrusive_ptr; +using boost::bind; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; @@ -631,13 +634,27 @@ void SemanticState::ConsumerImpl::notify() } -void SemanticState::accepted(DeliveryId first, DeliveryId last) -{ - AckRange range = findRange(first, last); +// Test that a DeliveryRecord's ID is in a sequence set and some other +// predicate on DeliveryRecord holds. +template <class Predicate> struct IsInSequenceSetAnd { + IsInSequenceSet isInSet; + Predicate predicate; + IsInSequenceSetAnd(const SequenceSet& s, Predicate p) : isInSet(s), predicate(p) {} + bool operator()(DeliveryRecord& dr) { + return isInSet(dr.getId()) && predicate(dr); + } +}; + +template<class Predicate> IsInSequenceSetAnd<Predicate> +isInSequenceSetAnd(const SequenceSet& s, Predicate p) { + return IsInSequenceSetAnd<Predicate>(s,p); +} + +void SemanticState::accepted(const SequenceSet& commands) { if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: - accumulatedAck.add(first, last); + accumulatedAck.add(commands); if (dtxBuffer.get()) { //if enlisted in a dtx, copy the relevant slice from @@ -649,21 +666,28 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last) //mark the relevant messages as 'ended' in unacked //if the messages are already completed, they can be //removed from the record - DeliveryRecords::iterator removed = remove_if(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded)); - unacked.erase(removed, range.end); + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), + isInSequenceSetAnd(commands, + bind(&DeliveryRecord::setEnded, _1))); + unacked.erase(removed, unacked.end()); } } else { - DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0)); - unacked.erase(removed, range.end); + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), + isInSequenceSetAnd(commands, + bind(&DeliveryRecord::accept, _1, + (TransactionContext*) 0))); + unacked.erase(removed, unacked.end()); } } -void SemanticState::completed(DeliveryId first, DeliveryId last) -{ - AckRange range = findRange(first, last); - - DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&SemanticState::complete, this, _1)); - unacked.erase(removed, range.end); +void SemanticState::completed(const SequenceSet& commands) { + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), + isInSequenceSetAnd(commands, + bind(&SemanticState::complete, this, _1))); + unacked.erase(removed, unacked.end()); requestDispatch(); } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 4b87c59c68..da8383fc12 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -206,9 +206,8 @@ class SemanticState : private boost::noncopyable { void reject(DeliveryId first, DeliveryId last); void handle(boost::intrusive_ptr<Message> msg); - //final 0-10 spec (completed and accepted are distinct): - void completed(DeliveryId deliveryTag, DeliveryId endTag); - void accepted(DeliveryId deliveryTag, DeliveryId endTag); + void completed(const framing::SequenceSet& commands); + void accepted(const framing::SequenceSet& commands); void attached(); void detached(); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 8eed673add..b0c5e9ea00 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -429,13 +429,11 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse } } - SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerHelper(s), releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)), releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)), - rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)), - acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2)) + rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)) {} // @@ -547,8 +545,7 @@ void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination) void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands) { - - commands.for_each(acceptOp); + state.accepted(commands); } framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers) diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index d4e5cfaa67..4c5aaf7fc4 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -357,8 +357,7 @@ void SessionState::sendCompletion() { void SessionState::senderCompleted(const SequenceSet& commands) { qpid::SessionState::senderCompleted(commands); - for (SequenceSet::RangeIterator i = commands.rangesBegin(); i != commands.rangesEnd(); i++) - semanticState.completed(i->first(), i->last()); + semanticState.completed(commands); } void SessionState::readyToSend() { diff --git a/qpid/cpp/src/qpid/framing/IsInSequenceSet.h b/qpid/cpp/src/qpid/framing/IsInSequenceSet.h new file mode 100644 index 0000000000..fe10c1b9fa --- /dev/null +++ b/qpid/cpp/src/qpid/framing/IsInSequenceSet.h @@ -0,0 +1,51 @@ +#ifndef QPID_FRAMING_ISINSEQUENCESET_H +#define QPID_FRAMING_ISINSEQUENCESET_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceSet.h" + +namespace qpid { +namespace framing { +/** + * Functor to test whether values are in a sequence set. This is a + * stateful functor that requires the values to be supplied in order + * and takes advantage of that ordering to avoid multiple scans. + */ +class IsInSequenceSet +{ + public: + IsInSequenceSet(const SequenceSet& s) : set(s), i(set.rangesBegin()) {} + + bool operator()(const SequenceNumber& n) { + while (i != set.rangesEnd() && i->end() <= n) ++i; + return i != set.rangesEnd() && i->begin() <= n; + } + + private: + const SequenceSet& set; + SequenceSet::RangeIterator i; +}; + +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_ISINSEQUENCESET_H*/ |