summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.cpp44
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.h5
2 files changed, 38 insertions, 11 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index 4c41622751..e1b75ec0cf 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -36,7 +36,7 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
void SenderImpl::send(const qpid::messaging::Message& message, bool sync)
{
- if (unreliable) {
+ if (unreliable) { // immutable, don't need lock
UnreliableSend f(*this, &message);
parent->execute(f);
} else {
@@ -53,11 +53,20 @@ void SenderImpl::close()
void SenderImpl::setCapacity(uint32_t c)
{
- bool flush = c < capacity;
- capacity = c;
+ bool flush;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ flush = c < capacity;
+ capacity = c;
+ }
execute1<CheckPendingSends>(flush);
}
-uint32_t SenderImpl::getCapacity() { return capacity; }
+
+uint32_t SenderImpl::getCapacity() {
+ sys::Mutex::ScopedLock l(lock);
+ return capacity;
+}
+
uint32_t SenderImpl::getUnsettled()
{
CheckPendingSends f(*this, false);
@@ -67,6 +76,7 @@ uint32_t SenderImpl::getUnsettled()
void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
{
+ sys::Mutex::ScopedLock l(lock);
session = s;
if (state == UNRESOLVED) {
sink = resolver.resolveSink(session, address);
@@ -74,33 +84,38 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
}
if (state == CANCELLED) {
sink->cancel(session, name);
+ sys::Mutex::ScopedUnlock u(lock);
parent->senderCancelled(name);
} else {
sink->declare(session, name);
- replay();
+ replay(l);
}
}
void SenderImpl::waitForCapacity()
{
+ sys::Mutex::ScopedLock l(lock);
//TODO: add option to throw exception rather than blocking?
- if (!unreliable && capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) {
+ if (!unreliable && capacity <=
+ (flushed ? checkPendingSends(false, l) : outgoing.size()))
+ {
//Initial implementation is very basic. As outgoing is
//currently only reduced on receiving completions and we are
//blocking anyway we may as well sync(). If successful that
//should clear all outstanding sends.
session.sync();
- checkPendingSends(false);
+ checkPendingSends(false, l);
}
//flush periodically and check for conmpleted sends
if (++window > (capacity / 4)) {//TODO: make this configurable?
- checkPendingSends(true);
+ checkPendingSends(true, l);
window = 0;
}
}
void SenderImpl::sendImpl(const qpid::messaging::Message& m)
{
+ sys::Mutex::ScopedLock l(lock);
std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
msg->convert(m);
msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
@@ -110,20 +125,26 @@ void SenderImpl::sendImpl(const qpid::messaging::Message& m)
void SenderImpl::sendUnreliable(const qpid::messaging::Message& m)
{
+ sys::Mutex::ScopedLock l(lock);
OutgoingMessage msg;
msg.convert(m);
msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
sink->send(session, name, msg);
}
-void SenderImpl::replay()
+void SenderImpl::replay(const sys::Mutex::ScopedLock&)
{
for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
sink->send(session, name, *i);
}
}
-uint32_t SenderImpl::checkPendingSends(bool flush)
+uint32_t SenderImpl::checkPendingSends(bool flush) {
+ sys::Mutex::ScopedLock l(lock);
+ return checkPendingSends(flush, l);
+}
+
+uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&)
{
if (flush) {
session.flush();
@@ -139,6 +160,7 @@ uint32_t SenderImpl::checkPendingSends(bool flush)
void SenderImpl::closeImpl()
{
+ sys::Mutex::ScopedLock l(lock);
state = CANCELLED;
sink->cancel(session, name);
parent->senderCancelled(name);
@@ -146,11 +168,13 @@ void SenderImpl::closeImpl()
const std::string& SenderImpl::getName() const
{
+ sys::Mutex::ScopedLock l(lock);
return name;
}
qpid::messaging::Session SenderImpl::getSession() const
{
+ sys::Mutex::ScopedLock l(lock);
return qpid::messaging::Session(parent.get());
}
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index 826c734697..c10c77ae18 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -58,6 +58,7 @@ class SenderImpl : public qpid::messaging::SenderImpl
qpid::messaging::Session getSession() const;
private:
+ mutable sys::Mutex lock;
boost::intrusive_ptr<SessionImpl> parent;
const std::string name;
const qpid::messaging::Address address;
@@ -76,7 +77,9 @@ class SenderImpl : public qpid::messaging::SenderImpl
const bool unreliable;
uint32_t checkPendingSends(bool flush);
- void replay();
+ // Dummy ScopedLock parameter means call with lock held
+ uint32_t checkPendingSends(bool flush, const sys::Mutex::ScopedLock&);
+ void replay(const sys::Mutex::ScopedLock&);
void waitForCapacity();
//logic for application visible methods: