summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2015-02-04 17:05:02 +0000
committerGordon Sim <gsim@apache.org>2015-02-04 17:05:02 +0000
commitd3fd2da6c424e04725c7b40f097ea31036dab0f1 (patch)
tree62b8a2ed95ad3eed54375a650adeb84ffe37650b /qpid/cpp/src
parentbafd7f6d88ade9062a69ec78e3c3f3c4e5a7fe7b (diff)
downloadqpid-python-d3fd2da6c424e04725c7b40f097ea31036dab0f1.tar.gz
QPID-6358: detect detached session on fetch
The previous solution may on occasion cause an early return in the case where one of many other receivers is closed while in fetch. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1657321 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp25
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h3
2 files changed, 17 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index c13e0ef5e6..2ca2c85c64 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -164,12 +164,17 @@ bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout)
ScopedRelease release(inUse, lock);
sys::Mutex::ScopedUnlock l(lock);
//wait for suitable new message to arrive
- if (process(&handler, get_duration(timeout, deadline))) {
+ switch (process(&handler, get_duration(timeout, deadline))) {
+ case OK:
return true;
+ case CLOSED:
+ return false;
+ case EMPTY:
+ break;
}
}
if (handler.isClosed()) throw qpid::messaging::ReceiverError("Receiver has been closed");
- } while (AbsTime::now() < deadline && !incoming->isClosed());
+ } while (AbsTime::now() < deadline);
return false;
}
namespace {
@@ -233,7 +238,7 @@ void IncomingMessages::releaseAll()
}
//then pump out any available messages from incoming queue...
GetAny handler;
- while (process(&handler, 0)) ;
+ while (process(&handler, 0) == OK) ;
//now release all messages
sys::Mutex::ScopedLock l(lock);
acceptTracker.release(session);
@@ -242,7 +247,7 @@ void IncomingMessages::releaseAll()
void IncomingMessages::releasePending(const std::string& destination)
{
//first pump all available messages from incoming to received...
- while (process(0, 0)) ;
+ while (process(0, 0) == OK) ;
//now remove all messages for this destination from received list, recording their ids...
sys::Mutex::ScopedLock l(lock);
@@ -269,7 +274,7 @@ bool IncomingMessages::pop(FrameSet::shared_ptr& content, qpid::sys::Duration ti
* that are not accepted by the handler are pushed onto received queue
* for later retrieval.
*/
-bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
+IncomingMessages::ProcessState IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
{
AbsTime deadline(AbsTime::now(), duration);
FrameSet::shared_ptr content;
@@ -282,7 +287,7 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
} else if (handler && handler->accept(transfer)) {
QPID_LOG(debug, "Delivered " << *content->getMethod() << " "
<< *content->getHeaders());
- return true;
+ return OK;
} else {
//received message for another destination, keep for later
QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
@@ -295,8 +300,8 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
}
}
}
- catch (const qpid::ClosedException&) {} // Just return false if queue closed.
- return false;
+ catch (const qpid::ClosedException&) { return CLOSED; }
+ return EMPTY;
}
bool IncomingMessages::wait(qpid::sys::Duration duration)
@@ -331,7 +336,7 @@ uint32_t IncomingMessages::pendingAccept(const std::string& destination)
uint32_t IncomingMessages::available()
{
//first pump all available messages from incoming to received...
- while (process(0, 0)) {}
+ while (process(0, 0) == OK) {}
//return the count of received messages
sys::Mutex::ScopedLock l(lock);
return received.size();
@@ -340,7 +345,7 @@ uint32_t IncomingMessages::available()
uint32_t IncomingMessages::available(const std::string& destination)
{
//first pump all available messages from incoming to received...
- while (process(0, 0)) {}
+ while (process(0, 0) == OK) {}
//count all messages for this destination from received list
sys::Mutex::ScopedLock l(lock);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index c9ea0673a3..4c9ee68ece 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -87,6 +87,7 @@ class IncomingMessages
uint32_t available(const std::string& destination);
private:
typedef std::deque<FrameSetPtr> FrameSetQueue;
+ enum ProcessState {EMPTY=0,OK=1,CLOSED=2};
sys::Monitor lock;
qpid::client::AsyncSession session;
@@ -95,7 +96,7 @@ class IncomingMessages
FrameSetQueue received;
AcceptTracker acceptTracker;
- bool process(Handler*, qpid::sys::Duration);
+ ProcessState process(Handler*, qpid::sys::Duration);
bool wait(qpid::sys::Duration);
bool pop(FrameSetPtr&, qpid::sys::Duration);