summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-07-25 10:16:22 +0000
committerGordon Sim <gsim@apache.org>2008-07-25 10:16:22 +0000
commit9a3c24de4c9f18a068be36e50fdbf44589499787 (patch)
tree1bc22c77df4c9c8b3eaeac65e4e3bca9e93fe532
parent0ba523ca249b8f7f46d3c113b991cf880ce9eb9a (diff)
downloadqpid-python-9a3c24de4c9f18a068be36e50fdbf44589499787.tar.gz
Fixed bug in SubscriptionManager::get() where flush was issued before waiting and if message showed up after flush completed but before wait was finished there was no credit (due to flush) to deliver it to the waiting client. Added test for thise case.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@679739 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/client/SubscriptionManager.cpp5
-rw-r--r--qpid/cpp/src/tests/ClientSessionTest.cpp20
2 files changed, 24 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
index 9bb75f9a49..b4c48f7365 100644
--- a/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -134,8 +134,11 @@ bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Du
std::string unique = framing::Uuid(true).str();
subscribe(lq, queue, FlowControl::messageCredit(1), unique);
AutoCancel ac(*this, unique);
+ //first wait for message to be delivered if a timeout has been specified
+ if (timeout && lq.get(result, timeout)) return true;
+ //make sure message is not on queue before final check
sync(session).messageFlush(unique);
- return lq.get(result, timeout);
+ return lq.get(result, 0);
}
}} // namespace qpid::client
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index 90616cf7f3..3d9280211a 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -41,6 +41,7 @@ using namespace qpid::client::arg;
using namespace qpid::framing;
using namespace qpid;
using qpid::sys::Monitor;
+using qpid::sys::Thread;
using qpid::sys::TIME_SEC;
using std::string;
using std::cout;
@@ -238,6 +239,19 @@ QPID_AUTO_TEST_CASE(testLocalQueue) {
BOOST_CHECK_EQUAL("foo2", lq.pop().getData());
}
+struct DelayedTransfer : sys::Runnable
+{
+ ClientSessionFixture& fixture;
+
+ DelayedTransfer(ClientSessionFixture& f) : fixture(f) {}
+
+ void run()
+ {
+ sleep(1);
+ fixture.session.messageTransfer(content=Message("foo2", "getq"));
+ }
+};
+
QPID_AUTO_TEST_CASE(testGet) {
ClientSessionFixture fix;
fix.session.queueDeclare(queue="getq", exclusive=true, autoDelete=true);
@@ -249,6 +263,12 @@ QPID_AUTO_TEST_CASE(testGet) {
BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC));
BOOST_CHECK_EQUAL("foo1", got.getData());
BOOST_CHECK(!fix.subs.get(got, "getq"));
+ DelayedTransfer sender(fix);
+ Thread t(sender);
+ //test timed get where message shows up after a short delay
+ BOOST_CHECK(fix.subs.get(got, "getq", 5*TIME_SEC));
+ BOOST_CHECK_EQUAL("foo2", got.getData());
+ t.join();
}
QPID_AUTO_TEST_CASE(testOpenFailure) {