diff options
author | Gordon Sim <gsim@apache.org> | 2011-06-15 11:48:43 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-06-15 11:48:43 +0000 |
commit | 02a6c2f7ab64332ea13dff4605a8478b4ec9bd37 (patch) | |
tree | 8daf10524a6b7f20dd342df4ced6fc458d608817 | |
parent | ffffe1b5cd71d2f12a26124a903811a1e3da3c12 (diff) | |
download | qpid-python-02a6c2f7ab64332ea13dff4605a8478b4ec9bd37.tar.gz |
QPID-3200: Add new method to session for cumulative acknowledgement upto (and including) a specified message
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1136003 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/include/qpid/messaging/Session.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp | 26 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/Session.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/SessionImpl.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 40 |
10 files changed, 81 insertions, 24 deletions
diff --git a/qpid/cpp/include/qpid/messaging/Session.h b/qpid/cpp/include/qpid/messaging/Session.h index 52786eb5f4..428f8aa491 100644 --- a/qpid/cpp/include/qpid/messaging/Session.h +++ b/qpid/cpp/include/qpid/messaging/Session.h @@ -78,6 +78,10 @@ class QPID_MESSAGING_CLASS_EXTERN Session : public qpid::messaging::Handle<Sessi */ QPID_MESSAGING_EXTERN void acknowledge(Message&, bool sync=false); /** + * Acknowledges all message up to the specified message. + */ + QPID_MESSAGING_EXTERN void acknowledgeUpTo(Message&, bool sync=false); + /** * Rejects the specified message. The broker does not redeliver a * message that has been rejected. Once a message has been * acknowledged, it can no longer be rejected. diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp index bfb20118b5..e8d250de0f 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp @@ -30,12 +30,23 @@ void AcceptTracker::State::accept() unaccepted.clear(); } -void AcceptTracker::State::accept(qpid::framing::SequenceNumber id) +SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative) { - if (unaccepted.contains(id)) { - unaccepted.remove(id); - unconfirmed.add(id); + SequenceSet accepting; + if (cumulative) { + for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) { + accepting.add(*i); + } + unconfirmed.add(accepting); + unaccepted.remove(accepting); + } else { + if (unaccepted.contains(id)) { + unaccepted.remove(id); + unconfirmed.add(id); + accepting.add(id); + } } + return accepting; } void AcceptTracker::State::release() @@ -71,16 +82,15 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session) aggregateState.accept(); } -void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session) +void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative) { for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { - i->second.accept(id); + i->second.accept(id, cumulative); } Record record; - record.accepted.add(id); + record.accepted = aggregateState.accept(id, cumulative); record.status = session.messageAccept(record.accepted); pending.push_back(record); - aggregateState.accept(id); } void AcceptTracker::release(qpid::client::AsyncSession& session) diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h index 87890e41cc..9e801e8147 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h @@ -42,7 +42,7 @@ class AcceptTracker public: void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id); void accept(qpid::client::AsyncSession&); - void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&); + void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative); void release(qpid::client::AsyncSession&); uint32_t acceptsPending(); uint32_t acceptsPending(const std::string& destination); @@ -62,7 +62,7 @@ class AcceptTracker qpid::framing::SequenceSet unconfirmed; void accept(); - void accept(qpid::framing::SequenceNumber); + qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative); void release(); uint32_t acceptsPending(); void completed(qpid::framing::SequenceSet&); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 71e89bdba1..5cf20c92eb 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -144,10 +144,10 @@ void IncomingMessages::accept() acceptTracker.accept(session); } -void IncomingMessages::accept(qpid::framing::SequenceNumber id) +void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative) { sys::Mutex::ScopedLock l(lock); - acceptTracker.accept(id, session); + acceptTracker.accept(id, session, cumulative); } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index f6a291bc68..9053b70312 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -72,7 +72,7 @@ class IncomingMessages bool get(Handler& handler, qpid::sys::Duration timeout); bool getNextDestination(std::string& destination, qpid::sys::Duration timeout); void accept(); - void accept(qpid::framing::SequenceNumber id); + void accept(qpid::framing::SequenceNumber id, bool cumulative); void releaseAll(); void releasePending(const std::string& destination); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 9e43e70f32..787be7de2a 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -112,13 +112,14 @@ void SessionImpl::release(qpid::messaging::Message& m) execute1<Release>(m); } -void SessionImpl::acknowledge(qpid::messaging::Message& m) +void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative) { //Should probably throw an exception on failure here, or indicate //it through a return type at least. Failure means that the //message may be redelivered; i.e. the application cannot delete //any state necessary for preventing reprocessing of the message - execute1<Acknowledge1>(m); + Acknowledge2 ack(*this, m, cumulative); + execute(ack); } void SessionImpl::close() @@ -467,10 +468,10 @@ void SessionImpl::acknowledgeImpl() if (!transactional) incoming.accept(); } -void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m) +void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative) { ScopedLock l(lock); - if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId()); + if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative); } void SessionImpl::rejectImpl(qpid::messaging::Message& m) diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 2a2aa47df6..c7dea77d18 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -63,7 +63,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void acknowledge(bool sync); void reject(qpid::messaging::Message&); void release(qpid::messaging::Message&); - void acknowledge(qpid::messaging::Message& msg); + void acknowledge(qpid::messaging::Message& msg, bool cumulative); void close(); void sync(bool block); qpid::messaging::Sender createSender(const qpid::messaging::Address& address); @@ -139,7 +139,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void commitImpl(); void rollbackImpl(); void acknowledgeImpl(); - void acknowledgeImpl(qpid::messaging::Message&); + void acknowledgeImpl(qpid::messaging::Message&, bool cumulative); void rejectImpl(qpid::messaging::Message&); void releaseImpl(qpid::messaging::Message&); void closeImpl(); @@ -204,12 +204,13 @@ class SessionImpl : public qpid::messaging::SessionImpl void operator()() { impl.releaseImpl(message); } }; - struct Acknowledge1 : Command + struct Acknowledge2 : Command { qpid::messaging::Message& message; + bool cumulative; - Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} - void operator()() { impl.acknowledgeImpl(message); } + Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {} + void operator()() { impl.acknowledgeImpl(message, cumulative); } }; struct CreateSender; diff --git a/qpid/cpp/src/qpid/messaging/Session.cpp b/qpid/cpp/src/qpid/messaging/Session.cpp index 496953a8e5..cccfd9a873 100644 --- a/qpid/cpp/src/qpid/messaging/Session.cpp +++ b/qpid/cpp/src/qpid/messaging/Session.cpp @@ -39,7 +39,8 @@ Session& Session::operator=(const Session& s) { return PI::assign(*this, s); } void Session::commit() { impl->commit(); } void Session::rollback() { impl->rollback(); } void Session::acknowledge(bool sync) { impl->acknowledge(sync); } -void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m); sync(s); } +void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m, false); sync(s); } +void Session::acknowledgeUpTo(Message& m, bool s) { impl->acknowledge(m, true); sync(s); } void Session::reject(Message& m) { impl->reject(m); } void Session::release(Message& m) { impl->release(m); } void Session::close() { impl->close(); } diff --git a/qpid/cpp/src/qpid/messaging/SessionImpl.h b/qpid/cpp/src/qpid/messaging/SessionImpl.h index 02a254e4f2..60ae615253 100644 --- a/qpid/cpp/src/qpid/messaging/SessionImpl.h +++ b/qpid/cpp/src/qpid/messaging/SessionImpl.h @@ -41,7 +41,7 @@ class SessionImpl : public virtual qpid::RefCounted virtual void commit() = 0; virtual void rollback() = 0; virtual void acknowledge(bool sync) = 0; - virtual void acknowledge(Message&) = 0; + virtual void acknowledge(Message&, bool cumulative) = 0; virtual void reject(Message&) = 0; virtual void release(Message&) = 0; virtual void close() = 0; diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index a93d8f78ae..fae45a94d0 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -1024,6 +1024,46 @@ QPID_AUTO_TEST_CASE(testNonExclusiveSubscriber) fix.session.acknowledge(); } +QPID_AUTO_TEST_CASE(testAcknowledgeUpTo) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + const uint count(20); + for (uint i = 0; i < count; ++i) { + sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); + } + + Session other = fix.connection.createSession(); + Receiver receiver = other.createReceiver(fix.queue); + std::vector<Message> messages; + for (uint i = 0; i < count; ++i) { + Message msg = receiver.fetch(); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); + messages.push_back(msg); + } + const uint batch = 10; + other.acknowledgeUpTo(messages[batch-1]);//acknowledge first 10 messages only + + messages.clear(); + other.sync(); + other.close(); + + other = fix.connection.createSession(); + receiver = other.createReceiver(fix.queue); + Message msg; + for (uint i = 0; i < (count-batch); ++i) { + msg = receiver.fetch(); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str()); + } + other.acknowledgeUpTo(msg); + other.sync(); + other.close(); + + Message m; + //check queue is empty + BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE)); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |