diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h')
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h | 34 |
1 files changed, 28 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h index 4ba793d71c..028d26bda7 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -51,6 +51,9 @@ class SenderImpl : public qpid::messaging::SenderImpl const qpid::messaging::Variant::Map& options); void send(const qpid::messaging::Message&); void cancel(); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t pending(); void init(qpid::client::AsyncSession, AddressResolution&); private: @@ -69,14 +72,17 @@ class SenderImpl : public qpid::messaging::SenderImpl OutgoingMessages outgoing; uint32_t capacity; uint32_t window; + bool flushed; - void checkPendingSends(); + uint32_t checkPendingSends(bool flush); void replay(); + void waitForCapacity(); //logic for application visible methods: void sendImpl(const qpid::messaging::Message&); void cancelImpl(); + //functors for application visible methods (allowing locking and //retry to be centralised): struct Command @@ -89,9 +95,17 @@ class SenderImpl : public qpid::messaging::SenderImpl struct Send : Command { const qpid::messaging::Message* message; - - Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {} - void operator()() { impl.sendImpl(*message); } + bool repeat; + + Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {} + void operator()() + { + impl.waitForCapacity(); + //from this point message will be recorded if there is any + //failure (and replayed) so need not repeat the call + repeat = false; + impl.sendImpl(*message); + } }; struct Cancel : Command @@ -100,6 +114,14 @@ class SenderImpl : public qpid::messaging::SenderImpl void operator()() { impl.cancelImpl(); } }; + struct CheckPendingSends : Command + { + bool flush; + uint32_t pending; + CheckPendingSends(SenderImpl& i, bool f) : Command(i), flush(f), pending(0) {} + void operator()() { pending = impl.checkPendingSends(flush); } + }; + //helper templates for some common patterns template <class F> void execute() { @@ -107,10 +129,10 @@ class SenderImpl : public qpid::messaging::SenderImpl parent.execute(f); } - template <class F, class P> void execute1(P p) + template <class F, class P> bool execute1(P p) { F f(*this, p); - parent.execute(f); + return parent.execute(f); } }; }}} // namespace qpid::client::amqp0_10 |