diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueuePolicy.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase_0_10.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionBase_0_10.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionImpl.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/Subscription.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/Subscription.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionImpl.cpp | 32 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionImpl.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionSettings.h | 34 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 84 |
14 files changed, 190 insertions, 19 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 1a15373e6a..900016d381 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -43,7 +43,8 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, cancelled(false), completed(false), ended(accepted), - windowing(_windowing) + windowing(_windowing), + credit(msg.payload ? msg.payload->getRequiredCredit() : 0) {} void DeliveryRecord::setEnded() @@ -153,7 +154,7 @@ void DeliveryRecord::reject() uint32_t DeliveryRecord::getCredit() const { - return msg.payload ? msg.payload->getRequiredCredit() : 0; + return credit; } void DeliveryRecord::acquire(DeliveryIds& results) { diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index f6ffb64697..952e888c03 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -64,6 +64,15 @@ class DeliveryRecord bool ended; const bool windowing; + /** + * Record required credit on construction as the pointer to the + * message may be reset once we no longer need to deliver it + * (e.g. when it is accepted), but we will still need to be able + * to reallocate credit when it is completed (which could happen + * after that). + */ + const uint32_t credit; + public: DeliveryRecord( const QueuedMessage& msg, diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index aefb87a392..c967823ecc 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -48,12 +48,16 @@ bool QueuePolicy::checkLimit(const QueuedMessage& m) if (exceeded) { if (!policyExceeded) { policyExceeded = true; - QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName()); + if (m.queue) { + QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName()); + } } } else { if (policyExceeded) { policyExceeded = false; - QPID_LOG(info, "Queue size within policy for " << m.queue->getName()); + if (m.queue) { + QPID_LOG(info, "Queue size within policy for " << m.queue->getName()); + } } } return !exceeded; diff --git a/cpp/src/qpid/client/SessionBase_0_10.cpp b/cpp/src/qpid/client/SessionBase_0_10.cpp index 701acaf7d4..c933a64f07 100644 --- a/cpp/src/qpid/client/SessionBase_0_10.cpp +++ b/cpp/src/qpid/client/SessionBase_0_10.cpp @@ -49,6 +49,11 @@ void SessionBase_0_10::sync() impl->send(b).wait(*impl); } +void SessionBase_0_10::markCompleted(const framing::SequenceSet& ids, bool notifyPeer) +{ + impl->markCompleted(ids, notifyPeer); +} + void SessionBase_0_10::markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer) { impl->markCompleted(id, cumulative, notifyPeer); diff --git a/cpp/src/qpid/client/SessionBase_0_10.h b/cpp/src/qpid/client/SessionBase_0_10.h index 2d1586d042..091c977053 100644 --- a/cpp/src/qpid/client/SessionBase_0_10.h +++ b/cpp/src/qpid/client/SessionBase_0_10.h @@ -104,6 +104,7 @@ class SessionBase_0_10 { Execution& getExecution(); void flush(); + void markCompleted(const framing::SequenceSet& ids, bool notifyPeer); void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); void sendCompletion(); diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 0f86f7ff0a..cf71d4f4a5 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -217,6 +217,16 @@ struct MarkCompleted }; +void SessionImpl::markCompleted(const SequenceSet& ids, bool notifyPeer) +{ + Lock l(state); + incompleteIn.remove(ids); + completedIn.add(ids); + if (notifyPeer) { + sendCompletion(); + } +} + void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer) { Lock l(state); diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 1414862792..ea7776634a 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -89,6 +89,7 @@ public: Demux& getDemux(); void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer); + void markCompleted(const framing::SequenceSet& ids, bool notifyPeer); bool isComplete(const framing::SequenceNumber& id); bool isCompleteUpTo(const framing::SequenceNumber& id); void waitForCompletion(const framing::SequenceNumber& id); diff --git a/cpp/src/qpid/client/Subscription.cpp b/cpp/src/qpid/client/Subscription.cpp index af4d7bdb56..bf788c5f93 100644 --- a/cpp/src/qpid/client/Subscription.cpp +++ b/cpp/src/qpid/client/Subscription.cpp @@ -38,6 +38,7 @@ SequenceSet Subscription::getUnacquired() const { return impl->getUnacquired(); SequenceSet Subscription::getUnaccepted() const { return impl->getUnaccepted(); } void Subscription::acquire(const SequenceSet& messageIds) { impl->acquire(messageIds); } void Subscription::accept(const SequenceSet& messageIds) { impl->accept(messageIds); } +void Subscription::release(const SequenceSet& messageIds) { impl->release(messageIds); } Session Subscription::getSession() const { return impl->getSession(); } SubscriptionManager&Subscription:: getSubscriptionManager() const { return impl->getSubscriptionManager(); } void Subscription::cancel() { impl->cancel(); } diff --git a/cpp/src/qpid/client/Subscription.h b/cpp/src/qpid/client/Subscription.h index 2ed56d4e8a..b25a64a4a2 100644 --- a/cpp/src/qpid/client/Subscription.h +++ b/cpp/src/qpid/client/Subscription.h @@ -79,12 +79,20 @@ class Subscription : public Handle<SubscriptionImpl> { */ void accept(const SequenceSet& messageIds); + /** Release messageIds and remove them from the unaccepted set. + *@pre messageIds is a subset of getUnaccepted() + */ + void release(const SequenceSet& messageIds); + /* Acquire a single message */ void acquire(const Message& m) { acquire(SequenceSet(m.getId())); } /* Accept a single message */ void accept(const Message& m) { accept(SequenceSet(m.getId())); } + /* Release a single message */ + void release(const Message& m) { release(SequenceSet(m.getId())); } + /** Get the session associated with this subscription */ Session getSession() const; diff --git a/cpp/src/qpid/client/SubscriptionImpl.cpp b/cpp/src/qpid/client/SubscriptionImpl.cpp index 684cca031a..0ccf5674fd 100644 --- a/cpp/src/qpid/client/SubscriptionImpl.cpp +++ b/cpp/src/qpid/client/SubscriptionImpl.cpp @@ -31,6 +31,9 @@ using framing::MessageAcquireResult; SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l) : manager(m), name(n), queue(q), settings(s), listener(l) +{} + +void SubscriptionImpl::subscribe() { async(manager.getSession()).messageSubscribe( arg::queue=queue, @@ -79,11 +82,25 @@ void SubscriptionImpl::accept(const SequenceSet& messageIds) { Mutex::ScopedLock l(lock); manager.getSession().messageAccept(messageIds); unaccepted.remove(messageIds); - if (settings.autoComplete) { + switch (settings.completionMode) { + case COMPLETE_ON_ACCEPT: + manager.getSession().markCompleted(messageIds, true); + break; + case COMPLETE_ON_DELIVERY: manager.getSession().sendCompletion(); + break; + default://do nothing + break; } } +void SubscriptionImpl::release(const SequenceSet& messageIds) { + Mutex::ScopedLock l(lock); + manager.getSession().messageRelease(messageIds); + if (settings.acceptMode == ACCEPT_MODE_EXPLICIT) + unaccepted.remove(messageIds); +} + Session SubscriptionImpl::getSession() const { return manager.getSession(); } SubscriptionManager&SubscriptionImpl:: getSubscriptionManager() const { return manager; } @@ -102,16 +119,23 @@ void SubscriptionImpl::received(Message& m) { listener->received(m); } - if (settings.autoComplete) { + if (settings.completionMode == COMPLETE_ON_DELIVERY) { manager.getSession().markCompleted(m.getId(), false, false); } if (settings.autoAck) { if (unaccepted.size() >= settings.autoAck) { async(manager.getSession()).messageAccept(unaccepted); - unaccepted.clear(); - if (settings.autoComplete) { + switch (settings.completionMode) { + case COMPLETE_ON_ACCEPT: + manager.getSession().markCompleted(unaccepted, true); + break; + case COMPLETE_ON_DELIVERY: manager.getSession().sendCompletion(); + break; + default://do nothing + break; } + unaccepted.clear(); } } } diff --git a/cpp/src/qpid/client/SubscriptionImpl.h b/cpp/src/qpid/client/SubscriptionImpl.h index 44fd1a7d6c..0c51b598c8 100644 --- a/cpp/src/qpid/client/SubscriptionImpl.h +++ b/cpp/src/qpid/client/SubscriptionImpl.h @@ -70,15 +70,21 @@ class SubscriptionImpl : public RefCounted, public MessageListener { /** Acquire messageIds and remove them from the un-acquired set for the session. */ void acquire(const SequenceSet& messageIds); - /** Accept messageIds and remove them from the un-acceptd set for the session. */ + /** Accept messageIds and remove them from the un-accepted set for the session. */ void accept(const SequenceSet& messageIds); + /** Release messageIds and remove them from the un-accepted set for the session. */ + void release(const SequenceSet& messageIds); + /** Get the session associated with this subscription */ Session getSession() const; /** Get the subscription manager associated with this subscription */ SubscriptionManager& getSubscriptionManager() const; + /** Send subscription request and issue appropriate flow control commands. */ + void subscribe(); + /** Cancel the subscription. */ void cancel(); diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index 6fa1973b9d..7445202ec3 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -44,6 +44,8 @@ Subscription SubscriptionManager::subscribe( std::string name=n.empty() ? q:n; boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener); dispatcher.listen(si); + //issue subscription request after listener is registered with dispatcher + si->subscribe(); return subscriptions[name] = Subscription(si.get()); } @@ -53,6 +55,7 @@ Subscription SubscriptionManager::subscribe( std::string name=n.empty() ? q:n; lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name)); boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0); + si->subscribe(); lq.subscription = Subscription(si.get()); return subscriptions[name] = lq.subscription; } diff --git a/cpp/src/qpid/client/SubscriptionSettings.h b/cpp/src/qpid/client/SubscriptionSettings.h index 19fbc3486b..4b294b28aa 100644 --- a/cpp/src/qpid/client/SubscriptionSettings.h +++ b/cpp/src/qpid/client/SubscriptionSettings.h @@ -30,6 +30,11 @@ namespace client { /** Bring AMQP enum definitions for message class into this namespace. */ using namespace qpid::framing::message; +enum CompletionMode { + MANUAL_COMPLETION = 0, + COMPLETE_ON_DELIVERY = 1, + COMPLETE_ON_ACCEPT = 2 +}; /** * Settings for a subscription. */ @@ -40,8 +45,8 @@ struct SubscriptionSettings AcceptMode accept=ACCEPT_MODE_EXPLICIT, AcquireMode acquire=ACQUIRE_MODE_PRE_ACQUIRED, unsigned int autoAck_=1, - bool autoComplete_=true - ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_), autoComplete(autoComplete_) {} + CompletionMode completion=COMPLETE_ON_DELIVERY + ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_), completionMode(completion) {} FlowControl flowControl; ///@< Flow control settings. @see FlowControl AcceptMode acceptMode; ///@< ACCEPT_MODE_EXPLICIT or ACCEPT_MODE_NONE @@ -53,19 +58,28 @@ struct SubscriptionSettings * ACCEPT_MODE_NODE.*/ unsigned int autoAck; /** - * If set to true, messages will be marked as completed (in - * windowing mode, completion of a message will cause the credit - * used up by that message to be reallocated) once they have been - * received. The server will be explicitly notified of all - * completed messages when the next accept is sent through the + * In windowing mode, completion of a message will cause the + * credit used up by that message to be reallocated. The + * subscriptions completion mode controls how completion is + * managed. + * + * If set to COMPLETE_ON_DELIVERY (which is the default), messages + * will be marked as completed once they have been received. The + * server will be explicitly notified of all completed messages + * for the session when the next accept is sent through the * subscription (either explictly or through autAck). However the * server may also periodically request information on the * completed messages. * - * If set to false the application is responsible for completing - * messages (@see Session::markCompleted()). + * If set to COMPLETE_ON_ACCEPT, messages will be marked as + * completed once they are accepted (via the Subscription class) + * and the server will also be notified of all completed messages + * for the session. + * + * If set to MANUAL_COMPLETION the application is responsible for + * completing messages (@see Session::markCompleted()). */ - bool autoComplete; + CompletionMode completionMode; }; }} // namespace qpid::client diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index abe317aad8..cca16bd9f8 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -296,6 +296,90 @@ QPID_AUTO_TEST_CASE(testExpirationOnPop) { } } +QPID_AUTO_TEST_CASE(testRelease) { + ClientSessionFixture fix; + + const uint count=10; + for (uint i = 0; i < count; i++) { + Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); + fix.session.messageTransfer(arg::content=m); + } + + fix.subs.setAutoStop(false); + sys::Thread runner(fix.subs);//start dispatcher thread + SubscriptionSettings settings; + settings.autoAck = 0; + + SimpleListener l1; + Subscription s1 = fix.subs.subscribe(l1, "my-queue", settings); + l1.waitFor(count); + s1.cancel(); + + for (uint i = 0; i < count; i++) { + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l1.messages[i].getData()); + } + s1.release(s1.getUnaccepted()); + + //check that released messages are redelivered + settings.autoAck = 1; + SimpleListener l2; + Subscription s2 = fix.subs.subscribe(l2, "my-queue", settings); + l2.waitFor(count); + for (uint i = 0; i < count; i++) { + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l2.messages[i].getData()); + } + + fix.subs.stop(); + runner.join(); + fix.session.close(); +} + +QPID_AUTO_TEST_CASE(testCompleteOnAccept) { + ClientSessionFixture fix; + + fix.session.queueDeclare(arg::queue="HELP_FIND_ME"); + + const uint count = 8; + const uint chunk = 4; + for (uint i = 0; i < count; i++) { + Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); + fix.session.messageTransfer(arg::content=m); + } + + SubscriptionSettings settings; + settings.autoAck = 0; + settings.completionMode = COMPLETE_ON_ACCEPT; + settings.flowControl = FlowControl::messageWindow(chunk); + + LocalQueue q; + Subscription s = fix.subs.subscribe(q, "my-queue", settings); + fix.session.messageFlush(arg::destination=s.getName()); + SequenceSet accepted; + for (uint i = 0; i < chunk; i++) { + Message m; + BOOST_CHECK(q.get(m)); + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); + accepted.add(m.getId()); + } + Message m; + BOOST_CHECK(!q.get(m)); + + s.accept(accepted); + fix.session.messageFlush(arg::destination=s.getName()); + accepted.clear(); + + for (uint i = chunk; i < count; i++) { + Message m; + BOOST_CHECK(q.get(m)); + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); + accepted.add(m.getId()); + } + fix.session.messageAccept(accepted); + + fix.session.queueDelete(arg::queue="HELP_FIND_ME"); + +} + QPID_AUTO_TEST_SUITE_END() |