summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-06-15 11:48:43 +0000
committerGordon Sim <gsim@apache.org>2011-06-15 11:48:43 +0000
commit02a6c2f7ab64332ea13dff4605a8478b4ec9bd37 (patch)
tree8daf10524a6b7f20dd342df4ced6fc458d608817
parentffffe1b5cd71d2f12a26124a903811a1e3da3c12 (diff)
downloadqpid-python-02a6c2f7ab64332ea13dff4605a8478b4ec9bd37.tar.gz
QPID-3200: Add new method to session for cumulative acknowledgement upto (and including) a specified message
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1136003 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/messaging/Session.h4
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp26
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h4
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp9
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h11
-rw-r--r--qpid/cpp/src/qpid/messaging/Session.cpp3
-rw-r--r--qpid/cpp/src/qpid/messaging/SessionImpl.h2
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp40
10 files changed, 81 insertions, 24 deletions
diff --git a/qpid/cpp/include/qpid/messaging/Session.h b/qpid/cpp/include/qpid/messaging/Session.h
index 52786eb5f4..428f8aa491 100644
--- a/qpid/cpp/include/qpid/messaging/Session.h
+++ b/qpid/cpp/include/qpid/messaging/Session.h
@@ -78,6 +78,10 @@ class QPID_MESSAGING_CLASS_EXTERN Session : public qpid::messaging::Handle<Sessi
*/
QPID_MESSAGING_EXTERN void acknowledge(Message&, bool sync=false);
/**
+ * Acknowledges all message up to the specified message.
+ */
+ QPID_MESSAGING_EXTERN void acknowledgeUpTo(Message&, bool sync=false);
+ /**
* Rejects the specified message. The broker does not redeliver a
* message that has been rejected. Once a message has been
* acknowledged, it can no longer be rejected.
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
index bfb20118b5..e8d250de0f 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
@@ -30,12 +30,23 @@ void AcceptTracker::State::accept()
unaccepted.clear();
}
-void AcceptTracker::State::accept(qpid::framing::SequenceNumber id)
+SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative)
{
- if (unaccepted.contains(id)) {
- unaccepted.remove(id);
- unconfirmed.add(id);
+ SequenceSet accepting;
+ if (cumulative) {
+ for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) {
+ accepting.add(*i);
+ }
+ unconfirmed.add(accepting);
+ unaccepted.remove(accepting);
+ } else {
+ if (unaccepted.contains(id)) {
+ unaccepted.remove(id);
+ unconfirmed.add(id);
+ accepting.add(id);
+ }
}
+ return accepting;
}
void AcceptTracker::State::release()
@@ -71,16 +82,15 @@ void AcceptTracker::accept(qpid::client::AsyncSession& session)
aggregateState.accept();
}
-void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session)
+void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative)
{
for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
- i->second.accept(id);
+ i->second.accept(id, cumulative);
}
Record record;
- record.accepted.add(id);
+ record.accepted = aggregateState.accept(id, cumulative);
record.status = session.messageAccept(record.accepted);
pending.push_back(record);
- aggregateState.accept(id);
}
void AcceptTracker::release(qpid::client::AsyncSession& session)
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
index 87890e41cc..9e801e8147 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
@@ -42,7 +42,7 @@ class AcceptTracker
public:
void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id);
void accept(qpid::client::AsyncSession&);
- void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&);
+ void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative);
void release(qpid::client::AsyncSession&);
uint32_t acceptsPending();
uint32_t acceptsPending(const std::string& destination);
@@ -62,7 +62,7 @@ class AcceptTracker
qpid::framing::SequenceSet unconfirmed;
void accept();
- void accept(qpid::framing::SequenceNumber);
+ qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative);
void release();
uint32_t acceptsPending();
void completed(qpid::framing::SequenceSet&);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 71e89bdba1..5cf20c92eb 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -144,10 +144,10 @@ void IncomingMessages::accept()
acceptTracker.accept(session);
}
-void IncomingMessages::accept(qpid::framing::SequenceNumber id)
+void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative)
{
sys::Mutex::ScopedLock l(lock);
- acceptTracker.accept(id, session);
+ acceptTracker.accept(id, session, cumulative);
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index f6a291bc68..9053b70312 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -72,7 +72,7 @@ class IncomingMessages
bool get(Handler& handler, qpid::sys::Duration timeout);
bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
void accept();
- void accept(qpid::framing::SequenceNumber id);
+ void accept(qpid::framing::SequenceNumber id, bool cumulative);
void releaseAll();
void releasePending(const std::string& destination);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 9e43e70f32..787be7de2a 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -112,13 +112,14 @@ void SessionImpl::release(qpid::messaging::Message& m)
execute1<Release>(m);
}
-void SessionImpl::acknowledge(qpid::messaging::Message& m)
+void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative)
{
//Should probably throw an exception on failure here, or indicate
//it through a return type at least. Failure means that the
//message may be redelivered; i.e. the application cannot delete
//any state necessary for preventing reprocessing of the message
- execute1<Acknowledge1>(m);
+ Acknowledge2 ack(*this, m, cumulative);
+ execute(ack);
}
void SessionImpl::close()
@@ -467,10 +468,10 @@ void SessionImpl::acknowledgeImpl()
if (!transactional) incoming.accept();
}
-void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m)
+void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative)
{
ScopedLock l(lock);
- if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId());
+ if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative);
}
void SessionImpl::rejectImpl(qpid::messaging::Message& m)
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 2a2aa47df6..c7dea77d18 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -63,7 +63,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
void acknowledge(bool sync);
void reject(qpid::messaging::Message&);
void release(qpid::messaging::Message&);
- void acknowledge(qpid::messaging::Message& msg);
+ void acknowledge(qpid::messaging::Message& msg, bool cumulative);
void close();
void sync(bool block);
qpid::messaging::Sender createSender(const qpid::messaging::Address& address);
@@ -139,7 +139,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
void commitImpl();
void rollbackImpl();
void acknowledgeImpl();
- void acknowledgeImpl(qpid::messaging::Message&);
+ void acknowledgeImpl(qpid::messaging::Message&, bool cumulative);
void rejectImpl(qpid::messaging::Message&);
void releaseImpl(qpid::messaging::Message&);
void closeImpl();
@@ -204,12 +204,13 @@ class SessionImpl : public qpid::messaging::SessionImpl
void operator()() { impl.releaseImpl(message); }
};
- struct Acknowledge1 : Command
+ struct Acknowledge2 : Command
{
qpid::messaging::Message& message;
+ bool cumulative;
- Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
- void operator()() { impl.acknowledgeImpl(message); }
+ Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {}
+ void operator()() { impl.acknowledgeImpl(message, cumulative); }
};
struct CreateSender;
diff --git a/qpid/cpp/src/qpid/messaging/Session.cpp b/qpid/cpp/src/qpid/messaging/Session.cpp
index 496953a8e5..cccfd9a873 100644
--- a/qpid/cpp/src/qpid/messaging/Session.cpp
+++ b/qpid/cpp/src/qpid/messaging/Session.cpp
@@ -39,7 +39,8 @@ Session& Session::operator=(const Session& s) { return PI::assign(*this, s); }
void Session::commit() { impl->commit(); }
void Session::rollback() { impl->rollback(); }
void Session::acknowledge(bool sync) { impl->acknowledge(sync); }
-void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m); sync(s); }
+void Session::acknowledge(Message& m, bool s) { impl->acknowledge(m, false); sync(s); }
+void Session::acknowledgeUpTo(Message& m, bool s) { impl->acknowledge(m, true); sync(s); }
void Session::reject(Message& m) { impl->reject(m); }
void Session::release(Message& m) { impl->release(m); }
void Session::close() { impl->close(); }
diff --git a/qpid/cpp/src/qpid/messaging/SessionImpl.h b/qpid/cpp/src/qpid/messaging/SessionImpl.h
index 02a254e4f2..60ae615253 100644
--- a/qpid/cpp/src/qpid/messaging/SessionImpl.h
+++ b/qpid/cpp/src/qpid/messaging/SessionImpl.h
@@ -41,7 +41,7 @@ class SessionImpl : public virtual qpid::RefCounted
virtual void commit() = 0;
virtual void rollback() = 0;
virtual void acknowledge(bool sync) = 0;
- virtual void acknowledge(Message&) = 0;
+ virtual void acknowledge(Message&, bool cumulative) = 0;
virtual void reject(Message&) = 0;
virtual void release(Message&) = 0;
virtual void close() = 0;
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index a93d8f78ae..fae45a94d0 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -1024,6 +1024,46 @@ QPID_AUTO_TEST_CASE(testNonExclusiveSubscriber)
fix.session.acknowledge();
}
+QPID_AUTO_TEST_CASE(testAcknowledgeUpTo)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ const uint count(20);
+ for (uint i = 0; i < count; ++i) {
+ sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+ }
+
+ Session other = fix.connection.createSession();
+ Receiver receiver = other.createReceiver(fix.queue);
+ std::vector<Message> messages;
+ for (uint i = 0; i < count; ++i) {
+ Message msg = receiver.fetch();
+ BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str());
+ messages.push_back(msg);
+ }
+ const uint batch = 10;
+ other.acknowledgeUpTo(messages[batch-1]);//acknowledge first 10 messages only
+
+ messages.clear();
+ other.sync();
+ other.close();
+
+ other = fix.connection.createSession();
+ receiver = other.createReceiver(fix.queue);
+ Message msg;
+ for (uint i = 0; i < (count-batch); ++i) {
+ msg = receiver.fetch();
+ BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
+ }
+ other.acknowledgeUpTo(msg);
+ other.sync();
+ other.close();
+
+ Message m;
+ //check queue is empty
+ BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE));
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests