summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SubscriptionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionManager.cpp')
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp11
1 files changed, 6 insertions, 5 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 6036f153f6..3fa75a54ac 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -42,7 +42,7 @@ SubscriptionManager::SubscriptionManager(const Session& s)
void SubscriptionManager::subscribeInternal(
const std::string& q, const std::string& dest)
{
- async(session).messageSubscribe( // setFlowControl will sync.
+ session.messageSubscribe(
arg::queue=q, arg::destination=dest,
arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
setFlowControl(dest, messages, bytes, window);
@@ -68,9 +68,10 @@ void SubscriptionManager::subscribe(
void SubscriptionManager::setFlowControl(
const std::string& dest, uint32_t messages, uint32_t bytes, bool window)
{
- async(session).messageSetFlowMode(dest, window);
- async(session).messageFlow(dest, 0, messages);
- session.messageFlow(dest, 1, bytes); // Only need one sync
+ session.messageSetFlowMode(dest, window);
+ session.messageFlow(dest, 0, messages);
+ session.messageFlow(dest, 1, bytes);
+ session.sync();
}
void SubscriptionManager::setFlowControl(
@@ -91,8 +92,8 @@ AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; }
void SubscriptionManager::cancel(const std::string dest)
{
+ sync(session).messageCancel(dest);
dispatcher.cancel(dest);
- session.messageCancel(dest);
}
void SubscriptionManager::setAutoStop(bool set) { autoStop=set; }