summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SubscriptionManager.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-11-07 19:57:46 +0000
committerAlan Conway <aconway@apache.org>2007-11-07 19:57:46 +0000
commit710b8a1f1285b9aa5bccee5b1906500667dd7bc5 (patch)
tree83005778c44cf7d897cef882ced2330bc8bd2228 /cpp/src/qpid/client/SubscriptionManager.cpp
parentd19657d82321b2b5e2cac386c49aa99f82b976fb (diff)
downloadqpid-python-710b8a1f1285b9aa5bccee5b1906500667dd7bc5.tar.gz
client::SubscriptionManager:
- Added autoStop support. - Added LocalQueue subscriptions. - Expose AckPolicy settings to user. client::Message: - incoming Messages carry their session for acknowledge perftest: (see perftest --help for details...) - allow multiple consumers. - 3 queue modes: shared, fanout, topic. - set size of messages git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592869 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionManager.cpp')
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp61
1 files changed, 29 insertions, 32 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index bf5191e8a0..fc65843643 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -33,57 +33,54 @@ namespace qpid {
namespace client {
SubscriptionManager::SubscriptionManager(Session_0_10& s)
- : dispatcher(s), session(s), messages(1), bytes(UNLIMITED), autoStop(true)
+ : dispatcher(s), session(s),
+ messages(UNLIMITED), bytes(UNLIMITED), window(true)
{}
-std::string SubscriptionManager::uniqueTag(const std::string& tag) {
- // Make unique tag.
- int count=1;
- std::string unique=tag;
- while (subscriptions.find(tag) != subscriptions.end()) {
- std::ostringstream s;
- s << tag << "-" << count++;
- unique=s.str();
- }
- subscriptions.insert(unique);
- return tag;
-}
-
-std::string SubscriptionManager::subscribe(
+void SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& t)
{
- std::string tag=uniqueTag(t);
- using namespace arg;
+ std::string tag=t.empty() ? q:t;
session.messageSubscribe(arg::queue=q, arg::destination=tag);
- flowLimits(tag, messages, bytes);
dispatcher.listen(tag, &listener);
- return tag;
+ setFlowControl(tag, messages, bytes, window);
+}
+
+void SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const std::string& t)
+{
+ std::string tag=t.empty() ? q:t;
+ lq.session=session;
+ lq.queue=session.execution().getDemux().add(tag, ByTransferDest(tag));
+ session.messageSubscribe(arg::queue=q, arg::destination=tag);
+ setFlowControl(tag, messages, bytes, window);
}
-void SubscriptionManager::flowLimits(
- const std::string& tag, uint32_t messages, uint32_t bytes) {
+void SubscriptionManager::setFlowControl(
+ const std::string& tag, uint32_t messages, uint32_t bytes, bool window)
+{
+ session.messageFlowMode(tag, window);
session.messageFlow(tag, 0, messages);
session.messageFlow(tag, 1, bytes);
}
-void SubscriptionManager::flowLimits(uint32_t m, uint32_t b) {
- messages=m;
- bytes=b;
+void SubscriptionManager::setFlowControl(
+ uint32_t messages_, uint32_t bytes_, bool window_)
+{
+ messages=messages_;
+ bytes=bytes_;
+ window=window_;
}
void SubscriptionManager::cancel(const std::string tag)
{
- if (subscriptions.erase(tag)) {
- dispatcher.cancel(tag);
- session.messageCancel(tag);
- if (autoStop && subscriptions.empty()) stop();
- }
+ dispatcher.cancel(tag);
+ session.messageCancel(tag);
}
-void SubscriptionManager::run(bool autoStop_)
+void SubscriptionManager::run(bool autoStop)
{
- autoStop=autoStop_;
- if (autoStop && subscriptions.empty()) return;
+ dispatcher.setAutoStop(autoStop);
dispatcher.run();
}