diff options
author | Gordon Sim <gsim@apache.org> | 2010-08-20 11:32:11 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-08-20 11:32:11 +0000 |
commit | 145087188c7bd24175cbddbe1156433a4b498abc (patch) | |
tree | 8d0ff348fe1f068308b6516f6f31874d3d4f3fda /cpp | |
parent | 91a4eda9bfa588f1d017c218ac2bcc9713338ef2 (diff) | |
download | qpid-python-145087188c7bd24175cbddbe1156433a4b498abc.tar.gz |
QPID-2807: Allow per message acknowledgement
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@987459 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/include/qpid/messaging/Session.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/AcceptTracker.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 15 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Session.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/SessionImpl.h | 1 | ||||
-rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 51 |
10 files changed, 113 insertions, 1 deletions
diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h index 688a4dd102..07a76993ee 100644 --- a/cpp/include/qpid/messaging/Session.h +++ b/cpp/include/qpid/messaging/Session.h @@ -76,6 +76,10 @@ class Session : public qpid::messaging::Handle<SessionImpl> */ QPID_MESSAGING_EXTERN void acknowledge(bool sync=false); /** + * Acknowledges the specified message. + */ + QPID_MESSAGING_EXTERN void acknowledge(Message&, bool sync=false); + /** * Rejects the specified message. This will prevent the message * being redelivered. This must be called before the message is * acknowledged. diff --git a/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp index 80be5c56f3..bfb20118b5 100644 --- a/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp +++ b/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp @@ -30,6 +30,14 @@ void AcceptTracker::State::accept() unaccepted.clear(); } +void AcceptTracker::State::accept(qpid::framing::SequenceNumber id) +{ + if (unaccepted.contains(id)) { + unaccepted.remove(id); + unconfirmed.add(id); + } +} + void AcceptTracker::State::release() { unaccepted.clear(); @@ -63,6 +71,18 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session) aggregateState.accept(); } +void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session) +{ + for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { + i->second.accept(id); + } + Record record; + record.accepted.add(id); + record.status = session.messageAccept(record.accepted); + pending.push_back(record); + aggregateState.accept(id); +} + void AcceptTracker::release(qpid::client::AsyncSession& session) { session.messageRelease(aggregateState.unaccepted); diff --git a/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/cpp/src/qpid/client/amqp0_10/AcceptTracker.h index fb58a3a8c8..87890e41cc 100644 --- a/cpp/src/qpid/client/amqp0_10/AcceptTracker.h +++ b/cpp/src/qpid/client/amqp0_10/AcceptTracker.h @@ -42,6 +42,7 @@ class AcceptTracker public: void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id); void accept(qpid::client::AsyncSession&); + void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&); void release(qpid::client::AsyncSession&); uint32_t acceptsPending(); uint32_t acceptsPending(const std::string& destination); @@ -55,12 +56,13 @@ class AcceptTracker */ qpid::framing::SequenceSet unaccepted; /** - * ids of messages for which an accpet has been issued but not + * ids of messages for which an accept has been issued but not * yet confirmed as completed */ qpid::framing::SequenceSet unconfirmed; void accept(); + void accept(qpid::framing::SequenceNumber); void release(); uint32_t acceptsPending(); void completed(qpid::framing::SequenceSet&); diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 2c00e6fae8..71e89bdba1 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -144,6 +144,13 @@ void IncomingMessages::accept() acceptTracker.accept(session); } +void IncomingMessages::accept(qpid::framing::SequenceNumber id) +{ + sys::Mutex::ScopedLock l(lock); + acceptTracker.accept(id, session); +} + + void IncomingMessages::releaseAll() { { diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index 9640890d76..f6a291bc68 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -72,6 +72,7 @@ class IncomingMessages bool get(Handler& handler, qpid::sys::Duration timeout); bool getNextDestination(std::string& destination, qpid::sys::Duration timeout); void accept(); + void accept(qpid::framing::SequenceNumber id); void releaseAll(); void releasePending(const std::string& destination); diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 800c3269b9..1086146b0d 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -112,6 +112,15 @@ void SessionImpl::release(qpid::messaging::Message& m) execute1<Release>(m); } +void SessionImpl::acknowledge(qpid::messaging::Message& m) +{ + //Should probably throw an exception on failure here, or indicate + //it through a return type at least. Failure means that the + //message may be redelivered; i.e. the application cannot delete + //any state necessary for preventing reprocessing of the message + execute1<Acknowledge1>(m); +} + void SessionImpl::close() { if (hasError()) { @@ -456,6 +465,12 @@ void SessionImpl::acknowledgeImpl() if (!transactional) incoming.accept(); } +void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m) +{ + ScopedLock l(lock); + if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId()); +} + void SessionImpl::rejectImpl(qpid::messaging::Message& m) { SequenceSet set; diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index a4a00e1481..3dd5cd0189 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -63,6 +63,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void acknowledge(bool sync); void reject(qpid::messaging::Message&); void release(qpid::messaging::Message&); + void acknowledge(qpid::messaging::Message& msg); void close(); void sync(bool block); qpid::messaging::Sender createSender(const qpid::messaging::Address& address); @@ -137,6 +138,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void commitImpl(); void rollbackImpl(); void acknowledgeImpl(); + void acknowledgeImpl(qpid::messaging::Message&); void rejectImpl(qpid::messaging::Message&); void releaseImpl(qpid::messaging::Message&); void closeImpl(); @@ -200,6 +202,14 @@ class SessionImpl : public qpid::messaging::SessionImpl Release(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} void operator()() { impl.releaseImpl(message); } }; + + struct Acknowledge1 : Command + { + qpid::messaging::Message& message; + + Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} + void operator()() { impl.acknowledgeImpl(message); } + }; struct CreateSender; struct CreateReceiver; diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp index ddbb9ee58e..f07fad434e 100644 --- a/cpp/src/qpid/messaging/Session.cpp +++ b/cpp/src/qpid/messaging/Session.cpp @@ -39,6 +39,7 @@ Session& Session::operator=(const Session& s) { return PI::assign(*this, s); } void Session::commit() { impl->commit(); } void Session::rollback() { impl->rollback(); } void Session::acknowledge(bool sync) { impl->acknowledge(sync); } +void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m); if (s) sync(true); } void Session::reject(Message& m) { impl->reject(m); } void Session::release(Message& m) { impl->release(m); } void Session::close() { impl->close(); } diff --git a/cpp/src/qpid/messaging/SessionImpl.h b/cpp/src/qpid/messaging/SessionImpl.h index de050fc5bb..02a254e4f2 100644 --- a/cpp/src/qpid/messaging/SessionImpl.h +++ b/cpp/src/qpid/messaging/SessionImpl.h @@ -41,6 +41,7 @@ class SessionImpl : public virtual qpid::RefCounted virtual void commit() = 0; virtual void rollback() = 0; virtual void acknowledge(bool sync) = 0; + virtual void acknowledge(Message&) = 0; virtual void reject(Message&) = 0; virtual void release(Message&) = 0; virtual void close() = 0; diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index d75ff9fdfd..66ce061b73 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -791,6 +791,57 @@ QPID_AUTO_TEST_CASE(testExceptionOnClosedConnection) BOOST_CHECK_THROW(connection.createSession(), MessagingException); } +QPID_AUTO_TEST_CASE(testAcknowledge) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + const uint count(20); + for (uint i = 0; i < count; ++i) { + sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); + } + + Session other = fix.connection.createSession(); + Receiver receiver = other.createReceiver(fix.queue); + std::vector<Message> messages; + for (uint i = 0; i < count; ++i) { + Message msg = receiver.fetch(); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); + messages.push_back(msg); + } + const uint batch(10); //acknowledge first 10 messages only + for (uint i = 0; i < batch; ++i) { + other.acknowledge(messages[i]); + } + messages.clear(); + other.sync(); + other.close(); + + other = fix.connection.createSession(); + receiver = other.createReceiver(fix.queue); + for (uint i = 0; i < (count-batch); ++i) { + Message msg = receiver.fetch(); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str()); + if (i % 2) other.acknowledge(msg); //acknowledge every other message + } + other.sync(); + other.close(); + + //check unacknowledged messages are still enqueued + other = fix.connection.createSession(); + receiver = other.createReceiver(fix.queue); + for (uint i = 0; i < ((count-batch)/2); ++i) { + Message msg = receiver.fetch(); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str()); + } + other.acknowledge();//acknowledge all messages + other.sync(); + other.close(); + + Message m; + //check queue is empty + BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE)); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |