summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-11-05 21:12:54 +0000
committerGordon Sim <gsim@apache.org>2008-11-05 21:12:54 +0000
commit359f60c318627900c3ac216496486c42d1a4df8a (patch)
tree879b78ce5d2cc1344f4840dbdbf83da66605ba47 /cpp/src
parent06c6c1db04d562b6cad0293cb4c5f8e40dde7a19 (diff)
downloadqpid-python-359f60c318627900c3ac216496486c42d1a4df8a.tar.gz
Added ability to release messages through the Subscription class (+test)
Added another mode for managing completion (+test) Fixed regression where bytes credit was not reallocated in windowing mode after an accept/release Fixed regression where subscribe request is issued before listener is registered with dispatcher git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711698 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp5
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h9
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp8
-rw-r--r--cpp/src/qpid/client/SessionBase_0_10.cpp5
-rw-r--r--cpp/src/qpid/client/SessionBase_0_10.h1
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp10
-rw-r--r--cpp/src/qpid/client/SessionImpl.h1
-rw-r--r--cpp/src/qpid/client/Subscription.cpp1
-rw-r--r--cpp/src/qpid/client/Subscription.h8
-rw-r--r--cpp/src/qpid/client/SubscriptionImpl.cpp32
-rw-r--r--cpp/src/qpid/client/SubscriptionImpl.h8
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp3
-rw-r--r--cpp/src/qpid/client/SubscriptionSettings.h34
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp84
14 files changed, 190 insertions, 19 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 1a15373e6a..900016d381 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -43,7 +43,8 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
cancelled(false),
completed(false),
ended(accepted),
- windowing(_windowing)
+ windowing(_windowing),
+ credit(msg.payload ? msg.payload->getRequiredCredit() : 0)
{}
void DeliveryRecord::setEnded()
@@ -153,7 +154,7 @@ void DeliveryRecord::reject()
uint32_t DeliveryRecord::getCredit() const
{
- return msg.payload ? msg.payload->getRequiredCredit() : 0;
+ return credit;
}
void DeliveryRecord::acquire(DeliveryIds& results) {
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index f6ffb64697..952e888c03 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -64,6 +64,15 @@ class DeliveryRecord
bool ended;
const bool windowing;
+ /**
+ * Record required credit on construction as the pointer to the
+ * message may be reset once we no longer need to deliver it
+ * (e.g. when it is accepted), but we will still need to be able
+ * to reallocate credit when it is completed (which could happen
+ * after that).
+ */
+ const uint32_t credit;
+
public:
DeliveryRecord(
const QueuedMessage& msg,
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index aefb87a392..c967823ecc 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -48,12 +48,16 @@ bool QueuePolicy::checkLimit(const QueuedMessage& m)
if (exceeded) {
if (!policyExceeded) {
policyExceeded = true;
- QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName());
+ if (m.queue) {
+ QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName());
+ }
}
} else {
if (policyExceeded) {
policyExceeded = false;
- QPID_LOG(info, "Queue size within policy for " << m.queue->getName());
+ if (m.queue) {
+ QPID_LOG(info, "Queue size within policy for " << m.queue->getName());
+ }
}
}
return !exceeded;
diff --git a/cpp/src/qpid/client/SessionBase_0_10.cpp b/cpp/src/qpid/client/SessionBase_0_10.cpp
index 701acaf7d4..c933a64f07 100644
--- a/cpp/src/qpid/client/SessionBase_0_10.cpp
+++ b/cpp/src/qpid/client/SessionBase_0_10.cpp
@@ -49,6 +49,11 @@ void SessionBase_0_10::sync()
impl->send(b).wait(*impl);
}
+void SessionBase_0_10::markCompleted(const framing::SequenceSet& ids, bool notifyPeer)
+{
+ impl->markCompleted(ids, notifyPeer);
+}
+
void SessionBase_0_10::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer)
{
impl->markCompleted(id, cumulative, notifyPeer);
diff --git a/cpp/src/qpid/client/SessionBase_0_10.h b/cpp/src/qpid/client/SessionBase_0_10.h
index 2d1586d042..091c977053 100644
--- a/cpp/src/qpid/client/SessionBase_0_10.h
+++ b/cpp/src/qpid/client/SessionBase_0_10.h
@@ -104,6 +104,7 @@ class SessionBase_0_10 {
Execution& getExecution();
void flush();
+ void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
void sendCompletion();
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 0f86f7ff0a..cf71d4f4a5 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -217,6 +217,16 @@ struct MarkCompleted
};
+void SessionImpl::markCompleted(const SequenceSet& ids, bool notifyPeer)
+{
+ Lock l(state);
+ incompleteIn.remove(ids);
+ completedIn.add(ids);
+ if (notifyPeer) {
+ sendCompletion();
+ }
+}
+
void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer)
{
Lock l(state);
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 1414862792..ea7776634a 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -89,6 +89,7 @@ public:
Demux& getDemux();
void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
+ void markCompleted(const framing::SequenceSet& ids, bool notifyPeer);
bool isComplete(const framing::SequenceNumber& id);
bool isCompleteUpTo(const framing::SequenceNumber& id);
void waitForCompletion(const framing::SequenceNumber& id);
diff --git a/cpp/src/qpid/client/Subscription.cpp b/cpp/src/qpid/client/Subscription.cpp
index af4d7bdb56..bf788c5f93 100644
--- a/cpp/src/qpid/client/Subscription.cpp
+++ b/cpp/src/qpid/client/Subscription.cpp
@@ -38,6 +38,7 @@ SequenceSet Subscription::getUnacquired() const { return impl->getUnacquired();
SequenceSet Subscription::getUnaccepted() const { return impl->getUnaccepted(); }
void Subscription::acquire(const SequenceSet& messageIds) { impl->acquire(messageIds); }
void Subscription::accept(const SequenceSet& messageIds) { impl->accept(messageIds); }
+void Subscription::release(const SequenceSet& messageIds) { impl->release(messageIds); }
Session Subscription::getSession() const { return impl->getSession(); }
SubscriptionManager&Subscription:: getSubscriptionManager() const { return impl->getSubscriptionManager(); }
void Subscription::cancel() { impl->cancel(); }
diff --git a/cpp/src/qpid/client/Subscription.h b/cpp/src/qpid/client/Subscription.h
index 2ed56d4e8a..b25a64a4a2 100644
--- a/cpp/src/qpid/client/Subscription.h
+++ b/cpp/src/qpid/client/Subscription.h
@@ -79,12 +79,20 @@ class Subscription : public Handle<SubscriptionImpl> {
*/
void accept(const SequenceSet& messageIds);
+ /** Release messageIds and remove them from the unaccepted set.
+ *@pre messageIds is a subset of getUnaccepted()
+ */
+ void release(const SequenceSet& messageIds);
+
/* Acquire a single message */
void acquire(const Message& m) { acquire(SequenceSet(m.getId())); }
/* Accept a single message */
void accept(const Message& m) { accept(SequenceSet(m.getId())); }
+ /* Release a single message */
+ void release(const Message& m) { release(SequenceSet(m.getId())); }
+
/** Get the session associated with this subscription */
Session getSession() const;
diff --git a/cpp/src/qpid/client/SubscriptionImpl.cpp b/cpp/src/qpid/client/SubscriptionImpl.cpp
index 684cca031a..0ccf5674fd 100644
--- a/cpp/src/qpid/client/SubscriptionImpl.cpp
+++ b/cpp/src/qpid/client/SubscriptionImpl.cpp
@@ -31,6 +31,9 @@ using framing::MessageAcquireResult;
SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
: manager(m), name(n), queue(q), settings(s), listener(l)
+{}
+
+void SubscriptionImpl::subscribe()
{
async(manager.getSession()).messageSubscribe(
arg::queue=queue,
@@ -79,11 +82,25 @@ void SubscriptionImpl::accept(const SequenceSet& messageIds) {
Mutex::ScopedLock l(lock);
manager.getSession().messageAccept(messageIds);
unaccepted.remove(messageIds);
- if (settings.autoComplete) {
+ switch (settings.completionMode) {
+ case COMPLETE_ON_ACCEPT:
+ manager.getSession().markCompleted(messageIds, true);
+ break;
+ case COMPLETE_ON_DELIVERY:
manager.getSession().sendCompletion();
+ break;
+ default://do nothing
+ break;
}
}
+void SubscriptionImpl::release(const SequenceSet& messageIds) {
+ Mutex::ScopedLock l(lock);
+ manager.getSession().messageRelease(messageIds);
+ if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
+ unaccepted.remove(messageIds);
+}
+
Session SubscriptionImpl::getSession() const { return manager.getSession(); }
SubscriptionManager&SubscriptionImpl:: getSubscriptionManager() const { return manager; }
@@ -102,16 +119,23 @@ void SubscriptionImpl::received(Message& m) {
listener->received(m);
}
- if (settings.autoComplete) {
+ if (settings.completionMode == COMPLETE_ON_DELIVERY) {
manager.getSession().markCompleted(m.getId(), false, false);
}
if (settings.autoAck) {
if (unaccepted.size() >= settings.autoAck) {
async(manager.getSession()).messageAccept(unaccepted);
- unaccepted.clear();
- if (settings.autoComplete) {
+ switch (settings.completionMode) {
+ case COMPLETE_ON_ACCEPT:
+ manager.getSession().markCompleted(unaccepted, true);
+ break;
+ case COMPLETE_ON_DELIVERY:
manager.getSession().sendCompletion();
+ break;
+ default://do nothing
+ break;
}
+ unaccepted.clear();
}
}
}
diff --git a/cpp/src/qpid/client/SubscriptionImpl.h b/cpp/src/qpid/client/SubscriptionImpl.h
index 44fd1a7d6c..0c51b598c8 100644
--- a/cpp/src/qpid/client/SubscriptionImpl.h
+++ b/cpp/src/qpid/client/SubscriptionImpl.h
@@ -70,15 +70,21 @@ class SubscriptionImpl : public RefCounted, public MessageListener {
/** Acquire messageIds and remove them from the un-acquired set for the session. */
void acquire(const SequenceSet& messageIds);
- /** Accept messageIds and remove them from the un-acceptd set for the session. */
+ /** Accept messageIds and remove them from the un-accepted set for the session. */
void accept(const SequenceSet& messageIds);
+ /** Release messageIds and remove them from the un-accepted set for the session. */
+ void release(const SequenceSet& messageIds);
+
/** Get the session associated with this subscription */
Session getSession() const;
/** Get the subscription manager associated with this subscription */
SubscriptionManager& getSubscriptionManager() const;
+ /** Send subscription request and issue appropriate flow control commands. */
+ void subscribe();
+
/** Cancel the subscription. */
void cancel();
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 6fa1973b9d..7445202ec3 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -44,6 +44,8 @@ Subscription SubscriptionManager::subscribe(
std::string name=n.empty() ? q:n;
boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener);
dispatcher.listen(si);
+ //issue subscription request after listener is registered with dispatcher
+ si->subscribe();
return subscriptions[name] = Subscription(si.get());
}
@@ -53,6 +55,7 @@ Subscription SubscriptionManager::subscribe(
std::string name=n.empty() ? q:n;
lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name));
boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0);
+ si->subscribe();
lq.subscription = Subscription(si.get());
return subscriptions[name] = lq.subscription;
}
diff --git a/cpp/src/qpid/client/SubscriptionSettings.h b/cpp/src/qpid/client/SubscriptionSettings.h
index 19fbc3486b..4b294b28aa 100644
--- a/cpp/src/qpid/client/SubscriptionSettings.h
+++ b/cpp/src/qpid/client/SubscriptionSettings.h
@@ -30,6 +30,11 @@ namespace client {
/** Bring AMQP enum definitions for message class into this namespace. */
using namespace qpid::framing::message;
+enum CompletionMode {
+ MANUAL_COMPLETION = 0,
+ COMPLETE_ON_DELIVERY = 1,
+ COMPLETE_ON_ACCEPT = 2
+};
/**
* Settings for a subscription.
*/
@@ -40,8 +45,8 @@ struct SubscriptionSettings
AcceptMode accept=ACCEPT_MODE_EXPLICIT,
AcquireMode acquire=ACQUIRE_MODE_PRE_ACQUIRED,
unsigned int autoAck_=1,
- bool autoComplete_=true
- ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_), autoComplete(autoComplete_) {}
+ CompletionMode completion=COMPLETE_ON_DELIVERY
+ ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_), completionMode(completion) {}
FlowControl flowControl; ///@< Flow control settings. @see FlowControl
AcceptMode acceptMode; ///@< ACCEPT_MODE_EXPLICIT or ACCEPT_MODE_NONE
@@ -53,19 +58,28 @@ struct SubscriptionSettings
* ACCEPT_MODE_NODE.*/
unsigned int autoAck;
/**
- * If set to true, messages will be marked as completed (in
- * windowing mode, completion of a message will cause the credit
- * used up by that message to be reallocated) once they have been
- * received. The server will be explicitly notified of all
- * completed messages when the next accept is sent through the
+ * In windowing mode, completion of a message will cause the
+ * credit used up by that message to be reallocated. The
+ * subscriptions completion mode controls how completion is
+ * managed.
+ *
+ * If set to COMPLETE_ON_DELIVERY (which is the default), messages
+ * will be marked as completed once they have been received. The
+ * server will be explicitly notified of all completed messages
+ * for the session when the next accept is sent through the
* subscription (either explictly or through autAck). However the
* server may also periodically request information on the
* completed messages.
*
- * If set to false the application is responsible for completing
- * messages (@see Session::markCompleted()).
+ * If set to COMPLETE_ON_ACCEPT, messages will be marked as
+ * completed once they are accepted (via the Subscription class)
+ * and the server will also be notified of all completed messages
+ * for the session.
+ *
+ * If set to MANUAL_COMPLETION the application is responsible for
+ * completing messages (@see Session::markCompleted()).
*/
- bool autoComplete;
+ CompletionMode completionMode;
};
}} // namespace qpid::client
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index abe317aad8..cca16bd9f8 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -296,6 +296,90 @@ QPID_AUTO_TEST_CASE(testExpirationOnPop) {
}
}
+QPID_AUTO_TEST_CASE(testRelease) {
+ ClientSessionFixture fix;
+
+ const uint count=10;
+ for (uint i = 0; i < count; i++) {
+ Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
+ fix.session.messageTransfer(arg::content=m);
+ }
+
+ fix.subs.setAutoStop(false);
+ sys::Thread runner(fix.subs);//start dispatcher thread
+ SubscriptionSettings settings;
+ settings.autoAck = 0;
+
+ SimpleListener l1;
+ Subscription s1 = fix.subs.subscribe(l1, "my-queue", settings);
+ l1.waitFor(count);
+ s1.cancel();
+
+ for (uint i = 0; i < count; i++) {
+ BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l1.messages[i].getData());
+ }
+ s1.release(s1.getUnaccepted());
+
+ //check that released messages are redelivered
+ settings.autoAck = 1;
+ SimpleListener l2;
+ Subscription s2 = fix.subs.subscribe(l2, "my-queue", settings);
+ l2.waitFor(count);
+ for (uint i = 0; i < count; i++) {
+ BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l2.messages[i].getData());
+ }
+
+ fix.subs.stop();
+ runner.join();
+ fix.session.close();
+}
+
+QPID_AUTO_TEST_CASE(testCompleteOnAccept) {
+ ClientSessionFixture fix;
+
+ fix.session.queueDeclare(arg::queue="HELP_FIND_ME");
+
+ const uint count = 8;
+ const uint chunk = 4;
+ for (uint i = 0; i < count; i++) {
+ Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
+ fix.session.messageTransfer(arg::content=m);
+ }
+
+ SubscriptionSettings settings;
+ settings.autoAck = 0;
+ settings.completionMode = COMPLETE_ON_ACCEPT;
+ settings.flowControl = FlowControl::messageWindow(chunk);
+
+ LocalQueue q;
+ Subscription s = fix.subs.subscribe(q, "my-queue", settings);
+ fix.session.messageFlush(arg::destination=s.getName());
+ SequenceSet accepted;
+ for (uint i = 0; i < chunk; i++) {
+ Message m;
+ BOOST_CHECK(q.get(m));
+ BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData());
+ accepted.add(m.getId());
+ }
+ Message m;
+ BOOST_CHECK(!q.get(m));
+
+ s.accept(accepted);
+ fix.session.messageFlush(arg::destination=s.getName());
+ accepted.clear();
+
+ for (uint i = chunk; i < count; i++) {
+ Message m;
+ BOOST_CHECK(q.get(m));
+ BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData());
+ accepted.add(m.getId());
+ }
+ fix.session.messageAccept(accepted);
+
+ fix.session.queueDelete(arg::queue="HELP_FIND_ME");
+
+}
+
QPID_AUTO_TEST_SUITE_END()