diff options
author | Gordon Sim <gsim@apache.org> | 2009-02-12 11:43:51 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-02-12 11:43:51 +0000 |
commit | eaed6d20d8ba86a783fb8f021c4ee55953c23b6e (patch) | |
tree | eaee10c3d7f6fd3c2285a3f17e045a7c8ab83f3e | |
parent | e3e41035c067bf8fa881def361eae197ecf1e4aa (diff) | |
download | qpid-python-eaed6d20d8ba86a783fb8f021c4ee55953c23b6e.tar.gz |
QPID-1660: If selected consumer can't take a message, ensure others are notified of message availability.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@743694 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/ClientSessionTest.cpp | 33 |
5 files changed, 75 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index f3cdc03f7d..c9ee7f394f 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -256,10 +256,30 @@ bool Queue::acquire(const QueuedMessage& msg) { return false; } +void Queue::notifyListener() +{ + QueueListeners::NotificationSet set; + { + Mutex::ScopedLock locker(messageLock); + if (messages.size()) { + listeners.populate(set); + } + } + set.notify(); +} + bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { if (c->preAcquires()) { - return consumeNextMessage(m, c); + switch (consumeNextMessage(m, c)) { + case CONSUMED: + return true; + case CANT_CONSUME: + notifyListener();//let someone else try + case NO_MESSAGES: + default: + return false; + } } else { return browseNextMessage(m, c); } @@ -291,14 +311,14 @@ bool Queue::checkForMessages(Consumer::shared_ptr c) } } -bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { Mutex::ScopedLock locker(messageLock); if (messages.empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); - return false; + return NO_MESSAGES; } else { QueuedMessage msg = getFront(); if (msg.payload->hasExpired()) { @@ -311,16 +331,16 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) if (c->accept(msg.payload)) { m = msg; popMsg(msg); - return true; + return CONSUMED; } else { //message(s) are available but consumer hasn't got enough credit QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); - return false; + return CANT_CONSUME; } } else { //consumer will never want this message QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); - return false; + return CANT_CONSUME; } } } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index e4bcded8bd..61fbd45de8 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -68,6 +68,7 @@ namespace qpid { typedef std::deque<QueuedMessage> Messages; typedef std::map<string,boost::intrusive_ptr<Message> > LVQ; + enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; const string name; const bool autodelete; @@ -104,8 +105,9 @@ namespace qpid { void setPolicy(std::auto_ptr<QueuePolicy> policy); bool seek(QueuedMessage& msg, Consumer::shared_ptr position); bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - bool consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + void notifyListener(); void removeListener(Consumer::shared_ptr); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 13a8c649d2..4f751e43b7 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -527,9 +527,19 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) } } +bool SemanticState::ConsumerImpl::haveCredit() +{ + if (msgCredit) { + return true; + } else { + blocked = true; + return false; + } +} + void SemanticState::ConsumerImpl::flush() { - while(queue->dispatch(shared_from_this())) + while(haveCredit() && queue->dispatch(shared_from_this())) ; stop(); } @@ -587,7 +597,7 @@ bool SemanticState::ConsumerImpl::hasOutput() { bool SemanticState::ConsumerImpl::doOutput() { - return queue->dispatch(shared_from_this()); + return haveCredit() && queue->dispatch(shared_from_this()); } void SemanticState::ConsumerImpl::enableNotify() diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index a1bee23fd2..c31a6978c9 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -82,6 +82,7 @@ class SemanticState : public sys::OutputTask, bool checkCredit(boost::intrusive_ptr<Message>& msg); void allocateCredit(boost::intrusive_ptr<Message>& msg); + bool haveCredit(); public: typedef boost::shared_ptr<ConsumerImpl> shared_ptr; diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp index f0c7c1d0c6..6ec73fd47c 100644 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -488,6 +488,39 @@ QPID_AUTO_TEST_CASE(testResubscribeWithLocalQueue) { BOOST_CHECK(!q.get(got)); } +QPID_AUTO_TEST_CASE(testReliableDispatch) { + ClientSessionFixture fix; + std::string queue("a-queue"); + fix.session.queueDeclare(arg::queue=queue, arg::autoDelete=true); + + ConnectionSettings settings; + settings.port = fix.broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); + + Connection c1; + c1.open(settings); + Session s1 = c1.newSession(); + SubscriptionManager subs1(s1); + LocalQueue q1; + subs1.subscribe(q1, queue, FlowControl());//first subscriber has no credit + + Connection c2; + c2.open(settings); + Session s2 = c2.newSession(); + SubscriptionManager subs2(s2); + LocalQueue q2; + subs2.subscribe(q2, queue);//second subscriber has credit + + fix.session.messageTransfer(arg::content=Message("my-message", queue)); + + //check that the second consumer gets the message + Message got; + BOOST_CHECK(q2.get(got, 1*TIME_SEC)); + BOOST_CHECK_EQUAL("my-message", got.getData()); + + c1.close(); + c2.close(); +} + QPID_AUTO_TEST_SUITE_END() |