diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp | 42 |
1 files changed, 10 insertions, 32 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp index d2accddcd0..bfb20118b5 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp @@ -30,23 +30,12 @@ void AcceptTracker::State::accept() unaccepted.clear(); } -SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative) +void AcceptTracker::State::accept(qpid::framing::SequenceNumber id) { - SequenceSet accepting; - if (cumulative) { - for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) { - accepting.add(*i); - } - unconfirmed.add(accepting); - unaccepted.remove(accepting); - } else { - if (unaccepted.contains(id)) { - unaccepted.remove(id); - unconfirmed.add(id); - accepting.add(id); - } + if (unaccepted.contains(id)) { + unaccepted.remove(id); + unconfirmed.add(id); } - return accepting; } void AcceptTracker::State::release() @@ -70,18 +59,6 @@ void AcceptTracker::delivered(const std::string& destination, const qpid::framin destinationState[destination].unaccepted.add(id); } -namespace -{ -const size_t FLUSH_FREQUENCY = 1024; -} - -void AcceptTracker::addToPending(qpid::client::AsyncSession& session, const Record& record) -{ - pending.push_back(record); - if (pending.size() % FLUSH_FREQUENCY == 0) session.flush(); -} - - void AcceptTracker::accept(qpid::client::AsyncSession& session) { for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { @@ -90,19 +67,20 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session) Record record; record.status = session.messageAccept(aggregateState.unaccepted); record.accepted = aggregateState.unaccepted; - addToPending(session, record); + pending.push_back(record); aggregateState.accept(); } -void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative) +void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session) { for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { - i->second.accept(id, cumulative); + i->second.accept(id); } Record record; - record.accepted = aggregateState.accept(id, cumulative); + record.accepted.add(id); record.status = session.messageAccept(record.accepted); - addToPending(session, record); + pending.push_back(record); + aggregateState.accept(id); } void AcceptTracker::release(qpid::client::AsyncSession& session) |