summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-02-12 11:43:51 +0000
committerGordon Sim <gsim@apache.org>2009-02-12 11:43:51 +0000
commiteaed6d20d8ba86a783fb8f021c4ee55953c23b6e (patch)
treeeaee10c3d7f6fd3c2285a3f17e045a7c8ab83f3e
parente3e41035c067bf8fa881def361eae197ecf1e4aa (diff)
downloadqpid-python-eaed6d20d8ba86a783fb8f021c4ee55953c23b6e.tar.gz
QPID-1660: If selected consumer can't take a message, ensure others are notified of message availability.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@743694 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp32
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h1
-rw-r--r--qpid/cpp/src/tests/ClientSessionTest.cpp33
5 files changed, 75 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index f3cdc03f7d..c9ee7f394f 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -256,10 +256,30 @@ bool Queue::acquire(const QueuedMessage& msg) {
return false;
}
+void Queue::notifyListener()
+{
+ QueueListeners::NotificationSet set;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ if (messages.size()) {
+ listeners.populate(set);
+ }
+ }
+ set.notify();
+}
+
bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
if (c->preAcquires()) {
- return consumeNextMessage(m, c);
+ switch (consumeNextMessage(m, c)) {
+ case CONSUMED:
+ return true;
+ case CANT_CONSUME:
+ notifyListener();//let someone else try
+ case NO_MESSAGES:
+ default:
+ return false;
+ }
} else {
return browseNextMessage(m, c);
}
@@ -291,14 +311,14 @@ bool Queue::checkForMessages(Consumer::shared_ptr c)
}
}
-bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
- return false;
+ return NO_MESSAGES;
} else {
QueuedMessage msg = getFront();
if (msg.payload->hasExpired()) {
@@ -311,16 +331,16 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
if (c->accept(msg.payload)) {
m = msg;
popMsg(msg);
- return true;
+ return CONSUMED;
} else {
//message(s) are available but consumer hasn't got enough credit
QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
- return false;
+ return CANT_CONSUME;
}
} else {
//consumer will never want this message
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- return false;
+ return CANT_CONSUME;
}
}
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index e4bcded8bd..61fbd45de8 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -68,6 +68,7 @@ namespace qpid {
typedef std::deque<QueuedMessage> Messages;
typedef std::map<string,boost::intrusive_ptr<Message> > LVQ;
+ enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
const string name;
const bool autodelete;
@@ -104,8 +105,9 @@ namespace qpid {
void setPolicy(std::auto_ptr<QueuePolicy> policy);
bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
- bool consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ void notifyListener();
void removeListener(Consumer::shared_ptr);
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 13a8c649d2..4f751e43b7 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -527,9 +527,19 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
}
}
+bool SemanticState::ConsumerImpl::haveCredit()
+{
+ if (msgCredit) {
+ return true;
+ } else {
+ blocked = true;
+ return false;
+ }
+}
+
void SemanticState::ConsumerImpl::flush()
{
- while(queue->dispatch(shared_from_this()))
+ while(haveCredit() && queue->dispatch(shared_from_this()))
;
stop();
}
@@ -587,7 +597,7 @@ bool SemanticState::ConsumerImpl::hasOutput() {
bool SemanticState::ConsumerImpl::doOutput()
{
- return queue->dispatch(shared_from_this());
+ return haveCredit() && queue->dispatch(shared_from_this());
}
void SemanticState::ConsumerImpl::enableNotify()
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index a1bee23fd2..c31a6978c9 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -82,6 +82,7 @@ class SemanticState : public sys::OutputTask,
bool checkCredit(boost::intrusive_ptr<Message>& msg);
void allocateCredit(boost::intrusive_ptr<Message>& msg);
+ bool haveCredit();
public:
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index f0c7c1d0c6..6ec73fd47c 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -488,6 +488,39 @@ QPID_AUTO_TEST_CASE(testResubscribeWithLocalQueue) {
BOOST_CHECK(!q.get(got));
}
+QPID_AUTO_TEST_CASE(testReliableDispatch) {
+ ClientSessionFixture fix;
+ std::string queue("a-queue");
+ fix.session.queueDeclare(arg::queue=queue, arg::autoDelete=true);
+
+ ConnectionSettings settings;
+ settings.port = fix.broker->getPort(qpid::broker::Broker::TCP_TRANSPORT);
+
+ Connection c1;
+ c1.open(settings);
+ Session s1 = c1.newSession();
+ SubscriptionManager subs1(s1);
+ LocalQueue q1;
+ subs1.subscribe(q1, queue, FlowControl());//first subscriber has no credit
+
+ Connection c2;
+ c2.open(settings);
+ Session s2 = c2.newSession();
+ SubscriptionManager subs2(s2);
+ LocalQueue q2;
+ subs2.subscribe(q2, queue);//second subscriber has credit
+
+ fix.session.messageTransfer(arg::content=Message("my-message", queue));
+
+ //check that the second consumer gets the message
+ Message got;
+ BOOST_CHECK(q2.get(got, 1*TIME_SEC));
+ BOOST_CHECK_EQUAL("my-message", got.getData());
+
+ c1.close();
+ c2.close();
+}
+
QPID_AUTO_TEST_SUITE_END()