summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SubscriptionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/SubscriptionImpl.cpp24
1 files changed, 14 insertions, 10 deletions
diff --git a/cpp/src/qpid/client/SubscriptionImpl.cpp b/cpp/src/qpid/client/SubscriptionImpl.cpp
index 3363dda11f..684cca031a 100644
--- a/cpp/src/qpid/client/SubscriptionImpl.cpp
+++ b/cpp/src/qpid/client/SubscriptionImpl.cpp
@@ -27,6 +27,7 @@ namespace qpid {
namespace client {
using sys::Mutex;
+using framing::MessageAcquireResult;
SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
: manager(m), name(n), queue(q), settings(s), listener(l)
@@ -68,16 +69,19 @@ SequenceSet SubscriptionImpl::getUnaccepted() const { Mutex::ScopedLock l(lock);
void SubscriptionImpl::acquire(const SequenceSet& messageIds) {
Mutex::ScopedLock l(lock);
- manager.getSession().messageAcquire(messageIds);
- unacquired.remove(messageIds);
+ MessageAcquireResult result = manager.getSession().messageAcquire(messageIds);
+ unacquired.remove(result.getTransfers());
if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
- unaccepted.add(messageIds);
+ unaccepted.add(result.getTransfers());
}
void SubscriptionImpl::accept(const SequenceSet& messageIds) {
Mutex::ScopedLock l(lock);
manager.getSession().messageAccept(messageIds);
unaccepted.remove(messageIds);
+ if (settings.autoComplete) {
+ manager.getSession().sendCompletion();
+ }
}
Session SubscriptionImpl::getSession() const { return manager.getSession(); }
@@ -88,7 +92,6 @@ void SubscriptionImpl::cancel() { manager.cancel(name); }
void SubscriptionImpl::received(Message& m) {
Mutex::ScopedLock l(lock);
- manager.getSession().markCompleted(m.getId(), false, false);
if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
unacquired.add(m.getId());
else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
@@ -99,15 +102,16 @@ void SubscriptionImpl::received(Message& m) {
listener->received(m);
}
+ if (settings.autoComplete) {
+ manager.getSession().markCompleted(m.getId(), false, false);
+ }
if (settings.autoAck) {
- if (unacquired.size() + unaccepted.size() >= settings.autoAck) {
- if (unacquired.size()) {
- async(manager.getSession()).messageAcquire(unacquired);
- unaccepted.add(unacquired);
- unaccepted.clear();
- }
+ if (unaccepted.size() >= settings.autoAck) {
async(manager.getSession()).messageAccept(unaccepted);
unaccepted.clear();
+ if (settings.autoComplete) {
+ manager.getSession().sendCompletion();
+ }
}
}
}