summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-06-15 11:48:43 +0000
committerGordon Sim <gsim@apache.org>2011-06-15 11:48:43 +0000
commit3492b186c33b875dc2f0fb18fc32ab4dadd77a3d (patch)
tree50b00dded177e6390f8ebef903f35fd863229a9c /cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
parentffe1a98a4749d84060355bacf881ac1d2ba1324c (diff)
downloadqpid-python-3492b186c33b875dc2f0fb18fc32ab4dadd77a3d.tar.gz
QPID-3200: Add new method to session for cumulative acknowledgement upto (and including) a specified message
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1136003 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp26
1 files changed, 18 insertions, 8 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
index bfb20118b5..e8d250de0f 100644
--- a/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
@@ -30,12 +30,23 @@ void AcceptTracker::State::accept()
unaccepted.clear();
}
-void AcceptTracker::State::accept(qpid::framing::SequenceNumber id)
+SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative)
{
- if (unaccepted.contains(id)) {
- unaccepted.remove(id);
- unconfirmed.add(id);
+ SequenceSet accepting;
+ if (cumulative) {
+ for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) {
+ accepting.add(*i);
+ }
+ unconfirmed.add(accepting);
+ unaccepted.remove(accepting);
+ } else {
+ if (unaccepted.contains(id)) {
+ unaccepted.remove(id);
+ unconfirmed.add(id);
+ accepting.add(id);
+ }
}
+ return accepting;
}
void AcceptTracker::State::release()
@@ -71,16 +82,15 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session)
aggregateState.accept();
}
-void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session)
+void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative)
{
for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
- i->second.accept(id);
+ i->second.accept(id, cumulative);
}
Record record;
- record.accepted.add(id);
+ record.accepted = aggregateState.accept(id, cumulative);
record.status = session.messageAccept(record.accepted);
pending.push_back(record);
- aggregateState.accept(id);
}
void AcceptTracker::release(qpid::client::AsyncSession& session)