From ce747a29ec572883cee28590caa1f61a35c9de03 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 24 Jun 2011 14:42:23 +0000 Subject: QPID-3321: flush occasionally to keep the list of pending accepts from growing too large git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1139334 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp | 16 ++++++++++++++-- qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h | 1 + 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp index e8d250de0f..e2a87913e8 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp @@ -70,6 +70,18 @@ void AcceptTracker::delivered(const std::string& destination, const qpid::framin destinationState[destination].unaccepted.add(id); } +namespace +{ +const size_t FLUSH_FREQUENCY = 1024; +} + +void AcceptTracker::addToPending(qpid::client::AsyncSession& session, const Record& record) +{ + pending.push_back(record); + if (pending.size() > FLUSH_FREQUENCY) session.flush(); +} + + void AcceptTracker::accept(qpid::client::AsyncSession& session) { for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { @@ -78,7 +90,7 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session) Record record; record.status = session.messageAccept(aggregateState.unaccepted); record.accepted = aggregateState.unaccepted; - pending.push_back(record); + addToPending(session, record); aggregateState.accept(); } @@ -90,7 +102,7 @@ void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::Async Record record; record.accepted = aggregateState.accept(id, cumulative); record.status = session.messageAccept(record.accepted); - pending.push_back(record); + addToPending(session, record); } void AcceptTracker::release(qpid::client::AsyncSession& session) diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h index 9e801e8147..85209c3b87 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h @@ -79,6 +79,7 @@ class AcceptTracker StateMap destinationState; Records pending; + void addToPending(qpid::client::AsyncSession&, const Record&); void checkPending(); void completed(qpid::framing::SequenceSet&); }; -- cgit v1.2.1