summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h34
1 files changed, 28 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index 4ba793d71c..028d26bda7 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -51,6 +51,9 @@ class SenderImpl : public qpid::messaging::SenderImpl
const qpid::messaging::Variant::Map& options);
void send(const qpid::messaging::Message&);
void cancel();
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t pending();
void init(qpid::client::AsyncSession, AddressResolution&);
private:
@@ -69,14 +72,17 @@ class SenderImpl : public qpid::messaging::SenderImpl
OutgoingMessages outgoing;
uint32_t capacity;
uint32_t window;
+ bool flushed;
- void checkPendingSends();
+ uint32_t checkPendingSends(bool flush);
void replay();
+ void waitForCapacity();
//logic for application visible methods:
void sendImpl(const qpid::messaging::Message&);
void cancelImpl();
+
//functors for application visible methods (allowing locking and
//retry to be centralised):
struct Command
@@ -89,9 +95,17 @@ class SenderImpl : public qpid::messaging::SenderImpl
struct Send : Command
{
const qpid::messaging::Message* message;
-
- Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
- void operator()() { impl.sendImpl(*message); }
+ bool repeat;
+
+ Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {}
+ void operator()()
+ {
+ impl.waitForCapacity();
+ //from this point message will be recorded if there is any
+ //failure (and replayed) so need not repeat the call
+ repeat = false;
+ impl.sendImpl(*message);
+ }
};
struct Cancel : Command
@@ -100,6 +114,14 @@ class SenderImpl : public qpid::messaging::SenderImpl
void operator()() { impl.cancelImpl(); }
};
+ struct CheckPendingSends : Command
+ {
+ bool flush;
+ uint32_t pending;
+ CheckPendingSends(SenderImpl& i, bool f) : Command(i), flush(f), pending(0) {}
+ void operator()() { pending = impl.checkPendingSends(flush); }
+ };
+
//helper templates for some common patterns
template <class F> void execute()
{
@@ -107,10 +129,10 @@ class SenderImpl : public qpid::messaging::SenderImpl
parent.execute(f);
}
- template <class F, class P> void execute1(P p)
+ template <class F, class P> bool execute1(P p)
{
F f(*this, p);
- parent.execute(f);
+ return parent.execute(f);
}
};
}}} // namespace qpid::client::amqp0_10