diff options
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionManager.cpp')
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 61 |
1 files changed, 29 insertions, 32 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index bf5191e8a0..fc65843643 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -33,57 +33,54 @@ namespace qpid { namespace client { SubscriptionManager::SubscriptionManager(Session_0_10& s) - : dispatcher(s), session(s), messages(1), bytes(UNLIMITED), autoStop(true) + : dispatcher(s), session(s), + messages(UNLIMITED), bytes(UNLIMITED), window(true) {} -std::string SubscriptionManager::uniqueTag(const std::string& tag) { - // Make unique tag. - int count=1; - std::string unique=tag; - while (subscriptions.find(tag) != subscriptions.end()) { - std::ostringstream s; - s << tag << "-" << count++; - unique=s.str(); - } - subscriptions.insert(unique); - return tag; -} - -std::string SubscriptionManager::subscribe( +void SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const std::string& t) { - std::string tag=uniqueTag(t); - using namespace arg; + std::string tag=t.empty() ? q:t; session.messageSubscribe(arg::queue=q, arg::destination=tag); - flowLimits(tag, messages, bytes); dispatcher.listen(tag, &listener); - return tag; + setFlowControl(tag, messages, bytes, window); +} + +void SubscriptionManager::subscribe( + LocalQueue& lq, const std::string& q, const std::string& t) +{ + std::string tag=t.empty() ? q:t; + lq.session=session; + lq.queue=session.execution().getDemux().add(tag, ByTransferDest(tag)); + session.messageSubscribe(arg::queue=q, arg::destination=tag); + setFlowControl(tag, messages, bytes, window); } -void SubscriptionManager::flowLimits( - const std::string& tag, uint32_t messages, uint32_t bytes) { +void SubscriptionManager::setFlowControl( + const std::string& tag, uint32_t messages, uint32_t bytes, bool window) +{ + session.messageFlowMode(tag, window); session.messageFlow(tag, 0, messages); session.messageFlow(tag, 1, bytes); } -void SubscriptionManager::flowLimits(uint32_t m, uint32_t b) { - messages=m; - bytes=b; +void SubscriptionManager::setFlowControl( + uint32_t messages_, uint32_t bytes_, bool window_) +{ + messages=messages_; + bytes=bytes_; + window=window_; } void SubscriptionManager::cancel(const std::string tag) { - if (subscriptions.erase(tag)) { - dispatcher.cancel(tag); - session.messageCancel(tag); - if (autoStop && subscriptions.empty()) stop(); - } + dispatcher.cancel(tag); + session.messageCancel(tag); } -void SubscriptionManager::run(bool autoStop_) +void SubscriptionManager::run(bool autoStop) { - autoStop=autoStop_; - if (autoStop && subscriptions.empty()) return; + dispatcher.setAutoStop(autoStop); dispatcher.run(); } |