summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-11-14 13:47:39 +0000
committerAlan Conway <aconway@apache.org>2007-11-14 13:47:39 +0000
commit26f34cfdef50ed420e98e114b6735ceb898cc74a (patch)
treef75585d51b0c9cd65508790f37c41b7ce6b57a8a /cpp/src
parent03fd65578602ff436f4f2938528b47aa35f074e0 (diff)
downloadqpid-python-26f34cfdef50ed420e98e114b6735ceb898cc74a.tar.gz
Added auto-ack and commit-mode control to SubscriptionManager API.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@594879 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp48
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h15
2 files changed, 45 insertions, 18 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 478b8438c2..1282a1cf61 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -34,34 +34,42 @@ namespace client {
SubscriptionManager::SubscriptionManager(Session_0_10& s)
: dispatcher(s), session(s),
- messages(UNLIMITED), bytes(UNLIMITED), window(true)
+ messages(UNLIMITED), bytes(UNLIMITED), window(true),
+ confirmMode(true)
{}
+void SubscriptionManager::subscribeInternal(
+ const std::string& q, const std::string& dest)
+{
+ session.messageSubscribe(arg::queue=q, arg::destination=dest,
+ arg::confirmMode=confirmMode);
+ setFlowControl(dest, messages, bytes, window);
+}
+
void SubscriptionManager::subscribe(
- MessageListener& listener, const std::string& q, const std::string& t)
+ MessageListener& listener, const std::string& q, const std::string& d)
{
- std::string tag=t.empty() ? q:t;
- dispatcher.listen(tag, &listener);
- session.messageSubscribe(arg::queue=q, arg::destination=tag);
- setFlowControl(tag, messages, bytes, window);
+ std::string dest=d.empty() ? q:d;
+ dispatcher.listen(dest, &listener, autoAck);
+ subscribeInternal(q, dest);
}
void SubscriptionManager::subscribe(
- LocalQueue& lq, const std::string& q, const std::string& t)
+ LocalQueue& lq, const std::string& q, const std::string& d)
{
- std::string tag=t.empty() ? q:t;
+ std::string dest=d.empty() ? q:d;
lq.session=session;
- lq.queue=session.execution().getDemux().add(tag, ByTransferDest(tag));
- session.messageSubscribe(arg::queue=q, arg::destination=tag);
- setFlowControl(tag, messages, bytes, window);
+ lq.queue=session.execution().getDemux().add(dest, ByTransferDest(dest));
+ lq.setAckPolicy(autoAck);
+ subscribeInternal(q, dest);
}
void SubscriptionManager::setFlowControl(
- const std::string& tag, uint32_t messages, uint32_t bytes, bool window)
+ const std::string& dest, uint32_t messages, uint32_t bytes, bool window)
{
- session.messageFlowMode(tag, window);
- session.messageFlow(tag, 0, messages);
- session.messageFlow(tag, 1, bytes);
+ session.messageFlowMode(dest, window);
+ session.messageFlow(dest, 0, messages);
+ session.messageFlow(dest, 1, bytes);
}
void SubscriptionManager::setFlowControl(
@@ -72,10 +80,14 @@ void SubscriptionManager::setFlowControl(
window=window_;
}
-void SubscriptionManager::cancel(const std::string tag)
+void SubscriptionManager::setConfirmMode(bool c) { confirmMode=c; }
+
+void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; }
+
+void SubscriptionManager::cancel(const std::string dest)
{
- dispatcher.cancel(tag);
- session.messageCancel(tag);
+ dispatcher.cancel(dest);
+ session.messageCancel(dest);
}
void SubscriptionManager::run(bool autoStop)
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 1a03c1a47b..5cff46f0f2 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -37,11 +37,15 @@ class SubscriptionManager
typedef sys::Mutex::ScopedLock Lock;
typedef sys::Mutex::ScopedUnlock Unlock;
+ void subscribeInternal(const std::string& q, const std::string& dest);
+
qpid::client::Dispatcher dispatcher;
qpid::client::Session_0_10& session;
uint32_t messages;
uint32_t bytes;
bool window;
+ AckPolicy autoAck;
+ bool confirmMode;
public:
SubscriptionManager(Session_0_10& session);
@@ -96,6 +100,17 @@ public:
*@param window: if true use window-based flow control.
*/
void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true);
+
+ /** Set the confirm-mode for new subscriptions. Defaults to true.
+ *@param confirm: if true messages must be confirmed by calling
+ *Message::acknowledge() or automatically, see setAckPolicy()
+ */
+ void setConfirmMode(bool confirm);
+
+ /** Set the acknowledgement policy for new subscriptions.
+ * Default is to acknowledge every message automatically.
+ */
+ void setAckPolicy(const AckPolicy& autoAck);
};