diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 55 |
1 files changed, 45 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index c619d1226a..4cd2dc0521 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -32,11 +32,12 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, const qpid::messaging::Address& _address, const qpid::messaging::Variant::Map& _options) : parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED), - capacity(50), window(0) {} + capacity(50), window(0), flushed(false) {} -void SenderImpl::send(const qpid::messaging::Message& m) +void SenderImpl::send(const qpid::messaging::Message& message) { - execute1<Send>(&m); + Send f(*this, &message); + while (f.repeat) parent.execute(f); } void SenderImpl::cancel() @@ -44,6 +45,20 @@ void SenderImpl::cancel() execute<Cancel>(); } +void SenderImpl::setCapacity(uint32_t c) +{ + bool flush = c < capacity; + capacity = c; + execute1<CheckPendingSends>(flush); +} +uint32_t SenderImpl::getCapacity() { return capacity; } +uint32_t SenderImpl::pending() +{ + CheckPendingSends f(*this, false); + parent.execute(f); + return f.pending; +} + void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) { session = s; @@ -60,18 +75,31 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) } } +void SenderImpl::waitForCapacity() +{ + //TODO: add option to throw exception rather than blocking? + if (capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) { + //Initial implementation is very basic. As outgoing is + //currently only reduced on receiving completions and we are + //blocking anyway we may as well sync(). If successful that + //should clear all outstanding sends. + session.sync(); + checkPendingSends(false); + } + //flush periodically and check for conmpleted sends + if (++window > (capacity / 4)) {//TODO: make this configurable? + checkPendingSends(true); + window = 0; + } +} + void SenderImpl::sendImpl(const qpid::messaging::Message& m) { - //TODO: make recoding for replay optional + //TODO: make recording for replay optional (would still want to track completion however) std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); msg->convert(m); outgoing.push_back(msg.release()); sink->send(session, name, outgoing.back()); - if (++window > (capacity / 2)) {//TODO: make this configurable? - session.flush(); - checkPendingSends(); - window = 0; - } } void SenderImpl::replay() @@ -81,11 +109,18 @@ void SenderImpl::replay() } } -void SenderImpl::checkPendingSends() +uint32_t SenderImpl::checkPendingSends(bool flush) { + if (flush) { + session.flush(); + flushed = true; + } else { + flushed = false; + } while (!outgoing.empty() && outgoing.front().status.isComplete()) { outgoing.pop_front(); } + return outgoing.size(); } void SenderImpl::cancelImpl() |