summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp42
1 files changed, 10 insertions, 32 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
index d2accddcd0..bfb20118b5 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
@@ -30,23 +30,12 @@ void AcceptTracker::State::accept()
unaccepted.clear();
}
-SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative)
+void AcceptTracker::State::accept(qpid::framing::SequenceNumber id)
{
- SequenceSet accepting;
- if (cumulative) {
- for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) {
- accepting.add(*i);
- }
- unconfirmed.add(accepting);
- unaccepted.remove(accepting);
- } else {
- if (unaccepted.contains(id)) {
- unaccepted.remove(id);
- unconfirmed.add(id);
- accepting.add(id);
- }
+ if (unaccepted.contains(id)) {
+ unaccepted.remove(id);
+ unconfirmed.add(id);
}
- return accepting;
}
void AcceptTracker::State::release()
@@ -70,18 +59,6 @@ void AcceptTracker::delivered(const std::string& destination, const qpid::framin
destinationState[destination].unaccepted.add(id);
}
-namespace
-{
-const size_t FLUSH_FREQUENCY = 1024;
-}
-
-void AcceptTracker::addToPending(qpid::client::AsyncSession& session, const Record& record)
-{
- pending.push_back(record);
- if (pending.size() % FLUSH_FREQUENCY == 0) session.flush();
-}
-
-
void AcceptTracker::accept(qpid::client::AsyncSession& session)
{
for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
@@ -90,19 +67,20 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session)
Record record;
record.status = session.messageAccept(aggregateState.unaccepted);
record.accepted = aggregateState.unaccepted;
- addToPending(session, record);
+ pending.push_back(record);
aggregateState.accept();
}
-void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative)
+void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session)
{
for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
- i->second.accept(id, cumulative);
+ i->second.accept(id);
}
Record record;
- record.accepted = aggregateState.accept(id, cumulative);
+ record.accepted.add(id);
record.status = session.messageAccept(record.accepted);
- addToPending(session, record);
+ pending.push_back(record);
+ aggregateState.accept(id);
}
void AcceptTracker::release(qpid::client::AsyncSession& session)