diff options
Diffstat (limited to 'cpp/include/qpid/client/SubscriptionManager.h')
-rw-r--r-- | cpp/include/qpid/client/SubscriptionManager.h | 292 |
1 files changed, 292 insertions, 0 deletions
diff --git a/cpp/include/qpid/client/SubscriptionManager.h b/cpp/include/qpid/client/SubscriptionManager.h new file mode 100644 index 0000000000..e70e05f73a --- /dev/null +++ b/cpp/include/qpid/client/SubscriptionManager.h @@ -0,0 +1,292 @@ +#ifndef QPID_CLIENT_SUBSCRIPTIONMANAGER_H +#define QPID_CLIENT_SUBSCRIPTIONMANAGER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/client/Session.h" +#include "qpid/client/Subscription.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" +#include "qpid/client/ClientImportExport.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/LocalQueue.h" +#include "qpid/client/Handle.h" +#include <string> + +namespace qpid { +namespace client { + +class SubscriptionManagerImpl; + +/** + * A class to help create and manage subscriptions. + * + * Set up your subscriptions, then call run() to have messages + * delivered. + * + * \ingroup clientapi + * + * \details + * + * <h2>Subscribing and canceling subscriptions</h2> + * + * <ul> + * <li> + * <p>subscribe()</p> + * <pre> SubscriptionManager subscriptions(session); + * Listener listener(subscriptions); + * subscriptions.subscribe(listener, myQueue);</pre> + * <pre> SubscriptionManager subscriptions(session); + * LocalQueue local_queue; + * subscriptions.subscribe(local_queue, string("message_queue"));</pre></li> + * <li> + * <p>cancel()</p> + * <pre>subscriptions.cancel();</pre></li> + * </ul> + * + * <h2>Waiting for messages (and returning)</h2> + * + * <ul> + * <li> + * <p>run()</p> + * <pre> // Give up control to receive messages + * subscriptions.run();</pre></li> + * <li> + * <p>stop()</p> + * <pre>.// Use this code in a listener to return from run() + * subscriptions.stop();</pre></li> + * <li> + * <p>setAutoStop()</p> + * <pre>.// Return from subscriptions.run() when last subscription is cancelled + *.subscriptions.setAutoStop(true); + *.subscriptons.run(); + * </pre></li> + * <li> + * <p>Ending a subscription in a listener</p> + * <pre> + * void Listener::received(Message& message) { + * + * if (message.getData() == "That's all, folks!") { + * subscriptions.cancel(message.getDestination()); + * } + * } + * </pre> + * </li> + * </ul> + * + */ +class SubscriptionManager : public sys::Runnable, public Handle<SubscriptionManagerImpl> +{ + public: + /** Create a new SubscriptionManager associated with a session */ + QPID_CLIENT_EXTERN SubscriptionManager(const Session& session); + QPID_CLIENT_EXTERN SubscriptionManager(const SubscriptionManager&); + QPID_CLIENT_EXTERN ~SubscriptionManager(); + QPID_CLIENT_EXTERN SubscriptionManager& operator=(const SubscriptionManager&); + + /** + * Subscribe a MessagesListener to receive messages from queue. + * + * Provide your own subclass of MessagesListener to process + * incoming messages. It will be called for each message received. + * + *@param listener Listener object to receive messages. + *@param queue Name of the queue to subscribe to. + *@param settings settings for the subscription. + *@param name unique destination name for the subscription, defaults to queue name. + */ + QPID_CLIENT_EXTERN Subscription subscribe(MessageListener& listener, + const std::string& queue, + const SubscriptionSettings& settings, + const std::string& name=std::string()); + + /** + * Subscribe a LocalQueue to receive messages from queue. + * + * Incoming messages are stored in the queue for you to retrieve. + * + *@param queue Name of the queue to subscribe to. + *@param flow initial FlowControl for the subscription. + *@param name unique destination name for the subscription, defaults to queue name. + * If not specified, the queue name is used. + */ + QPID_CLIENT_EXTERN Subscription subscribe(LocalQueue& localQueue, + const std::string& queue, + const SubscriptionSettings& settings, + const std::string& name=std::string()); + + /** + * Subscribe a MessagesListener to receive messages from queue. + * + * Provide your own subclass of MessagesListener to process + * incoming messages. It will be called for each message received. + * + *@param listener Listener object to receive messages. + *@param queue Name of the queue to subscribe to. + *@param name unique destination name for the subscription, defaults to queue name. + * If not specified, the queue name is used. + */ + QPID_CLIENT_EXTERN Subscription subscribe(MessageListener& listener, + const std::string& queue, + const std::string& name=std::string()); + + /** + * Subscribe a LocalQueue to receive messages from queue. + * + * Incoming messages are stored in the queue for you to retrieve. + * + *@param queue Name of the queue to subscribe to. + *@param name unique destination name for the subscription, defaults to queue name. + * If not specified, the queue name is used. + */ + QPID_CLIENT_EXTERN Subscription subscribe(LocalQueue& localQueue, + const std::string& queue, + const std::string& name=std::string()); + + + /** Get a single message from a queue. + * (Note: this currently uses a subscription per invocation and is + * thus relatively expensive. The subscription is cancelled as + * part of each call which can trigger auto-deletion). + *@param result is set to the message from the queue. + *@param timeout wait up this timeout for a message to appear. + *@return true if result was set, false if no message available after timeout. + */ + QPID_CLIENT_EXTERN bool get(Message& result, const std::string& queue, sys::Duration timeout=0); + + /** Get a single message from a queue. + * (Note: this currently uses a subscription per invocation and is + * thus relatively expensive. The subscription is cancelled as + * part of each call which can trigger auto-deletion). + *@param timeout wait up this timeout for a message to appear. + *@return message from the queue. + *@throw Exception if the timeout is exceeded. + */ + QPID_CLIENT_EXTERN Message get(const std::string& queue, sys::Duration timeout=sys::TIME_INFINITE); + + /** Get a subscription by name. + *@throw Exception if not found. + */ + QPID_CLIENT_EXTERN Subscription getSubscription(const std::string& name) const; + + /** Cancel a subscription. See also: Subscription.cancel() */ + QPID_CLIENT_EXTERN void cancel(const std::string& name); + + /** Deliver messages in the current thread until stop() is called. + * Only one thread may be running in a SubscriptionManager at a time. + * @see run + */ + QPID_CLIENT_EXTERN void run(); + + /** Start a new thread to deliver messages. + * Only one thread may be running in a SubscriptionManager at a time. + * @see start + */ + QPID_CLIENT_EXTERN void start(); + + /** + * Wait for the thread started by a call to start() to complete. + */ + QPID_CLIENT_EXTERN void wait(); + + /** If set true, run() will stop when all subscriptions + * are cancelled. If false, run will only stop when stop() + * is called. True by default. + */ + QPID_CLIENT_EXTERN void setAutoStop(bool set=true); + + /** Stop delivery. Causes run() to return, or the thread started with start() to exit. */ + QPID_CLIENT_EXTERN void stop(); + + static const uint32_t UNLIMITED=0xFFFFFFFF; + + /** Set the flow control for a subscription. */ + QPID_CLIENT_EXTERN void setFlowControl(const std::string& name, const FlowControl& flow); + + /** Set the flow control for a subscription. + *@param name: name of the subscription. + *@param messages: message credit. + *@param bytes: byte credit. + *@param window: if true use window-based flow control. + */ + QPID_CLIENT_EXTERN void setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window=true); + + /** Set the default settings for subscribe() calls that don't + * include a SubscriptionSettings parameter. + */ + QPID_CLIENT_EXTERN void setDefaultSettings(const SubscriptionSettings& s); + + /** Get the default settings for subscribe() calls that don't + * include a SubscriptionSettings parameter. + */ + QPID_CLIENT_EXTERN const SubscriptionSettings& getDefaultSettings() const; + + /** Get the default settings for subscribe() calls that don't + * include a SubscriptionSettings parameter. + */ + QPID_CLIENT_EXTERN SubscriptionSettings& getDefaultSettings(); + + /** + * Set the default flow control settings for subscribe() calls + * that don't include a SubscriptionSettings parameter. + * + *@param messages: message credit. + *@param bytes: byte credit. + *@param window: if true use window-based flow control. + */ + QPID_CLIENT_EXTERN void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true); + + /** + *Set the default accept-mode for subscribe() calls that don't + *include a SubscriptionSettings parameter. + */ + QPID_CLIENT_EXTERN void setAcceptMode(AcceptMode mode); + + /** + * Set the default acquire-mode subscribe()s that don't specify SubscriptionSettings. + */ + QPID_CLIENT_EXTERN void setAcquireMode(AcquireMode mode); + + QPID_CLIENT_EXTERN void registerFailoverHandler ( boost::function<void ()> fh ); + + QPID_CLIENT_EXTERN Session getSession() const; + + SubscriptionManager(SubscriptionManagerImpl*); ///<@internal + + private: + typedef SubscriptionManagerImpl Impl; + friend class PrivateImplRef<SubscriptionManager>; +}; + +/** AutoCancel cancels a subscription in its destructor */ +class AutoCancel { + public: + AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {} + ~AutoCancel() { sm.cancel(tag); } + private: + SubscriptionManager& sm; + std::string tag; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SUBSCRIPTIONMANAGER_H*/ |