summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-08-12 18:29:03 +0000
committerAlan Conway <aconway@apache.org>2009-08-12 18:29:03 +0000
commit98ce81ce2db861c73dedfab85f4c5bb8832efc20 (patch)
treef3f81947732e7a1641f5a91ef0fe2b05a566adb5 /qpid
parentb567927bda5350a2e85b2087fbc43b098df25431 (diff)
downloadqpid-python-98ce81ce2db861c73dedfab85f4c5bb8832efc20.tar.gz
Optimized handling of accepts and completions.
SemanticState::accept/completed now make a single pass through the SequenceSet of commands and the unacked DeliveryRecord list in parallel, rather than doing a pass through unacked for every range in the SequenceSet. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@803655 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp54
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h5
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp3
-rw-r--r--qpid/cpp/src/qpid/framing/IsInSequenceSet.h51
5 files changed, 95 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 37a9e9b4af..bdd5f33601 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -31,6 +31,8 @@
#include "qpid/broker/TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/IsInSequenceSet.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
@@ -49,8 +51,9 @@
namespace qpid {
namespace broker {
-using std::mem_fun_ref;
+using namespace std;
using boost::intrusive_ptr;
+using boost::bind;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -631,13 +634,27 @@ void SemanticState::ConsumerImpl::notify()
}
-void SemanticState::accepted(DeliveryId first, DeliveryId last)
-{
- AckRange range = findRange(first, last);
+// Test that a DeliveryRecord's ID is in a sequence set and some other
+// predicate on DeliveryRecord holds.
+template <class Predicate> struct IsInSequenceSetAnd {
+ IsInSequenceSet isInSet;
+ Predicate predicate;
+ IsInSequenceSetAnd(const SequenceSet& s, Predicate p) : isInSet(s), predicate(p) {}
+ bool operator()(DeliveryRecord& dr) {
+ return isInSet(dr.getId()) && predicate(dr);
+ }
+};
+
+template<class Predicate> IsInSequenceSetAnd<Predicate>
+isInSequenceSetAnd(const SequenceSet& s, Predicate p) {
+ return IsInSequenceSetAnd<Predicate>(s,p);
+}
+
+void SemanticState::accepted(const SequenceSet& commands) {
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
- accumulatedAck.add(first, last);
+ accumulatedAck.add(commands);
if (dtxBuffer.get()) {
//if enlisted in a dtx, copy the relevant slice from
@@ -649,21 +666,28 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last)
//mark the relevant messages as 'ended' in unacked
//if the messages are already completed, they can be
//removed from the record
- DeliveryRecords::iterator removed = remove_if(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
- unacked.erase(removed, range.end);
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(&DeliveryRecord::setEnded, _1)));
+ unacked.erase(removed, unacked.end());
}
} else {
- DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0));
- unacked.erase(removed, range.end);
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(&DeliveryRecord::accept, _1,
+ (TransactionContext*) 0)));
+ unacked.erase(removed, unacked.end());
}
}
-void SemanticState::completed(DeliveryId first, DeliveryId last)
-{
- AckRange range = findRange(first, last);
-
- DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
- unacked.erase(removed, range.end);
+void SemanticState::completed(const SequenceSet& commands) {
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(&SemanticState::complete, this, _1)));
+ unacked.erase(removed, unacked.end());
requestDispatch();
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 4b87c59c68..da8383fc12 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -206,9 +206,8 @@ class SemanticState : private boost::noncopyable {
void reject(DeliveryId first, DeliveryId last);
void handle(boost::intrusive_ptr<Message> msg);
- //final 0-10 spec (completed and accepted are distinct):
- void completed(DeliveryId deliveryTag, DeliveryId endTag);
- void accepted(DeliveryId deliveryTag, DeliveryId endTag);
+ void completed(const framing::SequenceSet& commands);
+ void accepted(const framing::SequenceSet& commands);
void attached();
void detached();
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 8eed673add..b0c5e9ea00 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -429,13 +429,11 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse
}
}
-
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
HandlerHelper(s),
releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)),
releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
- rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
- acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
+ rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
{}
//
@@ -547,8 +545,7 @@ void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination)
void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands)
{
-
- commands.for_each(acceptOp);
+ state.accepted(commands);
}
framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index d4e5cfaa67..4c5aaf7fc4 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -357,8 +357,7 @@ void SessionState::sendCompletion() {
void SessionState::senderCompleted(const SequenceSet& commands) {
qpid::SessionState::senderCompleted(commands);
- for (SequenceSet::RangeIterator i = commands.rangesBegin(); i != commands.rangesEnd(); i++)
- semanticState.completed(i->first(), i->last());
+ semanticState.completed(commands);
}
void SessionState::readyToSend() {
diff --git a/qpid/cpp/src/qpid/framing/IsInSequenceSet.h b/qpid/cpp/src/qpid/framing/IsInSequenceSet.h
new file mode 100644
index 0000000000..fe10c1b9fa
--- /dev/null
+++ b/qpid/cpp/src/qpid/framing/IsInSequenceSet.h
@@ -0,0 +1,51 @@
+#ifndef QPID_FRAMING_ISINSEQUENCESET_H
+#define QPID_FRAMING_ISINSEQUENCESET_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/SequenceSet.h"
+
+namespace qpid {
+namespace framing {
+/**
+ * Functor to test whether values are in a sequence set. This is a
+ * stateful functor that requires the values to be supplied in order
+ * and takes advantage of that ordering to avoid multiple scans.
+ */
+class IsInSequenceSet
+{
+ public:
+ IsInSequenceSet(const SequenceSet& s) : set(s), i(set.rangesBegin()) {}
+
+ bool operator()(const SequenceNumber& n) {
+ while (i != set.rangesEnd() && i->end() <= n) ++i;
+ return i != set.rangesEnd() && i->begin() <= n;
+ }
+
+ private:
+ const SequenceSet& set;
+ SequenceSet::RangeIterator i;
+};
+
+}} // namespace qpid::framing
+
+#endif /*!QPID_FRAMING_ISINSEQUENCESET_H*/