summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am29
-rw-r--r--cpp/src/qpid/client/LocalQueueImpl.h2
-rw-r--r--cpp/src/qpid/client/PrivateImplRef.h5
-rw-r--r--cpp/src/qpid/client/Subscription.cpp2
-rw-r--r--cpp/src/qpid/client/Subscription.h2
-rw-r--r--cpp/src/qpid/client/SubscriptionImpl.cpp9
-rw-r--r--cpp/src/qpid/client/SubscriptionImpl.h10
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp129
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h44
-rw-r--r--cpp/src/qpid/client/SubscriptionManagerImpl.cpp162
-rw-r--r--cpp/src/qpid/client/SubscriptionManagerImpl.h278
-rw-r--r--cpp/src/tests/BrokerFixture.h1
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp2
-rw-r--r--cpp/src/tests/echotest.cpp2
-rw-r--r--cpp/src/tests/exception_test.cpp1
-rw-r--r--cpp/src/tests/latencytest.cpp1
-rw-r--r--cpp/src/tests/perftest.cpp1
-rw-r--r--cpp/src/tests/topic_listener.cpp1
-rw-r--r--cpp/src/tests/txshift.cpp4
-rw-r--r--cpp/src/tests/txtest.cpp1
20 files changed, 552 insertions, 134 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index c01af9871a..4c1c4fff56 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -470,7 +470,10 @@ libqpidclient_la_SOURCES = \
qpid/client/StateManager.cpp \
qpid/client/Subscription.cpp \
qpid/client/SubscriptionImpl.cpp \
- qpid/client/SubscriptionManager.cpp
+ qpid/client/SubscriptionImpl.h \
+ qpid/client/SubscriptionManager.cpp \
+ qpid/client/SubscriptionManagerImpl.cpp \
+ qpid/client/SubscriptionManagerImpl.h
nobase_include_HEADERS = \
$(platform_hdr) \
@@ -595,7 +598,6 @@ nobase_include_HEADERS = \
qpid/client/Execution.h \
qpid/client/FailoverManager.h \
qpid/client/Subscription.h \
- qpid/client/SubscriptionImpl.h \
qpid/client/SubscriptionSettings.h \
qpid/client/FlowControl.h \
qpid/client/Future.h \
@@ -618,6 +620,29 @@ nobase_include_HEADERS = \
qpid/client/StateManager.h \
qpid/client/SubscriptionManager.h \
qpid/client/TypedResult.h \
+ qpid/client/api/Connection.h \
+ qpid/client/api/ConnectionImpl.h \
+ qpid/client/api/Destination.h \
+ qpid/client/api/Message.h \
+ qpid/client/api/MessageImpl.h \
+ qpid/client/api/MessageListener.h \
+ qpid/client/api/MessageProducer.h \
+ qpid/client/api/MessageProducerImpl.h \
+ qpid/client/api/MessageReceiver.h \
+ qpid/client/api/MessageReceiverImpl.h \
+ qpid/client/api/MessageSink.h \
+ qpid/client/api/MessageSource.h \
+ qpid/client/api/Queue.h \
+ qpid/client/api/Session.h \
+ qpid/client/api/SessionImpl.h \
+ qpid/client/api/Subscription.h \
+ qpid/client/api/SubscriptionImpl.h \
+ qpid/client/api/Topic.h \
+ qpid/client/api/Variant.h \
+ qpid/client/amqp0_10/MessageAdapter.h \
+ qpid/client/amqp0_10/Sinks.h \
+ qpid/client/amqp0_10/Sources.h \
+ qpid/client/amqp0_10/SessionImpl.h \
qpid/framing/AMQBody.h \
qpid/framing/AMQCommandControlBody.h \
qpid/framing/AMQContentBody.h \
diff --git a/cpp/src/qpid/client/LocalQueueImpl.h b/cpp/src/qpid/client/LocalQueueImpl.h
index 64da8675d8..1afaa82d11 100644
--- a/cpp/src/qpid/client/LocalQueueImpl.h
+++ b/cpp/src/qpid/client/LocalQueueImpl.h
@@ -100,7 +100,7 @@ class LocalQueueImpl : public RefCounted {
private:
Demux::QueuePtr queue;
Subscription subscription;
- friend class SubscriptionManager;
+ friend class SubscriptionManagerImpl;
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/PrivateImplRef.h b/cpp/src/qpid/client/PrivateImplRef.h
index ae29318eb9..503a383c31 100644
--- a/cpp/src/qpid/client/PrivateImplRef.h
+++ b/cpp/src/qpid/client/PrivateImplRef.h
@@ -76,14 +76,15 @@ template <class T> class PrivateImplRef {
static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); }
static void set(T& t, const intrusive_ptr& p) {
- if(t.impl) boost::intrusive_ptr_release(t.impl);
+ if (t.impl == p) return;
+ if (t.impl) boost::intrusive_ptr_release(t.impl);
t.impl = p.get();
if (t.impl) boost::intrusive_ptr_add_ref(t.impl);
}
// Helper functions to implement the ctor, dtor, copy, assign
static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); }
- static void copy(T& t, const T& x) { t.impl = 0; assign(t, x); }
+ static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); }
static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); }
static T& assign(T& t, const T& x) { set(t, get(x)); return t;}
};
diff --git a/cpp/src/qpid/client/Subscription.cpp b/cpp/src/qpid/client/Subscription.cpp
index 285a8f242f..23e45a270a 100644
--- a/cpp/src/qpid/client/Subscription.cpp
+++ b/cpp/src/qpid/client/Subscription.cpp
@@ -46,7 +46,7 @@ void Subscription::acquire(const SequenceSet& messageIds) { impl->acquire(messag
void Subscription::accept(const SequenceSet& messageIds) { impl->accept(messageIds); }
void Subscription::release(const SequenceSet& messageIds) { impl->release(messageIds); }
Session Subscription::getSession() const { return impl->getSession(); }
-SubscriptionManager&Subscription:: getSubscriptionManager() const { return impl->getSubscriptionManager(); }
+SubscriptionManager Subscription::getSubscriptionManager() { return impl->getSubscriptionManager(); }
void Subscription::cancel() { impl->cancel(); }
void Subscription::grantMessageCredit(uint32_t value) { impl->grantCredit(framing::message::CREDIT_UNIT_MESSAGE, value); }
void Subscription::grantByteCredit(uint32_t value) { impl->grantCredit(framing::message::CREDIT_UNIT_BYTE, value); }
diff --git a/cpp/src/qpid/client/Subscription.h b/cpp/src/qpid/client/Subscription.h
index f39cdd50ac..83a9be5e14 100644
--- a/cpp/src/qpid/client/Subscription.h
+++ b/cpp/src/qpid/client/Subscription.h
@@ -103,7 +103,7 @@ class Subscription : public Handle<SubscriptionImpl> {
QPID_CLIENT_EXTERN Session getSession() const;
/** Get the subscription manager associated with this subscription */
- QPID_CLIENT_EXTERN SubscriptionManager& getSubscriptionManager() const;
+ QPID_CLIENT_EXTERN SubscriptionManager getSubscriptionManager();
/** Cancel the subscription. */
QPID_CLIENT_EXTERN void cancel();
diff --git a/cpp/src/qpid/client/SubscriptionImpl.cpp b/cpp/src/qpid/client/SubscriptionImpl.cpp
index 69f79a1436..fb5808f3a6 100644
--- a/cpp/src/qpid/client/SubscriptionImpl.cpp
+++ b/cpp/src/qpid/client/SubscriptionImpl.cpp
@@ -19,11 +19,14 @@
*
*/
+#include "AsyncSession.h"
#include "SubscriptionImpl.h"
+#include "SubscriptionManagerImpl.h"
#include "MessageImpl.h"
#include "CompletionImpl.h"
#include "SubscriptionManager.h"
#include "SubscriptionSettings.h"
+#include "PrivateImplRef.h"
namespace qpid {
namespace client {
@@ -31,8 +34,8 @@ namespace client {
using sys::Mutex;
using framing::MessageAcquireResult;
-SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
- : manager(m), name(n), queue(q), settings(s), listener(l)
+SubscriptionImpl::SubscriptionImpl(SubscriptionManager m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
+ : manager(*PrivateImplRef<SubscriptionManager>::get(m)), name(n), queue(q), settings(s), listener(l)
{}
void SubscriptionImpl::subscribe()
@@ -110,7 +113,7 @@ void SubscriptionImpl::release(const SequenceSet& messageIds) {
Session SubscriptionImpl::getSession() const { return manager.getSession(); }
-SubscriptionManager& SubscriptionImpl::getSubscriptionManager() const { return manager; }
+SubscriptionManager SubscriptionImpl::getSubscriptionManager() { return SubscriptionManager(&manager); }
void SubscriptionImpl::cancel() { manager.cancel(name); }
diff --git a/cpp/src/qpid/client/SubscriptionImpl.h b/cpp/src/qpid/client/SubscriptionImpl.h
index e2b970ce05..da77213423 100644
--- a/cpp/src/qpid/client/SubscriptionImpl.h
+++ b/cpp/src/qpid/client/SubscriptionImpl.h
@@ -23,6 +23,7 @@
*/
#include "qpid/client/SubscriptionSettings.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Session.h"
#include "qpid/client/MessageListener.h"
#include "qpid/client/Demux.h"
@@ -37,11 +38,12 @@ namespace qpid {
namespace client {
class SubscriptionManager;
+class SubscriptionManagerImpl;
class SubscriptionImpl : public RefCounted, public MessageListener {
public:
- QPID_CLIENT_EXTERN SubscriptionImpl(SubscriptionManager&, const std::string& queue,
- const SubscriptionSettings&, const std::string& name, MessageListener* =0);
+ QPID_CLIENT_EXTERN SubscriptionImpl(SubscriptionManager, const std::string& queue,
+ const SubscriptionSettings&, const std::string& name, MessageListener* =0);
/** The name of the subsctription, used as the "destination" for messages from the broker.
* Usually the same as the queue name but can be set differently.
@@ -84,7 +86,7 @@ class SubscriptionImpl : public RefCounted, public MessageListener {
QPID_CLIENT_EXTERN Session getSession() const;
/** Get the subscription manager associated with this subscription */
- QPID_CLIENT_EXTERN SubscriptionManager& getSubscriptionManager() const;
+ QPID_CLIENT_EXTERN SubscriptionManager getSubscriptionManager();
/** Send subscription request and issue appropriate flow control commands. */
QPID_CLIENT_EXTERN void subscribe();
@@ -110,7 +112,7 @@ class SubscriptionImpl : public RefCounted, public MessageListener {
private:
mutable sys::Mutex lock;
- SubscriptionManager& manager;
+ SubscriptionManagerImpl& manager;
std::string name, queue;
SubscriptionSettings settings;
framing::SequenceSet unacquired, unaccepted;
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 999b9c6ba7..06aa0dcdf8 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -18,144 +18,85 @@
* under the License.
*
*/
-#ifndef _Subscription_
-#define _Subscription_
#include "SubscriptionManager.h"
-#include "SubscriptionImpl.h"
-#include "LocalQueueImpl.h"
+#include "SubscriptionManagerImpl.h"
#include "PrivateImplRef.h"
-#include <qpid/client/Dispatcher.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/framing/Uuid.h>
-#include <set>
-#include <sstream>
namespace qpid {
namespace client {
-SubscriptionManager::SubscriptionManager(const Session& s)
- : dispatcher(s), session(s), autoStop(true)
-{}
+typedef PrivateImplRef<SubscriptionManager> PI;
+
+SubscriptionManager::SubscriptionManager(const Session& s) { PI::ctor(*this, new SubscriptionManagerImpl(s)); }
+SubscriptionManager::SubscriptionManager(SubscriptionManagerImpl* i) { PI::ctor(*this, i); }
+SubscriptionManager::SubscriptionManager(const SubscriptionManager& x) : Handle<SubscriptionManagerImpl>() { PI::copy(*this, x); }
+SubscriptionManager::~SubscriptionManager() { PI::dtor(*this); }
+SubscriptionManager& SubscriptionManager::operator=(const SubscriptionManager& x) { return PI::assign(*this, x); }
Subscription SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
-{
- sys::Mutex::ScopedLock l(lock);
- std::string name=n.empty() ? q:n;
- boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener);
- dispatcher.listen(si);
- //issue subscription request after listener is registered with dispatcher
- si->subscribe();
- return subscriptions[name] = Subscription(si.get());
-}
+{ return impl->subscribe(listener, q, ss, n); }
Subscription SubscriptionManager::subscribe(
LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
-{
- sys::Mutex::ScopedLock l(lock);
- std::string name=n.empty() ? q:n;
- boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0);
- boost::intrusive_ptr<LocalQueueImpl> lqi = PrivateImplRef<LocalQueue>::get(lq);
- lqi->queue=si->divert();
- si->subscribe();
- lqi->subscription = Subscription(si.get());
- return subscriptions[name] = lqi->subscription;
-}
+{ return impl->subscribe(lq, q, ss, n); }
+
Subscription SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& n)
-{
- return subscribe(listener, q, defaultSettings, n);
-}
+{ return impl->subscribe(listener, q, n); }
+
Subscription SubscriptionManager::subscribe(
LocalQueue& lq, const std::string& q, const std::string& n)
-{
- return subscribe(lq, q, defaultSettings, n);
-}
+{ return impl->subscribe(lq, q, n); }
-void SubscriptionManager::cancel(const std::string& dest)
-{
- sys::Mutex::ScopedLock l(lock);
- std::map<std::string, Subscription>::iterator i = subscriptions.find(dest);
- if (i != subscriptions.end()) {
- sync(session).messageCancel(dest);
- dispatcher.cancel(dest);
- Subscription s = i->second;
- if (s.isValid()) s.impl->cancelDiversion();
- subscriptions.erase(i);
- }
-}
+void SubscriptionManager::cancel(const std::string& dest) { return impl->cancel(dest); }
-void SubscriptionManager::setAutoStop(bool set) { autoStop=set; }
+void SubscriptionManager::setAutoStop(bool set) { impl->setAutoStop(set); }
-void SubscriptionManager::run()
-{
- dispatcher.setAutoStop(autoStop);
- dispatcher.run();
-}
+void SubscriptionManager::run() { impl->run(); }
-void SubscriptionManager::start()
-{
- dispatcher.setAutoStop(autoStop);
- dispatcher.start();
-}
+void SubscriptionManager::start() { impl->start(); }
-void SubscriptionManager::wait()
-{
- dispatcher.wait();
-}
+void SubscriptionManager::wait() { impl->wait(); }
-void SubscriptionManager::stop()
-{
- dispatcher.stop();
-}
+void SubscriptionManager::stop() { impl->stop(); }
bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
- LocalQueue lq;
- std::string unique = framing::Uuid(true).str();
- subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique);
- AutoCancel ac(*this, unique);
- //first wait for message to be delivered if a timeout has been specified
- if (timeout && lq.get(result, timeout))
- return true;
- //make sure message is not on queue before final check
- sync(session).messageFlush(unique);
- return lq.get(result, 0);
+ return impl->get(result, queue, timeout);
}
Message SubscriptionManager::get(const std::string& queue, sys::Duration timeout) {
- Message result;
- if (!get(result, queue, timeout))
- throw Exception("Timed out waiting for a message");
- return result;
+ return impl->get(queue, timeout);
}
-Session SubscriptionManager::getSession() const { return session; }
+Session SubscriptionManager::getSession() const { return impl->getSession(); }
Subscription SubscriptionManager::getSubscription(const std::string& name) const {
- sys::Mutex::ScopedLock l(lock);
- std::map<std::string, Subscription>::const_iterator i = subscriptions.find(name);
- if (i == subscriptions.end())
- throw Exception(QPID_MSG("Subscription not found: " << name));
- return i->second;
+ return impl->getSubscription(name);
}
-
void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) {
- dispatcher.registerFailoverHandler(fh);
+ impl->registerFailoverHandler(fh);
}
void SubscriptionManager::setFlowControl(const std::string& name, const FlowControl& flow) {
- getSubscription(name).setFlowControl(flow);
+ impl->setFlowControl(name, flow);
}
void SubscriptionManager::setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window) {
- setFlowControl(name, FlowControl(messages, bytes, window));
+ impl->setFlowControl(name, FlowControl(messages, bytes, window));
}
+void SubscriptionManager::setFlowControl(uint32_t messages, uint32_t bytes, bool window) {
+ impl->setFlowControl(messages, bytes, window);
+}
+
+void SubscriptionManager::setAcceptMode(AcceptMode mode) { impl->setAcceptMode(mode); }
+void SubscriptionManager::setAcquireMode(AcquireMode mode) { impl->setAcquireMode(mode); }
+
}} // namespace qpid::client
-#endif
+
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 91ad2b6d56..eaff0975c5 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -21,22 +21,21 @@
* under the License.
*
*/
-#include "qpid/sys/Mutex.h"
-#include <qpid/client/Dispatcher.h>
-#include <qpid/client/Completion.h>
+
#include <qpid/client/Session.h>
-#include <qpid/client/AsyncSession.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/client/LocalQueue.h>
#include <qpid/client/Subscription.h>
#include <qpid/sys/Runnable.h>
#include <qpid/client/ClientImportExport.h>
-#include <set>
-#include <sstream>
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/LocalQueue.h"
+#include <qpid/client/Handle.h>
+#include <string>
namespace qpid {
namespace client {
+class SubscriptionManagerImpl;
+
/**
* A class to help create and manage subscriptions.
*
@@ -94,11 +93,14 @@ namespace client {
* </ul>
*
*/
-class SubscriptionManager : public sys::Runnable
+class SubscriptionManager : public sys::Runnable, public Handle<SubscriptionManagerImpl>
{
public:
/** Create a new SubscriptionManager associated with a session */
QPID_CLIENT_EXTERN SubscriptionManager(const Session& session);
+ QPID_CLIENT_EXTERN SubscriptionManager(const SubscriptionManager&);
+ QPID_CLIENT_EXTERN ~SubscriptionManager();
+ QPID_CLIENT_EXTERN SubscriptionManager& operator=(const SubscriptionManager&);
/**
* Subscribe a MessagesListener to receive messages from queue.
@@ -224,17 +226,17 @@ class SubscriptionManager : public sys::Runnable
/** Set the default settings for subscribe() calls that don't
* include a SubscriptionSettings parameter.
*/
- QPID_CLIENT_EXTERN void setDefaultSettings(const SubscriptionSettings& s) { defaultSettings = s; }
+ QPID_CLIENT_EXTERN void setDefaultSettings(const SubscriptionSettings& s);
/** Get the default settings for subscribe() calls that don't
* include a SubscriptionSettings parameter.
*/
- QPID_CLIENT_EXTERN const SubscriptionSettings& getDefaultSettings() const { return defaultSettings; }
+ QPID_CLIENT_EXTERN const SubscriptionSettings& getDefaultSettings() const;
/** Get the default settings for subscribe() calls that don't
* include a SubscriptionSettings parameter.
*/
- QPID_CLIENT_EXTERN SubscriptionSettings& getDefaultSettings() { return defaultSettings; }
+ QPID_CLIENT_EXTERN SubscriptionSettings& getDefaultSettings();
/**
* Set the default flow control settings for subscribe() calls
@@ -244,32 +246,28 @@ class SubscriptionManager : public sys::Runnable
*@param bytes: byte credit.
*@param window: if true use window-based flow control.
*/
- QPID_CLIENT_EXTERN void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true) {
- defaultSettings.flowControl = FlowControl(messages, bytes, window);
- }
+ QPID_CLIENT_EXTERN void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true);
/**
*Set the default accept-mode for subscribe() calls that don't
*include a SubscriptionSettings parameter.
*/
- QPID_CLIENT_EXTERN void setAcceptMode(AcceptMode mode) { defaultSettings.acceptMode = mode; }
+ QPID_CLIENT_EXTERN void setAcceptMode(AcceptMode mode);
/**
* Set the default acquire-mode subscribe()s that don't specify SubscriptionSettings.
*/
- QPID_CLIENT_EXTERN void setAcquireMode(AcquireMode mode) { defaultSettings.acquireMode = mode; }
+ QPID_CLIENT_EXTERN void setAcquireMode(AcquireMode mode);
QPID_CLIENT_EXTERN void registerFailoverHandler ( boost::function<void ()> fh );
QPID_CLIENT_EXTERN Session getSession() const;
+ SubscriptionManager(SubscriptionManagerImpl*); ///<@internal
+
private:
- mutable sys::Mutex lock;
- qpid::client::Dispatcher dispatcher;
- qpid::client::AsyncSession session;
- bool autoStop;
- SubscriptionSettings defaultSettings;
- std::map<std::string, Subscription> subscriptions;
+ typedef SubscriptionManagerImpl Impl;
+ friend class PrivateImplRef<SubscriptionManager>;
};
/** AutoCancel cancels a subscription in its destructor */
diff --git a/cpp/src/qpid/client/SubscriptionManagerImpl.cpp b/cpp/src/qpid/client/SubscriptionManagerImpl.cpp
new file mode 100644
index 0000000000..27b46c36f8
--- /dev/null
+++ b/cpp/src/qpid/client/SubscriptionManagerImpl.cpp
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SubscriptionManager.h"
+#include "SubscriptionManagerImpl.h"
+#include "SubscriptionImpl.h"
+#include "LocalQueueImpl.h"
+#include "PrivateImplRef.h"
+#include <qpid/client/Dispatcher.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/MessageListener.h>
+#include <qpid/framing/Uuid.h>
+#include <set>
+#include <sstream>
+
+
+namespace qpid {
+namespace client {
+
+SubscriptionManagerImpl::SubscriptionManagerImpl(const Session& s)
+ : dispatcher(s), session(s), autoStop(true)
+{}
+
+Subscription SubscriptionManagerImpl::subscribe(
+ MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
+{
+ sys::Mutex::ScopedLock l(lock);
+ std::string name=n.empty() ? q:n;
+ boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(SubscriptionManager(this), q, ss, name, &listener);
+ dispatcher.listen(si);
+ //issue subscription request after listener is registered with dispatcher
+ si->subscribe();
+ return subscriptions[name] = Subscription(si.get());
+}
+
+Subscription SubscriptionManagerImpl::subscribe(
+ LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
+{
+ sys::Mutex::ScopedLock l(lock);
+ std::string name=n.empty() ? q:n;
+ boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(SubscriptionManager(this), q, ss, name, 0);
+ boost::intrusive_ptr<LocalQueueImpl> lqi = PrivateImplRef<LocalQueue>::get(lq);
+ lqi->queue=si->divert();
+ si->subscribe();
+ lqi->subscription = Subscription(si.get());
+ return subscriptions[name] = lqi->subscription;
+}
+
+Subscription SubscriptionManagerImpl::subscribe(
+ MessageListener& listener, const std::string& q, const std::string& n)
+{
+ return subscribe(listener, q, defaultSettings, n);
+}
+
+Subscription SubscriptionManagerImpl::subscribe(
+ LocalQueue& lq, const std::string& q, const std::string& n)
+{
+ return subscribe(lq, q, defaultSettings, n);
+}
+
+void SubscriptionManagerImpl::cancel(const std::string& dest)
+{
+ sys::Mutex::ScopedLock l(lock);
+ std::map<std::string, Subscription>::iterator i = subscriptions.find(dest);
+ if (i != subscriptions.end()) {
+ sync(session).messageCancel(dest);
+ dispatcher.cancel(dest);
+ Subscription s = i->second;
+ if (s.isValid())
+ PrivateImplRef<Subscription>::get(s)->cancelDiversion();
+ subscriptions.erase(i);
+ }
+}
+
+void SubscriptionManagerImpl::setAutoStop(bool set) { autoStop=set; }
+
+void SubscriptionManagerImpl::run()
+{
+ dispatcher.setAutoStop(autoStop);
+ dispatcher.run();
+}
+
+void SubscriptionManagerImpl::start()
+{
+ dispatcher.setAutoStop(autoStop);
+ dispatcher.start();
+}
+
+void SubscriptionManagerImpl::wait()
+{
+ dispatcher.wait();
+}
+
+void SubscriptionManagerImpl::stop()
+{
+ dispatcher.stop();
+}
+
+bool SubscriptionManagerImpl::get(Message& result, const std::string& queue, sys::Duration timeout) {
+ LocalQueue lq;
+ std::string unique = framing::Uuid(true).str();
+ subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique);
+ SubscriptionManager sm(this);
+ AutoCancel ac(sm, unique);
+ //first wait for message to be delivered if a timeout has been specified
+ if (timeout && lq.get(result, timeout))
+ return true;
+ //make sure message is not on queue before final check
+ sync(session).messageFlush(unique);
+ return lq.get(result, 0);
+}
+
+Message SubscriptionManagerImpl::get(const std::string& queue, sys::Duration timeout) {
+ Message result;
+ if (!get(result, queue, timeout))
+ throw Exception("Timed out waiting for a message");
+ return result;
+}
+
+Session SubscriptionManagerImpl::getSession() const { return session; }
+
+Subscription SubscriptionManagerImpl::getSubscription(const std::string& name) const {
+ sys::Mutex::ScopedLock l(lock);
+ std::map<std::string, Subscription>::const_iterator i = subscriptions.find(name);
+ if (i == subscriptions.end())
+ throw Exception(QPID_MSG("Subscription not found: " << name));
+ return i->second;
+}
+
+void SubscriptionManagerImpl::registerFailoverHandler (boost::function<void ()> fh) {
+ dispatcher.registerFailoverHandler(fh);
+}
+
+void SubscriptionManagerImpl::setFlowControl(const std::string& name, const FlowControl& flow) {
+ getSubscription(name).setFlowControl(flow);
+}
+
+void SubscriptionManagerImpl::setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window) {
+ setFlowControl(name, FlowControl(messages, bytes, window));
+}
+
+}} // namespace qpid::client
+
+
diff --git a/cpp/src/qpid/client/SubscriptionManagerImpl.h b/cpp/src/qpid/client/SubscriptionManagerImpl.h
new file mode 100644
index 0000000000..6376a05c45
--- /dev/null
+++ b/cpp/src/qpid/client/SubscriptionManagerImpl.h
@@ -0,0 +1,278 @@
+#ifndef QPID_CLIENT_SUBSCRIPTIONMANAGERIMPL_H
+#define QPID_CLIENT_SUBSCRIPTIONMANAGERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/Mutex.h"
+#include <qpid/client/Dispatcher.h>
+#include <qpid/client/Completion.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
+#include <qpid/client/MessageListener.h>
+#include <qpid/client/LocalQueue.h>
+#include <qpid/client/Subscription.h>
+#include <qpid/sys/Runnable.h>
+#include <qpid/RefCounted.h>
+#include <set>
+#include <sstream>
+
+namespace qpid {
+namespace client {
+
+/**
+ * A class to help create and manage subscriptions.
+ *
+ * Set up your subscriptions, then call run() to have messages
+ * delivered.
+ *
+ * \ingroup clientapi
+ *
+ * \details
+ *
+ * <h2>Subscribing and canceling subscriptions</h2>
+ *
+ * <ul>
+ * <li>
+ * <p>subscribe()</p>
+ * <pre> SubscriptionManager subscriptions(session);
+ * Listener listener(subscriptions);
+ * subscriptions.subscribe(listener, myQueue);</pre>
+ * <pre> SubscriptionManager subscriptions(session);
+ * LocalQueue local_queue;
+ * subscriptions.subscribe(local_queue, string("message_queue"));</pre></li>
+ * <li>
+ * <p>cancel()</p>
+ * <pre>subscriptions.cancel();</pre></li>
+ * </ul>
+ *
+ * <h2>Waiting for messages (and returning)</h2>
+ *
+ * <ul>
+ * <li>
+ * <p>run()</p>
+ * <pre> // Give up control to receive messages
+ * subscriptions.run();</pre></li>
+ * <li>
+ * <p>stop()</p>
+ * <pre>.// Use this code in a listener to return from run()
+ * subscriptions.stop();</pre></li>
+ * <li>
+ * <p>setAutoStop()</p>
+ * <pre>.// Return from subscriptions.run() when last subscription is cancelled
+ *.subscriptions.setAutoStop(true);
+ *.subscriptons.run();
+ * </pre></li>
+ * <li>
+ * <p>Ending a subscription in a listener</p>
+ * <pre>
+ * void Listener::received(Message&amp; message) {
+ *
+ * if (message.getData() == "That's all, folks!") {
+ * subscriptions.cancel(message.getDestination());
+ * }
+ * }
+ * </pre>
+ * </li>
+ * </ul>
+ *
+ */
+class SubscriptionManagerImpl : public sys::Runnable, public RefCounted
+{
+ public:
+ /** Create a new SubscriptionManagerImpl associated with a session */
+ SubscriptionManagerImpl(const Session& session);
+
+ /**
+ * Subscribe a MessagesListener to receive messages from queue.
+ *
+ * Provide your own subclass of MessagesListener to process
+ * incoming messages. It will be called for each message received.
+ *
+ *@param listener Listener object to receive messages.
+ *@param queue Name of the queue to subscribe to.
+ *@param settings settings for the subscription.
+ *@param name unique destination name for the subscription, defaults to queue name.
+ */
+ Subscription subscribe(MessageListener& listener,
+ const std::string& queue,
+ const SubscriptionSettings& settings,
+ const std::string& name=std::string());
+
+ /**
+ * Subscribe a LocalQueue to receive messages from queue.
+ *
+ * Incoming messages are stored in the queue for you to retrieve.
+ *
+ *@param queue Name of the queue to subscribe to.
+ *@param flow initial FlowControl for the subscription.
+ *@param name unique destination name for the subscription, defaults to queue name.
+ * If not specified, the queue name is used.
+ */
+ Subscription subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const SubscriptionSettings& settings,
+ const std::string& name=std::string());
+
+ /**
+ * Subscribe a MessagesListener to receive messages from queue.
+ *
+ * Provide your own subclass of MessagesListener to process
+ * incoming messages. It will be called for each message received.
+ *
+ *@param listener Listener object to receive messages.
+ *@param queue Name of the queue to subscribe to.
+ *@param name unique destination name for the subscription, defaults to queue name.
+ * If not specified, the queue name is used.
+ */
+ Subscription subscribe(MessageListener& listener,
+ const std::string& queue,
+ const std::string& name=std::string());
+
+ /**
+ * Subscribe a LocalQueue to receive messages from queue.
+ *
+ * Incoming messages are stored in the queue for you to retrieve.
+ *
+ *@param queue Name of the queue to subscribe to.
+ *@param name unique destination name for the subscription, defaults to queue name.
+ * If not specified, the queue name is used.
+ */
+ Subscription subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const std::string& name=std::string());
+
+
+ /** Get a single message from a queue.
+ *@param result is set to the message from the queue.
+ *@param timeout wait up this timeout for a message to appear.
+ *@return true if result was set, false if no message available after timeout.
+ */
+ bool get(Message& result, const std::string& queue, sys::Duration timeout=0);
+
+ /** Get a single message from a queue.
+ *@param timeout wait up this timeout for a message to appear.
+ *@return message from the queue.
+ *@throw Exception if the timeout is exceeded.
+ */
+ Message get(const std::string& queue, sys::Duration timeout=sys::TIME_INFINITE);
+
+ /** Get a subscription by name.
+ *@throw Exception if not found.
+ */
+ Subscription getSubscription(const std::string& name) const;
+
+ /** Cancel a subscription. See also: Subscription.cancel() */
+ void cancel(const std::string& name);
+
+ /** Deliver messages in the current thread until stop() is called.
+ * Only one thread may be running in a SubscriptionManager at a time.
+ * @see run
+ */
+ void run();
+
+ /** Start a new thread to deliver messages.
+ * Only one thread may be running in a SubscriptionManager at a time.
+ * @see start
+ */
+ void start();
+
+ /**
+ * Wait for the thread started by a call to start() to complete.
+ */
+ void wait();
+
+ /** If set true, run() will stop when all subscriptions
+ * are cancelled. If false, run will only stop when stop()
+ * is called. True by default.
+ */
+ void setAutoStop(bool set=true);
+
+ /** Stop delivery. Causes run() to return, or the thread started with start() to exit. */
+ void stop();
+
+ static const uint32_t UNLIMITED=0xFFFFFFFF;
+
+ /** Set the flow control for a subscription. */
+ void setFlowControl(const std::string& name, const FlowControl& flow);
+
+ /** Set the flow control for a subscription.
+ *@param name: name of the subscription.
+ *@param messages: message credit.
+ *@param bytes: byte credit.
+ *@param window: if true use window-based flow control.
+ */
+ void setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window=true);
+
+ /** Set the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
+ */
+ void setDefaultSettings(const SubscriptionSettings& s) { defaultSettings = s; }
+
+ /** Get the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
+ */
+ const SubscriptionSettings& getDefaultSettings() const { return defaultSettings; }
+
+ /** Get the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
+ */
+ SubscriptionSettings& getDefaultSettings() { return defaultSettings; }
+
+ /**
+ * Set the default flow control settings for subscribe() calls
+ * that don't include a SubscriptionSettings parameter.
+ *
+ *@param messages: message credit.
+ *@param bytes: byte credit.
+ *@param window: if true use window-based flow control.
+ */
+ void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true) {
+ defaultSettings.flowControl = FlowControl(messages, bytes, window);
+ }
+
+ /**
+ *Set the default accept-mode for subscribe() calls that don't
+ *include a SubscriptionSettings parameter.
+ */
+ void setAcceptMode(AcceptMode mode) { defaultSettings.acceptMode = mode; }
+
+ /**
+ * Set the default acquire-mode subscribe()s that don't specify SubscriptionSettings.
+ */
+ void setAcquireMode(AcquireMode mode) { defaultSettings.acquireMode = mode; }
+
+ void registerFailoverHandler ( boost::function<void ()> fh );
+
+ Session getSession() const;
+
+ private:
+ mutable sys::Mutex lock;
+ qpid::client::Dispatcher dispatcher;
+ qpid::client::AsyncSession session;
+ bool autoStop;
+ SubscriptionSettings defaultSettings;
+ std::map<std::string, Subscription> subscriptions;
+};
+
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_SUBSCRIPTIONMANAGERIMPL_H*/
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index b32b7f44ba..c691e351e7 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -29,6 +29,7 @@
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Session.h"
#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/LocalQueue.h"
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/sys/Thread.h"
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 1c719d16dc..0a72facd86 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -22,7 +22,9 @@
#include "test_tools.h"
#include "BrokerFixture.h"
#include "qpid/client/QueueOptions.h"
+#include "qpid/client/MessageListener.h"
#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
diff --git a/cpp/src/tests/echotest.cpp b/cpp/src/tests/echotest.cpp
index 7cbf3e7df4..98590e35ff 100644
--- a/cpp/src/tests/echotest.cpp
+++ b/cpp/src/tests/echotest.cpp
@@ -21,7 +21,7 @@
#include <qpid/client/Connection.h>
#include <qpid/client/SubscriptionManager.h>
-#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/sys/Time.h>
diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp
index e420bf2f0b..379e957ef1 100644
--- a/cpp/src/tests/exception_test.cpp
+++ b/cpp/src/tests/exception_test.cpp
@@ -23,6 +23,7 @@
#include "test_tools.h"
#include "BrokerFixture.h"
#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/MessageListener.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "qpid/framing/reply_exceptions.h"
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp
index 81ac610001..6ad84e1b82 100644
--- a/cpp/src/tests/latencytest.cpp
+++ b/cpp/src/tests/latencytest.cpp
@@ -28,6 +28,7 @@
#include <vector>
#include "TestOptions.h"
+#include "qpid/sys/Thread.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
#include "qpid/client/AsyncSession.h"
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index 18491d72a0..78606e46cd 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -28,6 +28,7 @@
#include "qpid/client/Message.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/Thread.h"
#include <boost/lexical_cast.hpp>
#include <boost/bind.hpp>
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index 1f0b3f5377..44070cd4c9 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -36,6 +36,7 @@
#include "qpid/client/Connection.h"
#include "qpid/client/MessageListener.h"
#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/sys/SystemInfo.h"
#include "qpid/sys/Time.h"
diff --git a/cpp/src/tests/txshift.cpp b/cpp/src/tests/txshift.cpp
index dd67710526..97135c9829 100644
--- a/cpp/src/tests/txshift.cpp
+++ b/cpp/src/tests/txshift.cpp
@@ -61,7 +61,7 @@ struct Transfer : MessageListener
Transfer(const std::string control_) : control(control_), expected(0), transfered(0) {}
- void subscribeToSource(SubscriptionManager& manager)
+ void subscribeToSource(SubscriptionManager manager)
{
sourceSettings.autoAck = 0;//will accept once at the end of the batch
sourceSettings.flowControl = FlowControl::messageCredit(expected);
@@ -69,7 +69,7 @@ struct Transfer : MessageListener
QPID_LOG(info, "Subscribed to source: " << source << " expecting: " << expected);
}
- void subscribeToControl(SubscriptionManager& manager)
+ void subscribeToControl(SubscriptionManager manager)
{
controlSettings.flowControl = FlowControl::messageCredit(1);
controlSubscription = manager.subscribe(*this, control, controlSettings);
diff --git a/cpp/src/tests/txtest.cpp b/cpp/src/tests/txtest.cpp
index fb1d19ca8a..c1ee246e2c 100644
--- a/cpp/src/tests/txtest.cpp
+++ b/cpp/src/tests/txtest.cpp
@@ -34,6 +34,7 @@
#include "qpid/framing/Array.h"
#include "qpid/framing/Buffer.h"
#include "qpid/sys/uuid.h"
+#include "qpid/sys/Thread.h"
using namespace qpid;
using namespace qpid::client;