diff options
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/SubscriptionImpl.cpp | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/cpp/src/qpid/client/SubscriptionImpl.cpp b/cpp/src/qpid/client/SubscriptionImpl.cpp index 3363dda11f..684cca031a 100644 --- a/cpp/src/qpid/client/SubscriptionImpl.cpp +++ b/cpp/src/qpid/client/SubscriptionImpl.cpp @@ -27,6 +27,7 @@ namespace qpid { namespace client { using sys::Mutex; +using framing::MessageAcquireResult; SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l) : manager(m), name(n), queue(q), settings(s), listener(l) @@ -68,16 +69,19 @@ SequenceSet SubscriptionImpl::getUnaccepted() const { Mutex::ScopedLock l(lock); void SubscriptionImpl::acquire(const SequenceSet& messageIds) { Mutex::ScopedLock l(lock); - manager.getSession().messageAcquire(messageIds); - unacquired.remove(messageIds); + MessageAcquireResult result = manager.getSession().messageAcquire(messageIds); + unacquired.remove(result.getTransfers()); if (settings.acceptMode == ACCEPT_MODE_EXPLICIT) - unaccepted.add(messageIds); + unaccepted.add(result.getTransfers()); } void SubscriptionImpl::accept(const SequenceSet& messageIds) { Mutex::ScopedLock l(lock); manager.getSession().messageAccept(messageIds); unaccepted.remove(messageIds); + if (settings.autoComplete) { + manager.getSession().sendCompletion(); + } } Session SubscriptionImpl::getSession() const { return manager.getSession(); } @@ -88,7 +92,6 @@ void SubscriptionImpl::cancel() { manager.cancel(name); } void SubscriptionImpl::received(Message& m) { Mutex::ScopedLock l(lock); - manager.getSession().markCompleted(m.getId(), false, false); if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED) unacquired.add(m.getId()); else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT) @@ -99,15 +102,16 @@ void SubscriptionImpl::received(Message& m) { listener->received(m); } + if (settings.autoComplete) { + manager.getSession().markCompleted(m.getId(), false, false); + } if (settings.autoAck) { - if (unacquired.size() + unaccepted.size() >= settings.autoAck) { - if (unacquired.size()) { - async(manager.getSession()).messageAcquire(unacquired); - unaccepted.add(unacquired); - unaccepted.clear(); - } + if (unaccepted.size() >= settings.autoAck) { async(manager.getSession()).messageAccept(unaccepted); unaccepted.clear(); + if (settings.autoComplete) { + manager.getSession().sendCompletion(); + } } } } |