diff options
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionManager.h')
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 61 |
1 files changed, 36 insertions, 25 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 985b6ce222..1a03c1a47b 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -21,55 +21,59 @@ * under the License. * */ - +#include "qpid/sys/Mutex.h" #include <qpid/client/Dispatcher.h> #include <qpid/client/Session_0_10.h> #include <qpid/client/MessageListener.h> +#include <qpid/client/LocalQueue.h> #include <set> #include <sstream> namespace qpid { namespace client { -struct TagNotUniqueException : public qpid::Exception { - TagNotUniqueException() {} -}; - class SubscriptionManager { - std::set<std::string> subscriptions; + typedef sys::Mutex::ScopedLock Lock; + typedef sys::Mutex::ScopedUnlock Unlock; + qpid::client::Dispatcher dispatcher; qpid::client::Session_0_10& session; - std::string uniqueTag(const std::string&); uint32_t messages; uint32_t bytes; - bool autoStop; + bool window; public: SubscriptionManager(Session_0_10& session); /** - * Subscribe listener to receive messages from queue. + * Subscribe a MessagesListener to receive messages from queue. + * *@param listener Listener object to receive messages. *@param queue Name of the queue to subscribe to. *@param tag Unique destination tag for the listener. - * If not specified a unique tag will be generted based on the queue name. - *@return Destination tag. - *@exception TagNotUniqueException if there is already a subscription - * with the same tag. + * If not specified, the queue name is used. */ - std::string subscribe(MessageListener& listener, - const std::string& queue, - const std::string& tag=std::string()); + void subscribe(MessageListener& listener, + const std::string& queue, + const std::string& tag=std::string()); + + /** + * Subscribe a LocalQueue to receive messages from queue. + * + *@param queue Name of the queue to subscribe to. + *@param tag Unique destination tag for the listener. + * If not specified, the queue name is used. + */ + void subscribe(LocalQueue& localQueue, + const std::string& queue, + const std::string& tag=std::string()); /** Cancel a subscription. */ void cancel(const std::string tag); - - qpid::client::Dispatcher& getDispatcher() { return dispatcher; } - size_t size() { return subscriptions.size(); } /** Deliver messages until stop() is called. - *@param autoStop If true, return when all subscriptions are cancelled. + *@param autoStop If true, return when all listeners are cancelled. */ void run(bool autoStop=true); @@ -78,13 +82,20 @@ public: static const uint32_t UNLIMITED=0xFFFFFFFF; - /** Set the flow control limits for subscriber with tag. - * UNLIMITED means no limit. + /** Set the flow control for destination tag. + *@param tag: name of the destination. + *@param messages: message credit. + *@param bytes: byte credit. + *@param window: if true use window-based flow control. */ - void flowLimits(const std::string& tag, uint32_t messages, uint32_t bytes); + void setFlowControl(const std::string& tag, uint32_t messages, uint32_t bytes, bool window=true); - /** Set the initial flow control limits for new subscribers */ - void flowLimits(uint32_t messages, uint32_t bytes); + /** Set the initial flow control settings to be applied to each new subscribtion. + *@param messages: message credit. + *@param bytes: byte credit. + *@param window: if true use window-based flow control. + */ + void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true); }; |