diff options
author | Gordon Sim <gsim@apache.org> | 2011-06-24 14:42:23 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-06-24 14:42:23 +0000 |
commit | ce747a29ec572883cee28590caa1f61a35c9de03 (patch) | |
tree | a4d77128985c0df02ef734fbdc703262b620e2fb /qpid/cpp/src | |
parent | f67217f074432daa52ba3e16966508313b9d6828 (diff) | |
download | qpid-python-ce747a29ec572883cee28590caa1f61a35c9de03.tar.gz |
QPID-3321: flush occasionally to keep the list of pending accepts from growing too large
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1139334 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h | 1 |
2 files changed, 15 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp index e8d250de0f..e2a87913e8 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp @@ -70,6 +70,18 @@ void AcceptTracker::delivered(const std::string& destination, const qpid::framin destinationState[destination].unaccepted.add(id); } +namespace +{ +const size_t FLUSH_FREQUENCY = 1024; +} + +void AcceptTracker::addToPending(qpid::client::AsyncSession& session, const Record& record) +{ + pending.push_back(record); + if (pending.size() > FLUSH_FREQUENCY) session.flush(); +} + + void AcceptTracker::accept(qpid::client::AsyncSession& session) { for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { @@ -78,7 +90,7 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session) Record record; record.status = session.messageAccept(aggregateState.unaccepted); record.accepted = aggregateState.unaccepted; - pending.push_back(record); + addToPending(session, record); aggregateState.accept(); } @@ -90,7 +102,7 @@ void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::Async Record record; record.accepted = aggregateState.accept(id, cumulative); record.status = session.messageAccept(record.accepted); - pending.push_back(record); + addToPending(session, record); } void AcceptTracker::release(qpid::client::AsyncSession& session) diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h index 9e801e8147..85209c3b87 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h @@ -79,6 +79,7 @@ class AcceptTracker StateMap destinationState; Records pending; + void addToPending(qpid::client::AsyncSession&, const Record&); void checkPending(); void completed(qpid::framing::SequenceSet&); }; |