summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp42
1 files changed, 29 insertions, 13 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 435459d97f..49cfec7497 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -37,6 +37,7 @@ using qpid::messaging::Duration;
void ReceiverImpl::received(qpid::messaging::Message&)
{
//TODO: should this be configurable
+ sys::Mutex::ScopedLock l(lock);
if (capacity && --window <= capacity/2) {
session.sendCompletion();
window = capacity;
@@ -78,14 +79,16 @@ void ReceiverImpl::close()
void ReceiverImpl::start()
{
+ sys::Mutex::ScopedLock l(lock);
if (state == STOPPED) {
state = STARTED;
- startFlow();
+ startFlow(l);
}
}
void ReceiverImpl::stop()
{
+ sys::Mutex::ScopedLock l(lock);
state = STOPPED;
session.messageStop(destination);
}
@@ -95,7 +98,7 @@ void ReceiverImpl::setCapacity(uint32_t c)
execute1<SetCapacity>(c);
}
-void ReceiverImpl::startFlow()
+void ReceiverImpl::startFlow(const sys::Mutex::ScopedLock&)
{
if (capacity > 0) {
session.messageSetFlowMode(destination, FLOW_MODE_WINDOW);
@@ -107,10 +110,11 @@ void ReceiverImpl::startFlow()
void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
{
-
+ sys::Mutex::ScopedLock l(lock);
session = s;
if (state == UNRESOLVED) {
source = resolver.resolveSource(session, address);
+ assert(source.get());
state = STARTED;
}
if (state == CANCELLED) {
@@ -118,15 +122,19 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve
parent->receiverCancelled(destination);
} else {
source->subscribe(session, destination);
- startFlow();
+ startFlow(l);
}
}
-const std::string& ReceiverImpl::getName() const { return destination; }
+const std::string& ReceiverImpl::getName() const {
+ sys::Mutex::ScopedLock l(lock);
+ return destination;
+}
uint32_t ReceiverImpl::getCapacity()
{
+ sys::Mutex::ScopedLock l(lock);
return capacity;
}
@@ -153,25 +161,31 @@ bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::D
bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
- if (state == CANCELLED) return false;//TODO: or should this be an error?
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (state == CANCELLED) return false;//TODO: or should this be an error?
- if (capacity == 0 || state != STARTED) {
- session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
- session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
- session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF);
+ if (capacity == 0 || state != STARTED) {
+ session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
+ session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
+ session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF);
+ }
}
-
if (getImpl(message, timeout)) {
return true;
} else {
sync(session).messageFlush(destination);
- startFlow();//reallocate credit
+ {
+ sys::Mutex::ScopedLock l(lock);
+ startFlow(l); //reallocate credit
+ }
return getImpl(message, Duration::IMMEDIATE);
}
}
void ReceiverImpl::closeImpl()
{
+ sys::Mutex::ScopedLock l(lock);
if (state != CANCELLED) {
state = CANCELLED;
source->cancel(session, destination);
@@ -181,14 +195,16 @@ void ReceiverImpl::closeImpl()
void ReceiverImpl::setCapacityImpl(uint32_t c)
{
+ sys::Mutex::ScopedLock l(lock);
if (c != capacity) {
capacity = c;
if (state == STARTED) {
session.messageStop(destination);
- startFlow();
+ startFlow(l);
}
}
}
+
qpid::messaging::Session ReceiverImpl::getSession() const
{
return qpid::messaging::Session(parent.get());