summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SubscriptionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionManager.cpp')
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp48
1 files changed, 30 insertions, 18 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 478b8438c2..1282a1cf61 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -34,34 +34,42 @@ namespace client {
SubscriptionManager::SubscriptionManager(Session_0_10& s)
: dispatcher(s), session(s),
- messages(UNLIMITED), bytes(UNLIMITED), window(true)
+ messages(UNLIMITED), bytes(UNLIMITED), window(true),
+ confirmMode(true)
{}
+void SubscriptionManager::subscribeInternal(
+ const std::string& q, const std::string& dest)
+{
+ session.messageSubscribe(arg::queue=q, arg::destination=dest,
+ arg::confirmMode=confirmMode);
+ setFlowControl(dest, messages, bytes, window);
+}
+
void SubscriptionManager::subscribe(
- MessageListener& listener, const std::string& q, const std::string& t)
+ MessageListener& listener, const std::string& q, const std::string& d)
{
- std::string tag=t.empty() ? q:t;
- dispatcher.listen(tag, &listener);
- session.messageSubscribe(arg::queue=q, arg::destination=tag);
- setFlowControl(tag, messages, bytes, window);
+ std::string dest=d.empty() ? q:d;
+ dispatcher.listen(dest, &listener, autoAck);
+ subscribeInternal(q, dest);
}
void SubscriptionManager::subscribe(
- LocalQueue& lq, const std::string& q, const std::string& t)
+ LocalQueue& lq, const std::string& q, const std::string& d)
{
- std::string tag=t.empty() ? q:t;
+ std::string dest=d.empty() ? q:d;
lq.session=session;
- lq.queue=session.execution().getDemux().add(tag, ByTransferDest(tag));
- session.messageSubscribe(arg::queue=q, arg::destination=tag);
- setFlowControl(tag, messages, bytes, window);
+ lq.queue=session.execution().getDemux().add(dest, ByTransferDest(dest));
+ lq.setAckPolicy(autoAck);
+ subscribeInternal(q, dest);
}
void SubscriptionManager::setFlowControl(
- const std::string& tag, uint32_t messages, uint32_t bytes, bool window)
+ const std::string& dest, uint32_t messages, uint32_t bytes, bool window)
{
- session.messageFlowMode(tag, window);
- session.messageFlow(tag, 0, messages);
- session.messageFlow(tag, 1, bytes);
+ session.messageFlowMode(dest, window);
+ session.messageFlow(dest, 0, messages);
+ session.messageFlow(dest, 1, bytes);
}
void SubscriptionManager::setFlowControl(
@@ -72,10 +80,14 @@ void SubscriptionManager::setFlowControl(
window=window_;
}
-void SubscriptionManager::cancel(const std::string tag)
+void SubscriptionManager::setConfirmMode(bool c) { confirmMode=c; }
+
+void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; }
+
+void SubscriptionManager::cancel(const std::string dest)
{
- dispatcher.cancel(tag);
- session.messageCancel(tag);
+ dispatcher.cancel(dest);
+ session.messageCancel(dest);
}
void SubscriptionManager::run(bool autoStop)