diff options
| author | Gordon Sim <gsim@apache.org> | 2015-02-04 17:05:02 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2015-02-04 17:05:02 +0000 |
| commit | d3fd2da6c424e04725c7b40f097ea31036dab0f1 (patch) | |
| tree | 62b8a2ed95ad3eed54375a650adeb84ffe37650b /qpid/cpp/src | |
| parent | bafd7f6d88ade9062a69ec78e3c3f3c4e5a7fe7b (diff) | |
| download | qpid-python-d3fd2da6c424e04725c7b40f097ea31036dab0f1.tar.gz | |
QPID-6358: detect detached session on fetch
The previous solution may on occasion cause an early return in the case where one of many other receivers is closed while in fetch.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1657321 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 25 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h | 3 |
2 files changed, 17 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index c13e0ef5e6..2ca2c85c64 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -164,12 +164,17 @@ bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout) ScopedRelease release(inUse, lock); sys::Mutex::ScopedUnlock l(lock); //wait for suitable new message to arrive - if (process(&handler, get_duration(timeout, deadline))) { + switch (process(&handler, get_duration(timeout, deadline))) { + case OK: return true; + case CLOSED: + return false; + case EMPTY: + break; } } if (handler.isClosed()) throw qpid::messaging::ReceiverError("Receiver has been closed"); - } while (AbsTime::now() < deadline && !incoming->isClosed()); + } while (AbsTime::now() < deadline); return false; } namespace { @@ -233,7 +238,7 @@ void IncomingMessages::releaseAll() } //then pump out any available messages from incoming queue... GetAny handler; - while (process(&handler, 0)) ; + while (process(&handler, 0) == OK) ; //now release all messages sys::Mutex::ScopedLock l(lock); acceptTracker.release(session); @@ -242,7 +247,7 @@ void IncomingMessages::releaseAll() void IncomingMessages::releasePending(const std::string& destination) { //first pump all available messages from incoming to received... - while (process(0, 0)) ; + while (process(0, 0) == OK) ; //now remove all messages for this destination from received list, recording their ids... sys::Mutex::ScopedLock l(lock); @@ -269,7 +274,7 @@ bool IncomingMessages::pop(FrameSet::shared_ptr& content, qpid::sys::Duration ti * that are not accepted by the handler are pushed onto received queue * for later retrieval. */ -bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) +IncomingMessages::ProcessState IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) { AbsTime deadline(AbsTime::now(), duration); FrameSet::shared_ptr content; @@ -282,7 +287,7 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) } else if (handler && handler->accept(transfer)) { QPID_LOG(debug, "Delivered " << *content->getMethod() << " " << *content->getHeaders()); - return true; + return OK; } else { //received message for another destination, keep for later QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); @@ -295,8 +300,8 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) } } } - catch (const qpid::ClosedException&) {} // Just return false if queue closed. - return false; + catch (const qpid::ClosedException&) { return CLOSED; } + return EMPTY; } bool IncomingMessages::wait(qpid::sys::Duration duration) @@ -331,7 +336,7 @@ uint32_t IncomingMessages::pendingAccept(const std::string& destination) uint32_t IncomingMessages::available() { //first pump all available messages from incoming to received... - while (process(0, 0)) {} + while (process(0, 0) == OK) {} //return the count of received messages sys::Mutex::ScopedLock l(lock); return received.size(); @@ -340,7 +345,7 @@ uint32_t IncomingMessages::available() uint32_t IncomingMessages::available(const std::string& destination) { //first pump all available messages from incoming to received... - while (process(0, 0)) {} + while (process(0, 0) == OK) {} //count all messages for this destination from received list sys::Mutex::ScopedLock l(lock); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index c9ea0673a3..4c9ee68ece 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -87,6 +87,7 @@ class IncomingMessages uint32_t available(const std::string& destination); private: typedef std::deque<FrameSetPtr> FrameSetQueue; + enum ProcessState {EMPTY=0,OK=1,CLOSED=2}; sys::Monitor lock; qpid::client::AsyncSession session; @@ -95,7 +96,7 @@ class IncomingMessages FrameSetQueue received; AcceptTracker acceptTracker; - bool process(Handler*, qpid::sys::Duration); + ProcessState process(Handler*, qpid::sys::Duration); bool wait(qpid::sys::Duration); bool pop(FrameSetPtr&, qpid::sys::Duration); |
