diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 100 |
1 files changed, 92 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index ac36eb1537..4cd2dc0521 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -21,29 +21,113 @@ #include "SenderImpl.h" #include "MessageSink.h" #include "SessionImpl.h" +#include "AddressResolution.h" +#include "OutgoingMessage.h" namespace qpid { namespace client { namespace amqp0_10 { -SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, std::auto_ptr<MessageSink> _sink) : - parent(_parent), name(_name), sink(_sink) {} +SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, + const qpid::messaging::Address& _address, + const qpid::messaging::Variant::Map& _options) : + parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED), + capacity(50), window(0), flushed(false) {} -void SenderImpl::send(qpid::messaging::Message& m) +void SenderImpl::send(const qpid::messaging::Message& message) { - sink->send(session, name, m); + Send f(*this, &message); + while (f.repeat) parent.execute(f); } void SenderImpl::cancel() { - sink->cancel(session, name); - parent.senderCancelled(name); + execute<Cancel>(); +} + +void SenderImpl::setCapacity(uint32_t c) +{ + bool flush = c < capacity; + capacity = c; + execute1<CheckPendingSends>(flush); } +uint32_t SenderImpl::getCapacity() { return capacity; } +uint32_t SenderImpl::pending() +{ + CheckPendingSends f(*this, false); + parent.execute(f); + return f.pending; +} -void SenderImpl::setSession(qpid::client::AsyncSession s) +void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) { session = s; - sink->declare(session, name); + if (state == UNRESOLVED) { + sink = resolver.resolveSink(session, address, options); + state = ACTIVE; + } + if (state == CANCELLED) { + sink->cancel(session, name); + parent.senderCancelled(name); + } else { + sink->declare(session, name); + replay(); + } +} + +void SenderImpl::waitForCapacity() +{ + //TODO: add option to throw exception rather than blocking? + if (capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) { + //Initial implementation is very basic. As outgoing is + //currently only reduced on receiving completions and we are + //blocking anyway we may as well sync(). If successful that + //should clear all outstanding sends. + session.sync(); + checkPendingSends(false); + } + //flush periodically and check for conmpleted sends + if (++window > (capacity / 4)) {//TODO: make this configurable? + checkPendingSends(true); + window = 0; + } +} + +void SenderImpl::sendImpl(const qpid::messaging::Message& m) +{ + //TODO: make recording for replay optional (would still want to track completion however) + std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); + msg->convert(m); + outgoing.push_back(msg.release()); + sink->send(session, name, outgoing.back()); +} + +void SenderImpl::replay() +{ + for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { + sink->send(session, name, *i); + } +} + +uint32_t SenderImpl::checkPendingSends(bool flush) +{ + if (flush) { + session.flush(); + flushed = true; + } else { + flushed = false; + } + while (!outgoing.empty() && outgoing.front().status.isComplete()) { + outgoing.pop_front(); + } + return outgoing.size(); +} + +void SenderImpl::cancelImpl() +{ + state = CANCELLED; + sink->cancel(session, name); + parent.senderCancelled(name); } }}} // namespace qpid::client::amqp0_10 |