diff options
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 42 |
1 files changed, 29 insertions, 13 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index 435459d97f..49cfec7497 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -37,6 +37,7 @@ using qpid::messaging::Duration; void ReceiverImpl::received(qpid::messaging::Message&) { //TODO: should this be configurable + sys::Mutex::ScopedLock l(lock); if (capacity && --window <= capacity/2) { session.sendCompletion(); window = capacity; @@ -78,14 +79,16 @@ void ReceiverImpl::close() void ReceiverImpl::start() { + sys::Mutex::ScopedLock l(lock); if (state == STOPPED) { state = STARTED; - startFlow(); + startFlow(l); } } void ReceiverImpl::stop() { + sys::Mutex::ScopedLock l(lock); state = STOPPED; session.messageStop(destination); } @@ -95,7 +98,7 @@ void ReceiverImpl::setCapacity(uint32_t c) execute1<SetCapacity>(c); } -void ReceiverImpl::startFlow() +void ReceiverImpl::startFlow(const sys::Mutex::ScopedLock&) { if (capacity > 0) { session.messageSetFlowMode(destination, FLOW_MODE_WINDOW); @@ -107,10 +110,11 @@ void ReceiverImpl::startFlow() void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) { - + sys::Mutex::ScopedLock l(lock); session = s; if (state == UNRESOLVED) { source = resolver.resolveSource(session, address); + assert(source.get()); state = STARTED; } if (state == CANCELLED) { @@ -118,15 +122,19 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve parent->receiverCancelled(destination); } else { source->subscribe(session, destination); - startFlow(); + startFlow(l); } } -const std::string& ReceiverImpl::getName() const { return destination; } +const std::string& ReceiverImpl::getName() const { + sys::Mutex::ScopedLock l(lock); + return destination; +} uint32_t ReceiverImpl::getCapacity() { + sys::Mutex::ScopedLock l(lock); return capacity; } @@ -153,25 +161,31 @@ bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::D bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) { - if (state == CANCELLED) return false;//TODO: or should this be an error? + { + sys::Mutex::ScopedLock l(lock); + if (state == CANCELLED) return false;//TODO: or should this be an error? - if (capacity == 0 || state != STARTED) { - session.messageSetFlowMode(destination, FLOW_MODE_CREDIT); - session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1); - session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF); + if (capacity == 0 || state != STARTED) { + session.messageSetFlowMode(destination, FLOW_MODE_CREDIT); + session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1); + session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF); + } } - if (getImpl(message, timeout)) { return true; } else { sync(session).messageFlush(destination); - startFlow();//reallocate credit + { + sys::Mutex::ScopedLock l(lock); + startFlow(l); //reallocate credit + } return getImpl(message, Duration::IMMEDIATE); } } void ReceiverImpl::closeImpl() { + sys::Mutex::ScopedLock l(lock); if (state != CANCELLED) { state = CANCELLED; source->cancel(session, destination); @@ -181,14 +195,16 @@ void ReceiverImpl::closeImpl() void ReceiverImpl::setCapacityImpl(uint32_t c) { + sys::Mutex::ScopedLock l(lock); if (c != capacity) { capacity = c; if (state == STARTED) { session.messageStop(destination); - startFlow(); + startFlow(l); } } } + qpid::messaging::Session ReceiverImpl::getSession() const { return qpid::messaging::Session(parent.get()); |