summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SubscriptionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionManager.cpp')
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp146
1 files changed, 51 insertions, 95 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index b4c48f7365..7eac3c541b 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -18,129 +18,85 @@
* under the License.
*
*/
-#ifndef _Subscription_
-#define _Subscription_
-#include "SubscriptionManager.h"
-#include <qpid/client/Dispatcher.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/framing/Uuid.h>
-#include <set>
-#include <sstream>
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/SubscriptionManagerImpl.h"
+#include "qpid/client/PrivateImplRef.h"
namespace qpid {
namespace client {
-SubscriptionManager::SubscriptionManager(const Session& s)
- : dispatcher(s), session(s),
- flowControl(UNLIMITED, UNLIMITED, false),
- acceptMode(0), acquireMode(0),
- autoStop(true)
-{}
-
-void SubscriptionManager::subscribeInternal(
- const std::string& q, const std::string& dest, const FlowControl& fc)
-{
- session.messageSubscribe(
- arg::queue=q, arg::destination=dest,
- arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
- if (fc.messages || fc.bytes) // No need to set if all 0.
- setFlowControl(dest, fc);
-}
+typedef PrivateImplRef<SubscriptionManager> PI;
-void SubscriptionManager::subscribe(
- MessageListener& listener, const std::string& q, const std::string& d)
-{
- subscribe(listener, q, getFlowControl(), d);
-}
+SubscriptionManager::SubscriptionManager(const Session& s) { PI::ctor(*this, new SubscriptionManagerImpl(s)); }
+SubscriptionManager::SubscriptionManager(SubscriptionManagerImpl* i) { PI::ctor(*this, i); }
+SubscriptionManager::SubscriptionManager(const SubscriptionManager& x) : Runnable(), Handle<SubscriptionManagerImpl>() { PI::copy(*this, x); }
+SubscriptionManager::~SubscriptionManager() { PI::dtor(*this); }
+SubscriptionManager& SubscriptionManager::operator=(const SubscriptionManager& x) { return PI::assign(*this, x); }
-void SubscriptionManager::subscribe(
- MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d)
-{
- std::string dest=d.empty() ? q:d;
- dispatcher.listen(dest, &listener, autoAck);
- return subscribeInternal(q, dest, fc);
-}
+Subscription SubscriptionManager::subscribe(
+ MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
+{ return impl->subscribe(listener, q, ss, n); }
-void SubscriptionManager::subscribe(
- LocalQueue& lq, const std::string& q, const std::string& d)
-{
- subscribe(lq, q, getFlowControl(), d);
-}
+Subscription SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
+{ return impl->subscribe(lq, q, ss, n); }
-void SubscriptionManager::subscribe(
- LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d)
-{
- std::string dest=d.empty() ? q:d;
- lq.session=session;
- lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
- return subscribeInternal(q, dest, fc);
-}
-void SubscriptionManager::setFlowControl(
- const std::string& dest, uint32_t messages, uint32_t bytes, bool window)
-{
- session.messageSetFlowMode(dest, window);
- session.messageFlow(dest, 0, messages);
- session.messageFlow(dest, 1, bytes);
- session.sync();
-}
+Subscription SubscriptionManager::subscribe(
+ MessageListener& listener, const std::string& q, const std::string& n)
+{ return impl->subscribe(listener, q, n); }
-void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) {
- setFlowControl(dest, fc.messages, fc.bytes, fc.window);
-}
-void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; }
+Subscription SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const std::string& n)
+{ return impl->subscribe(lq, q, n); }
-void SubscriptionManager::setFlowControl(
- uint32_t messages_, uint32_t bytes_, bool window_)
-{
- setFlowControl(FlowControl(messages_, bytes_, window_));
-}
+void SubscriptionManager::cancel(const std::string& dest) { return impl->cancel(dest); }
-const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; }
+void SubscriptionManager::setAutoStop(bool set) { impl->setAutoStop(set); }
-void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; }
+void SubscriptionManager::run() { impl->run(); }
-void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; }
+void SubscriptionManager::start() { impl->start(); }
-void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; }
+void SubscriptionManager::wait() { impl->wait(); }
-AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; }
+void SubscriptionManager::stop() { impl->stop(); }
-void SubscriptionManager::cancel(const std::string dest)
-{
- sync(session).messageCancel(dest);
- dispatcher.cancel(dest);
+bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
+ return impl->get(result, queue, timeout);
}
-void SubscriptionManager::setAutoStop(bool set) { autoStop=set; }
+Message SubscriptionManager::get(const std::string& queue, sys::Duration timeout) {
+ return impl->get(queue, timeout);
+}
+
+Session SubscriptionManager::getSession() const { return impl->getSession(); }
-void SubscriptionManager::run()
-{
- dispatcher.setAutoStop(autoStop);
- dispatcher.run();
+Subscription SubscriptionManager::getSubscription(const std::string& name) const {
+ return impl->getSubscription(name);
+}
+void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) {
+ impl->registerFailoverHandler(fh);
}
-void SubscriptionManager::stop()
-{
- dispatcher.stop();
+void SubscriptionManager::setFlowControl(const std::string& name, const FlowControl& flow) {
+ impl->setFlowControl(name, flow);
}
-bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
- LocalQueue lq;
- std::string unique = framing::Uuid(true).str();
- subscribe(lq, queue, FlowControl::messageCredit(1), unique);
- AutoCancel ac(*this, unique);
- //first wait for message to be delivered if a timeout has been specified
- if (timeout && lq.get(result, timeout)) return true;
- //make sure message is not on queue before final check
- sync(session).messageFlush(unique);
- return lq.get(result, 0);
+void SubscriptionManager::setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window) {
+ impl->setFlowControl(name, FlowControl(messages, bytes, window));
}
+void SubscriptionManager::setFlowControl(uint32_t messages, uint32_t bytes, bool window) {
+ impl->setFlowControl(messages, bytes, window);
+}
+
+void SubscriptionManager::setAcceptMode(AcceptMode mode) { impl->setAcceptMode(mode); }
+void SubscriptionManager::setAcquireMode(AcquireMode mode) { impl->setAcquireMode(mode); }
+
}} // namespace qpid::client
-#endif
+