diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 29 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueueImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/PrivateImplRef.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/Subscription.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/Subscription.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionImpl.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionImpl.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 129 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 44 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManagerImpl.cpp | 162 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManagerImpl.h | 278 | ||||
-rw-r--r-- | cpp/src/tests/BrokerFixture.h | 1 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/echotest.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/exception_test.cpp | 1 | ||||
-rw-r--r-- | cpp/src/tests/latencytest.cpp | 1 | ||||
-rw-r--r-- | cpp/src/tests/perftest.cpp | 1 | ||||
-rw-r--r-- | cpp/src/tests/topic_listener.cpp | 1 | ||||
-rw-r--r-- | cpp/src/tests/txshift.cpp | 4 | ||||
-rw-r--r-- | cpp/src/tests/txtest.cpp | 1 |
20 files changed, 552 insertions, 134 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index c01af9871a..4c1c4fff56 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -470,7 +470,10 @@ libqpidclient_la_SOURCES = \ qpid/client/StateManager.cpp \ qpid/client/Subscription.cpp \ qpid/client/SubscriptionImpl.cpp \ - qpid/client/SubscriptionManager.cpp + qpid/client/SubscriptionImpl.h \ + qpid/client/SubscriptionManager.cpp \ + qpid/client/SubscriptionManagerImpl.cpp \ + qpid/client/SubscriptionManagerImpl.h nobase_include_HEADERS = \ $(platform_hdr) \ @@ -595,7 +598,6 @@ nobase_include_HEADERS = \ qpid/client/Execution.h \ qpid/client/FailoverManager.h \ qpid/client/Subscription.h \ - qpid/client/SubscriptionImpl.h \ qpid/client/SubscriptionSettings.h \ qpid/client/FlowControl.h \ qpid/client/Future.h \ @@ -618,6 +620,29 @@ nobase_include_HEADERS = \ qpid/client/StateManager.h \ qpid/client/SubscriptionManager.h \ qpid/client/TypedResult.h \ + qpid/client/api/Connection.h \ + qpid/client/api/ConnectionImpl.h \ + qpid/client/api/Destination.h \ + qpid/client/api/Message.h \ + qpid/client/api/MessageImpl.h \ + qpid/client/api/MessageListener.h \ + qpid/client/api/MessageProducer.h \ + qpid/client/api/MessageProducerImpl.h \ + qpid/client/api/MessageReceiver.h \ + qpid/client/api/MessageReceiverImpl.h \ + qpid/client/api/MessageSink.h \ + qpid/client/api/MessageSource.h \ + qpid/client/api/Queue.h \ + qpid/client/api/Session.h \ + qpid/client/api/SessionImpl.h \ + qpid/client/api/Subscription.h \ + qpid/client/api/SubscriptionImpl.h \ + qpid/client/api/Topic.h \ + qpid/client/api/Variant.h \ + qpid/client/amqp0_10/MessageAdapter.h \ + qpid/client/amqp0_10/Sinks.h \ + qpid/client/amqp0_10/Sources.h \ + qpid/client/amqp0_10/SessionImpl.h \ qpid/framing/AMQBody.h \ qpid/framing/AMQCommandControlBody.h \ qpid/framing/AMQContentBody.h \ diff --git a/cpp/src/qpid/client/LocalQueueImpl.h b/cpp/src/qpid/client/LocalQueueImpl.h index 64da8675d8..1afaa82d11 100644 --- a/cpp/src/qpid/client/LocalQueueImpl.h +++ b/cpp/src/qpid/client/LocalQueueImpl.h @@ -100,7 +100,7 @@ class LocalQueueImpl : public RefCounted { private: Demux::QueuePtr queue; Subscription subscription; - friend class SubscriptionManager; + friend class SubscriptionManagerImpl; }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/PrivateImplRef.h b/cpp/src/qpid/client/PrivateImplRef.h index ae29318eb9..503a383c31 100644 --- a/cpp/src/qpid/client/PrivateImplRef.h +++ b/cpp/src/qpid/client/PrivateImplRef.h @@ -76,14 +76,15 @@ template <class T> class PrivateImplRef { static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); } static void set(T& t, const intrusive_ptr& p) { - if(t.impl) boost::intrusive_ptr_release(t.impl); + if (t.impl == p) return; + if (t.impl) boost::intrusive_ptr_release(t.impl); t.impl = p.get(); if (t.impl) boost::intrusive_ptr_add_ref(t.impl); } // Helper functions to implement the ctor, dtor, copy, assign static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); } - static void copy(T& t, const T& x) { t.impl = 0; assign(t, x); } + static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); } static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); } static T& assign(T& t, const T& x) { set(t, get(x)); return t;} }; diff --git a/cpp/src/qpid/client/Subscription.cpp b/cpp/src/qpid/client/Subscription.cpp index 285a8f242f..23e45a270a 100644 --- a/cpp/src/qpid/client/Subscription.cpp +++ b/cpp/src/qpid/client/Subscription.cpp @@ -46,7 +46,7 @@ void Subscription::acquire(const SequenceSet& messageIds) { impl->acquire(messag void Subscription::accept(const SequenceSet& messageIds) { impl->accept(messageIds); } void Subscription::release(const SequenceSet& messageIds) { impl->release(messageIds); } Session Subscription::getSession() const { return impl->getSession(); } -SubscriptionManager&Subscription:: getSubscriptionManager() const { return impl->getSubscriptionManager(); } +SubscriptionManager Subscription::getSubscriptionManager() { return impl->getSubscriptionManager(); } void Subscription::cancel() { impl->cancel(); } void Subscription::grantMessageCredit(uint32_t value) { impl->grantCredit(framing::message::CREDIT_UNIT_MESSAGE, value); } void Subscription::grantByteCredit(uint32_t value) { impl->grantCredit(framing::message::CREDIT_UNIT_BYTE, value); } diff --git a/cpp/src/qpid/client/Subscription.h b/cpp/src/qpid/client/Subscription.h index f39cdd50ac..83a9be5e14 100644 --- a/cpp/src/qpid/client/Subscription.h +++ b/cpp/src/qpid/client/Subscription.h @@ -103,7 +103,7 @@ class Subscription : public Handle<SubscriptionImpl> { QPID_CLIENT_EXTERN Session getSession() const; /** Get the subscription manager associated with this subscription */ - QPID_CLIENT_EXTERN SubscriptionManager& getSubscriptionManager() const; + QPID_CLIENT_EXTERN SubscriptionManager getSubscriptionManager(); /** Cancel the subscription. */ QPID_CLIENT_EXTERN void cancel(); diff --git a/cpp/src/qpid/client/SubscriptionImpl.cpp b/cpp/src/qpid/client/SubscriptionImpl.cpp index 69f79a1436..fb5808f3a6 100644 --- a/cpp/src/qpid/client/SubscriptionImpl.cpp +++ b/cpp/src/qpid/client/SubscriptionImpl.cpp @@ -19,11 +19,14 @@ * */ +#include "AsyncSession.h" #include "SubscriptionImpl.h" +#include "SubscriptionManagerImpl.h" #include "MessageImpl.h" #include "CompletionImpl.h" #include "SubscriptionManager.h" #include "SubscriptionSettings.h" +#include "PrivateImplRef.h" namespace qpid { namespace client { @@ -31,8 +34,8 @@ namespace client { using sys::Mutex; using framing::MessageAcquireResult; -SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l) - : manager(m), name(n), queue(q), settings(s), listener(l) +SubscriptionImpl::SubscriptionImpl(SubscriptionManager m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l) + : manager(*PrivateImplRef<SubscriptionManager>::get(m)), name(n), queue(q), settings(s), listener(l) {} void SubscriptionImpl::subscribe() @@ -110,7 +113,7 @@ void SubscriptionImpl::release(const SequenceSet& messageIds) { Session SubscriptionImpl::getSession() const { return manager.getSession(); } -SubscriptionManager& SubscriptionImpl::getSubscriptionManager() const { return manager; } +SubscriptionManager SubscriptionImpl::getSubscriptionManager() { return SubscriptionManager(&manager); } void SubscriptionImpl::cancel() { manager.cancel(name); } diff --git a/cpp/src/qpid/client/SubscriptionImpl.h b/cpp/src/qpid/client/SubscriptionImpl.h index e2b970ce05..da77213423 100644 --- a/cpp/src/qpid/client/SubscriptionImpl.h +++ b/cpp/src/qpid/client/SubscriptionImpl.h @@ -23,6 +23,7 @@ */ #include "qpid/client/SubscriptionSettings.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/client/Session.h" #include "qpid/client/MessageListener.h" #include "qpid/client/Demux.h" @@ -37,11 +38,12 @@ namespace qpid { namespace client { class SubscriptionManager; +class SubscriptionManagerImpl; class SubscriptionImpl : public RefCounted, public MessageListener { public: - QPID_CLIENT_EXTERN SubscriptionImpl(SubscriptionManager&, const std::string& queue, - const SubscriptionSettings&, const std::string& name, MessageListener* =0); + QPID_CLIENT_EXTERN SubscriptionImpl(SubscriptionManager, const std::string& queue, + const SubscriptionSettings&, const std::string& name, MessageListener* =0); /** The name of the subsctription, used as the "destination" for messages from the broker. * Usually the same as the queue name but can be set differently. @@ -84,7 +86,7 @@ class SubscriptionImpl : public RefCounted, public MessageListener { QPID_CLIENT_EXTERN Session getSession() const; /** Get the subscription manager associated with this subscription */ - QPID_CLIENT_EXTERN SubscriptionManager& getSubscriptionManager() const; + QPID_CLIENT_EXTERN SubscriptionManager getSubscriptionManager(); /** Send subscription request and issue appropriate flow control commands. */ QPID_CLIENT_EXTERN void subscribe(); @@ -110,7 +112,7 @@ class SubscriptionImpl : public RefCounted, public MessageListener { private: mutable sys::Mutex lock; - SubscriptionManager& manager; + SubscriptionManagerImpl& manager; std::string name, queue; SubscriptionSettings settings; framing::SequenceSet unacquired, unaccepted; diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index 999b9c6ba7..06aa0dcdf8 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -18,144 +18,85 @@ * under the License. * */ -#ifndef _Subscription_ -#define _Subscription_ #include "SubscriptionManager.h" -#include "SubscriptionImpl.h" -#include "LocalQueueImpl.h" +#include "SubscriptionManagerImpl.h" #include "PrivateImplRef.h" -#include <qpid/client/Dispatcher.h> -#include <qpid/client/Session.h> -#include <qpid/client/MessageListener.h> -#include <qpid/framing/Uuid.h> -#include <set> -#include <sstream> namespace qpid { namespace client { -SubscriptionManager::SubscriptionManager(const Session& s) - : dispatcher(s), session(s), autoStop(true) -{} +typedef PrivateImplRef<SubscriptionManager> PI; + +SubscriptionManager::SubscriptionManager(const Session& s) { PI::ctor(*this, new SubscriptionManagerImpl(s)); } +SubscriptionManager::SubscriptionManager(SubscriptionManagerImpl* i) { PI::ctor(*this, i); } +SubscriptionManager::SubscriptionManager(const SubscriptionManager& x) : Handle<SubscriptionManagerImpl>() { PI::copy(*this, x); } +SubscriptionManager::~SubscriptionManager() { PI::dtor(*this); } +SubscriptionManager& SubscriptionManager::operator=(const SubscriptionManager& x) { return PI::assign(*this, x); } Subscription SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n) -{ - sys::Mutex::ScopedLock l(lock); - std::string name=n.empty() ? q:n; - boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener); - dispatcher.listen(si); - //issue subscription request after listener is registered with dispatcher - si->subscribe(); - return subscriptions[name] = Subscription(si.get()); -} +{ return impl->subscribe(listener, q, ss, n); } Subscription SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n) -{ - sys::Mutex::ScopedLock l(lock); - std::string name=n.empty() ? q:n; - boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0); - boost::intrusive_ptr<LocalQueueImpl> lqi = PrivateImplRef<LocalQueue>::get(lq); - lqi->queue=si->divert(); - si->subscribe(); - lqi->subscription = Subscription(si.get()); - return subscriptions[name] = lqi->subscription; -} +{ return impl->subscribe(lq, q, ss, n); } + Subscription SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const std::string& n) -{ - return subscribe(listener, q, defaultSettings, n); -} +{ return impl->subscribe(listener, q, n); } + Subscription SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const std::string& n) -{ - return subscribe(lq, q, defaultSettings, n); -} +{ return impl->subscribe(lq, q, n); } -void SubscriptionManager::cancel(const std::string& dest) -{ - sys::Mutex::ScopedLock l(lock); - std::map<std::string, Subscription>::iterator i = subscriptions.find(dest); - if (i != subscriptions.end()) { - sync(session).messageCancel(dest); - dispatcher.cancel(dest); - Subscription s = i->second; - if (s.isValid()) s.impl->cancelDiversion(); - subscriptions.erase(i); - } -} +void SubscriptionManager::cancel(const std::string& dest) { return impl->cancel(dest); } -void SubscriptionManager::setAutoStop(bool set) { autoStop=set; } +void SubscriptionManager::setAutoStop(bool set) { impl->setAutoStop(set); } -void SubscriptionManager::run() -{ - dispatcher.setAutoStop(autoStop); - dispatcher.run(); -} +void SubscriptionManager::run() { impl->run(); } -void SubscriptionManager::start() -{ - dispatcher.setAutoStop(autoStop); - dispatcher.start(); -} +void SubscriptionManager::start() { impl->start(); } -void SubscriptionManager::wait() -{ - dispatcher.wait(); -} +void SubscriptionManager::wait() { impl->wait(); } -void SubscriptionManager::stop() -{ - dispatcher.stop(); -} +void SubscriptionManager::stop() { impl->stop(); } bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) { - LocalQueue lq; - std::string unique = framing::Uuid(true).str(); - subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique); - AutoCancel ac(*this, unique); - //first wait for message to be delivered if a timeout has been specified - if (timeout && lq.get(result, timeout)) - return true; - //make sure message is not on queue before final check - sync(session).messageFlush(unique); - return lq.get(result, 0); + return impl->get(result, queue, timeout); } Message SubscriptionManager::get(const std::string& queue, sys::Duration timeout) { - Message result; - if (!get(result, queue, timeout)) - throw Exception("Timed out waiting for a message"); - return result; + return impl->get(queue, timeout); } -Session SubscriptionManager::getSession() const { return session; } +Session SubscriptionManager::getSession() const { return impl->getSession(); } Subscription SubscriptionManager::getSubscription(const std::string& name) const { - sys::Mutex::ScopedLock l(lock); - std::map<std::string, Subscription>::const_iterator i = subscriptions.find(name); - if (i == subscriptions.end()) - throw Exception(QPID_MSG("Subscription not found: " << name)); - return i->second; + return impl->getSubscription(name); } - void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) { - dispatcher.registerFailoverHandler(fh); + impl->registerFailoverHandler(fh); } void SubscriptionManager::setFlowControl(const std::string& name, const FlowControl& flow) { - getSubscription(name).setFlowControl(flow); + impl->setFlowControl(name, flow); } void SubscriptionManager::setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window) { - setFlowControl(name, FlowControl(messages, bytes, window)); + impl->setFlowControl(name, FlowControl(messages, bytes, window)); } +void SubscriptionManager::setFlowControl(uint32_t messages, uint32_t bytes, bool window) { + impl->setFlowControl(messages, bytes, window); +} + +void SubscriptionManager::setAcceptMode(AcceptMode mode) { impl->setAcceptMode(mode); } +void SubscriptionManager::setAcquireMode(AcquireMode mode) { impl->setAcquireMode(mode); } + }} // namespace qpid::client -#endif + diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 91ad2b6d56..eaff0975c5 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -21,22 +21,21 @@ * under the License. * */ -#include "qpid/sys/Mutex.h" -#include <qpid/client/Dispatcher.h> -#include <qpid/client/Completion.h> + #include <qpid/client/Session.h> -#include <qpid/client/AsyncSession.h> -#include <qpid/client/MessageListener.h> -#include <qpid/client/LocalQueue.h> #include <qpid/client/Subscription.h> #include <qpid/sys/Runnable.h> #include <qpid/client/ClientImportExport.h> -#include <set> -#include <sstream> +#include "qpid/client/MessageListener.h" +#include "qpid/client/LocalQueue.h" +#include <qpid/client/Handle.h> +#include <string> namespace qpid { namespace client { +class SubscriptionManagerImpl; + /** * A class to help create and manage subscriptions. * @@ -94,11 +93,14 @@ namespace client { * </ul> * */ -class SubscriptionManager : public sys::Runnable +class SubscriptionManager : public sys::Runnable, public Handle<SubscriptionManagerImpl> { public: /** Create a new SubscriptionManager associated with a session */ QPID_CLIENT_EXTERN SubscriptionManager(const Session& session); + QPID_CLIENT_EXTERN SubscriptionManager(const SubscriptionManager&); + QPID_CLIENT_EXTERN ~SubscriptionManager(); + QPID_CLIENT_EXTERN SubscriptionManager& operator=(const SubscriptionManager&); /** * Subscribe a MessagesListener to receive messages from queue. @@ -224,17 +226,17 @@ class SubscriptionManager : public sys::Runnable /** Set the default settings for subscribe() calls that don't * include a SubscriptionSettings parameter. */ - QPID_CLIENT_EXTERN void setDefaultSettings(const SubscriptionSettings& s) { defaultSettings = s; } + QPID_CLIENT_EXTERN void setDefaultSettings(const SubscriptionSettings& s); /** Get the default settings for subscribe() calls that don't * include a SubscriptionSettings parameter. */ - QPID_CLIENT_EXTERN const SubscriptionSettings& getDefaultSettings() const { return defaultSettings; } + QPID_CLIENT_EXTERN const SubscriptionSettings& getDefaultSettings() const; /** Get the default settings for subscribe() calls that don't * include a SubscriptionSettings parameter. */ - QPID_CLIENT_EXTERN SubscriptionSettings& getDefaultSettings() { return defaultSettings; } + QPID_CLIENT_EXTERN SubscriptionSettings& getDefaultSettings(); /** * Set the default flow control settings for subscribe() calls @@ -244,32 +246,28 @@ class SubscriptionManager : public sys::Runnable *@param bytes: byte credit. *@param window: if true use window-based flow control. */ - QPID_CLIENT_EXTERN void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true) { - defaultSettings.flowControl = FlowControl(messages, bytes, window); - } + QPID_CLIENT_EXTERN void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true); /** *Set the default accept-mode for subscribe() calls that don't *include a SubscriptionSettings parameter. */ - QPID_CLIENT_EXTERN void setAcceptMode(AcceptMode mode) { defaultSettings.acceptMode = mode; } + QPID_CLIENT_EXTERN void setAcceptMode(AcceptMode mode); /** * Set the default acquire-mode subscribe()s that don't specify SubscriptionSettings. */ - QPID_CLIENT_EXTERN void setAcquireMode(AcquireMode mode) { defaultSettings.acquireMode = mode; } + QPID_CLIENT_EXTERN void setAcquireMode(AcquireMode mode); QPID_CLIENT_EXTERN void registerFailoverHandler ( boost::function<void ()> fh ); QPID_CLIENT_EXTERN Session getSession() const; + SubscriptionManager(SubscriptionManagerImpl*); ///<@internal + private: - mutable sys::Mutex lock; - qpid::client::Dispatcher dispatcher; - qpid::client::AsyncSession session; - bool autoStop; - SubscriptionSettings defaultSettings; - std::map<std::string, Subscription> subscriptions; + typedef SubscriptionManagerImpl Impl; + friend class PrivateImplRef<SubscriptionManager>; }; /** AutoCancel cancels a subscription in its destructor */ diff --git a/cpp/src/qpid/client/SubscriptionManagerImpl.cpp b/cpp/src/qpid/client/SubscriptionManagerImpl.cpp new file mode 100644 index 0000000000..27b46c36f8 --- /dev/null +++ b/cpp/src/qpid/client/SubscriptionManagerImpl.cpp @@ -0,0 +1,162 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SubscriptionManager.h" +#include "SubscriptionManagerImpl.h" +#include "SubscriptionImpl.h" +#include "LocalQueueImpl.h" +#include "PrivateImplRef.h" +#include <qpid/client/Dispatcher.h> +#include <qpid/client/Session.h> +#include <qpid/client/MessageListener.h> +#include <qpid/framing/Uuid.h> +#include <set> +#include <sstream> + + +namespace qpid { +namespace client { + +SubscriptionManagerImpl::SubscriptionManagerImpl(const Session& s) + : dispatcher(s), session(s), autoStop(true) +{} + +Subscription SubscriptionManagerImpl::subscribe( + MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n) +{ + sys::Mutex::ScopedLock l(lock); + std::string name=n.empty() ? q:n; + boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(SubscriptionManager(this), q, ss, name, &listener); + dispatcher.listen(si); + //issue subscription request after listener is registered with dispatcher + si->subscribe(); + return subscriptions[name] = Subscription(si.get()); +} + +Subscription SubscriptionManagerImpl::subscribe( + LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n) +{ + sys::Mutex::ScopedLock l(lock); + std::string name=n.empty() ? q:n; + boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(SubscriptionManager(this), q, ss, name, 0); + boost::intrusive_ptr<LocalQueueImpl> lqi = PrivateImplRef<LocalQueue>::get(lq); + lqi->queue=si->divert(); + si->subscribe(); + lqi->subscription = Subscription(si.get()); + return subscriptions[name] = lqi->subscription; +} + +Subscription SubscriptionManagerImpl::subscribe( + MessageListener& listener, const std::string& q, const std::string& n) +{ + return subscribe(listener, q, defaultSettings, n); +} + +Subscription SubscriptionManagerImpl::subscribe( + LocalQueue& lq, const std::string& q, const std::string& n) +{ + return subscribe(lq, q, defaultSettings, n); +} + +void SubscriptionManagerImpl::cancel(const std::string& dest) +{ + sys::Mutex::ScopedLock l(lock); + std::map<std::string, Subscription>::iterator i = subscriptions.find(dest); + if (i != subscriptions.end()) { + sync(session).messageCancel(dest); + dispatcher.cancel(dest); + Subscription s = i->second; + if (s.isValid()) + PrivateImplRef<Subscription>::get(s)->cancelDiversion(); + subscriptions.erase(i); + } +} + +void SubscriptionManagerImpl::setAutoStop(bool set) { autoStop=set; } + +void SubscriptionManagerImpl::run() +{ + dispatcher.setAutoStop(autoStop); + dispatcher.run(); +} + +void SubscriptionManagerImpl::start() +{ + dispatcher.setAutoStop(autoStop); + dispatcher.start(); +} + +void SubscriptionManagerImpl::wait() +{ + dispatcher.wait(); +} + +void SubscriptionManagerImpl::stop() +{ + dispatcher.stop(); +} + +bool SubscriptionManagerImpl::get(Message& result, const std::string& queue, sys::Duration timeout) { + LocalQueue lq; + std::string unique = framing::Uuid(true).str(); + subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique); + SubscriptionManager sm(this); + AutoCancel ac(sm, unique); + //first wait for message to be delivered if a timeout has been specified + if (timeout && lq.get(result, timeout)) + return true; + //make sure message is not on queue before final check + sync(session).messageFlush(unique); + return lq.get(result, 0); +} + +Message SubscriptionManagerImpl::get(const std::string& queue, sys::Duration timeout) { + Message result; + if (!get(result, queue, timeout)) + throw Exception("Timed out waiting for a message"); + return result; +} + +Session SubscriptionManagerImpl::getSession() const { return session; } + +Subscription SubscriptionManagerImpl::getSubscription(const std::string& name) const { + sys::Mutex::ScopedLock l(lock); + std::map<std::string, Subscription>::const_iterator i = subscriptions.find(name); + if (i == subscriptions.end()) + throw Exception(QPID_MSG("Subscription not found: " << name)); + return i->second; +} + +void SubscriptionManagerImpl::registerFailoverHandler (boost::function<void ()> fh) { + dispatcher.registerFailoverHandler(fh); +} + +void SubscriptionManagerImpl::setFlowControl(const std::string& name, const FlowControl& flow) { + getSubscription(name).setFlowControl(flow); +} + +void SubscriptionManagerImpl::setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window) { + setFlowControl(name, FlowControl(messages, bytes, window)); +} + +}} // namespace qpid::client + + diff --git a/cpp/src/qpid/client/SubscriptionManagerImpl.h b/cpp/src/qpid/client/SubscriptionManagerImpl.h new file mode 100644 index 0000000000..6376a05c45 --- /dev/null +++ b/cpp/src/qpid/client/SubscriptionManagerImpl.h @@ -0,0 +1,278 @@ +#ifndef QPID_CLIENT_SUBSCRIPTIONMANAGERIMPL_H +#define QPID_CLIENT_SUBSCRIPTIONMANAGERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/Mutex.h" +#include <qpid/client/Dispatcher.h> +#include <qpid/client/Completion.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/LocalQueue.h> +#include <qpid/client/Subscription.h> +#include <qpid/sys/Runnable.h> +#include <qpid/RefCounted.h> +#include <set> +#include <sstream> + +namespace qpid { +namespace client { + +/** + * A class to help create and manage subscriptions. + * + * Set up your subscriptions, then call run() to have messages + * delivered. + * + * \ingroup clientapi + * + * \details + * + * <h2>Subscribing and canceling subscriptions</h2> + * + * <ul> + * <li> + * <p>subscribe()</p> + * <pre> SubscriptionManager subscriptions(session); + * Listener listener(subscriptions); + * subscriptions.subscribe(listener, myQueue);</pre> + * <pre> SubscriptionManager subscriptions(session); + * LocalQueue local_queue; + * subscriptions.subscribe(local_queue, string("message_queue"));</pre></li> + * <li> + * <p>cancel()</p> + * <pre>subscriptions.cancel();</pre></li> + * </ul> + * + * <h2>Waiting for messages (and returning)</h2> + * + * <ul> + * <li> + * <p>run()</p> + * <pre> // Give up control to receive messages + * subscriptions.run();</pre></li> + * <li> + * <p>stop()</p> + * <pre>.// Use this code in a listener to return from run() + * subscriptions.stop();</pre></li> + * <li> + * <p>setAutoStop()</p> + * <pre>.// Return from subscriptions.run() when last subscription is cancelled + *.subscriptions.setAutoStop(true); + *.subscriptons.run(); + * </pre></li> + * <li> + * <p>Ending a subscription in a listener</p> + * <pre> + * void Listener::received(Message& message) { + * + * if (message.getData() == "That's all, folks!") { + * subscriptions.cancel(message.getDestination()); + * } + * } + * </pre> + * </li> + * </ul> + * + */ +class SubscriptionManagerImpl : public sys::Runnable, public RefCounted +{ + public: + /** Create a new SubscriptionManagerImpl associated with a session */ + SubscriptionManagerImpl(const Session& session); + + /** + * Subscribe a MessagesListener to receive messages from queue. + * + * Provide your own subclass of MessagesListener to process + * incoming messages. It will be called for each message received. + * + *@param listener Listener object to receive messages. + *@param queue Name of the queue to subscribe to. + *@param settings settings for the subscription. + *@param name unique destination name for the subscription, defaults to queue name. + */ + Subscription subscribe(MessageListener& listener, + const std::string& queue, + const SubscriptionSettings& settings, + const std::string& name=std::string()); + + /** + * Subscribe a LocalQueue to receive messages from queue. + * + * Incoming messages are stored in the queue for you to retrieve. + * + *@param queue Name of the queue to subscribe to. + *@param flow initial FlowControl for the subscription. + *@param name unique destination name for the subscription, defaults to queue name. + * If not specified, the queue name is used. + */ + Subscription subscribe(LocalQueue& localQueue, + const std::string& queue, + const SubscriptionSettings& settings, + const std::string& name=std::string()); + + /** + * Subscribe a MessagesListener to receive messages from queue. + * + * Provide your own subclass of MessagesListener to process + * incoming messages. It will be called for each message received. + * + *@param listener Listener object to receive messages. + *@param queue Name of the queue to subscribe to. + *@param name unique destination name for the subscription, defaults to queue name. + * If not specified, the queue name is used. + */ + Subscription subscribe(MessageListener& listener, + const std::string& queue, + const std::string& name=std::string()); + + /** + * Subscribe a LocalQueue to receive messages from queue. + * + * Incoming messages are stored in the queue for you to retrieve. + * + *@param queue Name of the queue to subscribe to. + *@param name unique destination name for the subscription, defaults to queue name. + * If not specified, the queue name is used. + */ + Subscription subscribe(LocalQueue& localQueue, + const std::string& queue, + const std::string& name=std::string()); + + + /** Get a single message from a queue. + *@param result is set to the message from the queue. + *@param timeout wait up this timeout for a message to appear. + *@return true if result was set, false if no message available after timeout. + */ + bool get(Message& result, const std::string& queue, sys::Duration timeout=0); + + /** Get a single message from a queue. + *@param timeout wait up this timeout for a message to appear. + *@return message from the queue. + *@throw Exception if the timeout is exceeded. + */ + Message get(const std::string& queue, sys::Duration timeout=sys::TIME_INFINITE); + + /** Get a subscription by name. + *@throw Exception if not found. + */ + Subscription getSubscription(const std::string& name) const; + + /** Cancel a subscription. See also: Subscription.cancel() */ + void cancel(const std::string& name); + + /** Deliver messages in the current thread until stop() is called. + * Only one thread may be running in a SubscriptionManager at a time. + * @see run + */ + void run(); + + /** Start a new thread to deliver messages. + * Only one thread may be running in a SubscriptionManager at a time. + * @see start + */ + void start(); + + /** + * Wait for the thread started by a call to start() to complete. + */ + void wait(); + + /** If set true, run() will stop when all subscriptions + * are cancelled. If false, run will only stop when stop() + * is called. True by default. + */ + void setAutoStop(bool set=true); + + /** Stop delivery. Causes run() to return, or the thread started with start() to exit. */ + void stop(); + + static const uint32_t UNLIMITED=0xFFFFFFFF; + + /** Set the flow control for a subscription. */ + void setFlowControl(const std::string& name, const FlowControl& flow); + + /** Set the flow control for a subscription. + *@param name: name of the subscription. + *@param messages: message credit. + *@param bytes: byte credit. + *@param window: if true use window-based flow control. + */ + void setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window=true); + + /** Set the default settings for subscribe() calls that don't + * include a SubscriptionSettings parameter. + */ + void setDefaultSettings(const SubscriptionSettings& s) { defaultSettings = s; } + + /** Get the default settings for subscribe() calls that don't + * include a SubscriptionSettings parameter. + */ + const SubscriptionSettings& getDefaultSettings() const { return defaultSettings; } + + /** Get the default settings for subscribe() calls that don't + * include a SubscriptionSettings parameter. + */ + SubscriptionSettings& getDefaultSettings() { return defaultSettings; } + + /** + * Set the default flow control settings for subscribe() calls + * that don't include a SubscriptionSettings parameter. + * + *@param messages: message credit. + *@param bytes: byte credit. + *@param window: if true use window-based flow control. + */ + void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true) { + defaultSettings.flowControl = FlowControl(messages, bytes, window); + } + + /** + *Set the default accept-mode for subscribe() calls that don't + *include a SubscriptionSettings parameter. + */ + void setAcceptMode(AcceptMode mode) { defaultSettings.acceptMode = mode; } + + /** + * Set the default acquire-mode subscribe()s that don't specify SubscriptionSettings. + */ + void setAcquireMode(AcquireMode mode) { defaultSettings.acquireMode = mode; } + + void registerFailoverHandler ( boost::function<void ()> fh ); + + Session getSession() const; + + private: + mutable sys::Mutex lock; + qpid::client::Dispatcher dispatcher; + qpid::client::AsyncSession session; + bool autoStop; + SubscriptionSettings defaultSettings; + std::map<std::string, Subscription> subscriptions; +}; + + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SUBSCRIPTIONMANAGERIMPL_H*/ diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h index b32b7f44ba..c691e351e7 100644 --- a/cpp/src/tests/BrokerFixture.h +++ b/cpp/src/tests/BrokerFixture.h @@ -29,6 +29,7 @@ #include "qpid/client/ConnectionImpl.h" #include "qpid/client/Session.h" #include "qpid/client/SubscriptionManager.h" +#include "qpid/client/LocalQueue.h" #include "qpid/log/Logger.h" #include "qpid/log/Options.h" #include "qpid/sys/Thread.h" diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 1c719d16dc..0a72facd86 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -22,7 +22,9 @@ #include "test_tools.h" #include "BrokerFixture.h" #include "qpid/client/QueueOptions.h" +#include "qpid/client/MessageListener.h" #include "qpid/client/SubscriptionManager.h" +#include "qpid/client/AsyncSession.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" diff --git a/cpp/src/tests/echotest.cpp b/cpp/src/tests/echotest.cpp index 7cbf3e7df4..98590e35ff 100644 --- a/cpp/src/tests/echotest.cpp +++ b/cpp/src/tests/echotest.cpp @@ -21,7 +21,7 @@ #include <qpid/client/Connection.h> #include <qpid/client/SubscriptionManager.h> -#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> #include <qpid/client/Message.h> #include <qpid/client/MessageListener.h> #include <qpid/sys/Time.h> diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index e420bf2f0b..379e957ef1 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -23,6 +23,7 @@ #include "test_tools.h" #include "BrokerFixture.h" #include "qpid/client/SubscriptionManager.h" +#include "qpid/client/MessageListener.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "qpid/framing/reply_exceptions.h" diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp index 81ac610001..6ad84e1b82 100644 --- a/cpp/src/tests/latencytest.cpp +++ b/cpp/src/tests/latencytest.cpp @@ -28,6 +28,7 @@ #include <vector> #include "TestOptions.h" +#include "qpid/sys/Thread.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" #include "qpid/client/AsyncSession.h" diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index 18491d72a0..78606e46cd 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -28,6 +28,7 @@ #include "qpid/client/Message.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Thread.h" #include <boost/lexical_cast.hpp> #include <boost/bind.hpp> diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index 1f0b3f5377..44070cd4c9 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -36,6 +36,7 @@ #include "qpid/client/Connection.h" #include "qpid/client/MessageListener.h" #include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/sys/SystemInfo.h" #include "qpid/sys/Time.h" diff --git a/cpp/src/tests/txshift.cpp b/cpp/src/tests/txshift.cpp index dd67710526..97135c9829 100644 --- a/cpp/src/tests/txshift.cpp +++ b/cpp/src/tests/txshift.cpp @@ -61,7 +61,7 @@ struct Transfer : MessageListener Transfer(const std::string control_) : control(control_), expected(0), transfered(0) {} - void subscribeToSource(SubscriptionManager& manager) + void subscribeToSource(SubscriptionManager manager) { sourceSettings.autoAck = 0;//will accept once at the end of the batch sourceSettings.flowControl = FlowControl::messageCredit(expected); @@ -69,7 +69,7 @@ struct Transfer : MessageListener QPID_LOG(info, "Subscribed to source: " << source << " expecting: " << expected); } - void subscribeToControl(SubscriptionManager& manager) + void subscribeToControl(SubscriptionManager manager) { controlSettings.flowControl = FlowControl::messageCredit(1); controlSubscription = manager.subscribe(*this, control, controlSettings); diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp index fb1d19ca8a..c1ee246e2c 100644 --- a/cpp/src/tests/txtest.cpp +++ b/cpp/src/tests/txtest.cpp @@ -34,6 +34,7 @@ #include "qpid/framing/Array.h" #include "qpid/framing/Buffer.h" #include "qpid/sys/uuid.h" +#include "qpid/sys/Thread.h" using namespace qpid; using namespace qpid::client; |