diff options
author | Alan Conway <aconway@apache.org> | 2010-05-18 18:47:25 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-05-18 18:47:25 +0000 |
commit | 2505926ad6b3a55b486bbcef07196fc844fc5aaa (patch) | |
tree | 4288d5ddaed4eaab8e591bf4bd5f96c6874cc94a /cpp/src | |
parent | 1a8b0fe6cb9731f650c02478a2b74fc10dc0932c (diff) | |
download | qpid-python-2505926ad6b3a55b486bbcef07196fc844fc5aaa.tar.gz |
Added locking to make amqp_0_10::SenderImpl thread safe.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@945813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 44 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.h | 5 |
2 files changed, 38 insertions, 11 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index 4c41622751..e1b75ec0cf 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -36,7 +36,7 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, void SenderImpl::send(const qpid::messaging::Message& message, bool sync) { - if (unreliable) { + if (unreliable) { // immutable, don't need lock UnreliableSend f(*this, &message); parent->execute(f); } else { @@ -53,11 +53,20 @@ void SenderImpl::close() void SenderImpl::setCapacity(uint32_t c) { - bool flush = c < capacity; - capacity = c; + bool flush; + { + sys::Mutex::ScopedLock l(lock); + flush = c < capacity; + capacity = c; + } execute1<CheckPendingSends>(flush); } -uint32_t SenderImpl::getCapacity() { return capacity; } + +uint32_t SenderImpl::getCapacity() { + sys::Mutex::ScopedLock l(lock); + return capacity; +} + uint32_t SenderImpl::getUnsettled() { CheckPendingSends f(*this, false); @@ -67,6 +76,7 @@ uint32_t SenderImpl::getUnsettled() void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) { + sys::Mutex::ScopedLock l(lock); session = s; if (state == UNRESOLVED) { sink = resolver.resolveSink(session, address); @@ -74,33 +84,38 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) } if (state == CANCELLED) { sink->cancel(session, name); + sys::Mutex::ScopedUnlock u(lock); parent->senderCancelled(name); } else { sink->declare(session, name); - replay(); + replay(l); } } void SenderImpl::waitForCapacity() { + sys::Mutex::ScopedLock l(lock); //TODO: add option to throw exception rather than blocking? - if (!unreliable && capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) { + if (!unreliable && capacity <= + (flushed ? checkPendingSends(false, l) : outgoing.size())) + { //Initial implementation is very basic. As outgoing is //currently only reduced on receiving completions and we are //blocking anyway we may as well sync(). If successful that //should clear all outstanding sends. session.sync(); - checkPendingSends(false); + checkPendingSends(false, l); } //flush periodically and check for conmpleted sends if (++window > (capacity / 4)) {//TODO: make this configurable? - checkPendingSends(true); + checkPendingSends(true, l); window = 0; } } void SenderImpl::sendImpl(const qpid::messaging::Message& m) { + sys::Mutex::ScopedLock l(lock); std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); msg->convert(m); msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); @@ -110,20 +125,26 @@ void SenderImpl::sendImpl(const qpid::messaging::Message& m) void SenderImpl::sendUnreliable(const qpid::messaging::Message& m) { + sys::Mutex::ScopedLock l(lock); OutgoingMessage msg; msg.convert(m); msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); sink->send(session, name, msg); } -void SenderImpl::replay() +void SenderImpl::replay(const sys::Mutex::ScopedLock&) { for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { sink->send(session, name, *i); } } -uint32_t SenderImpl::checkPendingSends(bool flush) +uint32_t SenderImpl::checkPendingSends(bool flush) { + sys::Mutex::ScopedLock l(lock); + return checkPendingSends(flush, l); +} + +uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&) { if (flush) { session.flush(); @@ -139,6 +160,7 @@ uint32_t SenderImpl::checkPendingSends(bool flush) void SenderImpl::closeImpl() { + sys::Mutex::ScopedLock l(lock); state = CANCELLED; sink->cancel(session, name); parent->senderCancelled(name); @@ -146,11 +168,13 @@ void SenderImpl::closeImpl() const std::string& SenderImpl::getName() const { + sys::Mutex::ScopedLock l(lock); return name; } qpid::messaging::Session SenderImpl::getSession() const { + sys::Mutex::ScopedLock l(lock); return qpid::messaging::Session(parent.get()); } diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h index 826c734697..c10c77ae18 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -58,6 +58,7 @@ class SenderImpl : public qpid::messaging::SenderImpl qpid::messaging::Session getSession() const; private: + mutable sys::Mutex lock; boost::intrusive_ptr<SessionImpl> parent; const std::string name; const qpid::messaging::Address address; @@ -76,7 +77,9 @@ class SenderImpl : public qpid::messaging::SenderImpl const bool unreliable; uint32_t checkPendingSends(bool flush); - void replay(); + // Dummy ScopedLock parameter means call with lock held + uint32_t checkPendingSends(bool flush, const sys::Mutex::ScopedLock&); + void replay(const sys::Mutex::ScopedLock&); void waitForCapacity(); //logic for application visible methods: |