diff options
author | Gordon Sim <gsim@apache.org> | 2011-06-15 11:48:43 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-06-15 11:48:43 +0000 |
commit | 3492b186c33b875dc2f0fb18fc32ab4dadd77a3d (patch) | |
tree | 50b00dded177e6390f8ebef903f35fd863229a9c /cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp | |
parent | ffe1a98a4749d84060355bacf881ac1d2ba1324c (diff) | |
download | qpid-python-3492b186c33b875dc2f0fb18fc32ab4dadd77a3d.tar.gz |
QPID-3200: Add new method to session for cumulative acknowledgement upto (and including) a specified message
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1136003 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp | 26 |
1 files changed, 18 insertions, 8 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp index bfb20118b5..e8d250de0f 100644 --- a/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp +++ b/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp @@ -30,12 +30,23 @@ void AcceptTracker::State::accept() unaccepted.clear(); } -void AcceptTracker::State::accept(qpid::framing::SequenceNumber id) +SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative) { - if (unaccepted.contains(id)) { - unaccepted.remove(id); - unconfirmed.add(id); + SequenceSet accepting; + if (cumulative) { + for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) { + accepting.add(*i); + } + unconfirmed.add(accepting); + unaccepted.remove(accepting); + } else { + if (unaccepted.contains(id)) { + unaccepted.remove(id); + unconfirmed.add(id); + accepting.add(id); + } } + return accepting; } void AcceptTracker::State::release() @@ -71,16 +82,15 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session) aggregateState.accept(); } -void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session) +void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative) { for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { - i->second.accept(id); + i->second.accept(id, cumulative); } Record record; - record.accepted.add(id); + record.accepted = aggregateState.accept(id, cumulative); record.status = session.messageAccept(record.accepted); pending.push_back(record); - aggregateState.accept(id); } void AcceptTracker::release(qpid::client::AsyncSession& session) |