summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-11-07 19:57:46 +0000
committerAlan Conway <aconway@apache.org>2007-11-07 19:57:46 +0000
commit710b8a1f1285b9aa5bccee5b1906500667dd7bc5 (patch)
tree83005778c44cf7d897cef882ced2330bc8bd2228 /cpp/src/qpid
parentd19657d82321b2b5e2cac386c49aa99f82b976fb (diff)
downloadqpid-python-710b8a1f1285b9aa5bccee5b1906500667dd7bc5.tar.gz
client::SubscriptionManager:
- Added autoStop support. - Added LocalQueue subscriptions. - Expose AckPolicy settings to user. client::Message: - incoming Messages carry their session for acknowledge perftest: (see perftest --help for details...) - allow multiple consumers. - 3 queue modes: shared, fanout, topic. - set size of messages git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@592869 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/client/AckPolicy.h54
-rw-r--r--cpp/src/qpid/client/Connector.cpp3
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp80
-rw-r--r--cpp/src/qpid/client/Dispatcher.h18
-rw-r--r--cpp/src/qpid/client/LocalQueue.cpp47
-rw-r--r--cpp/src/qpid/client/LocalQueue.h15
-rw-r--r--cpp/src/qpid/client/Message.h16
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp1
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp61
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h61
-rw-r--r--cpp/src/qpid/sys/BlockingQueue.h7
11 files changed, 240 insertions, 123 deletions
diff --git a/cpp/src/qpid/client/AckPolicy.h b/cpp/src/qpid/client/AckPolicy.h
new file mode 100644
index 0000000000..2a52a8c746
--- /dev/null
+++ b/cpp/src/qpid/client/AckPolicy.h
@@ -0,0 +1,54 @@
+#ifndef QPID_CLIENT_ACKPOLICY_H
+#define QPID_CLIENT_ACKPOLICY_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace client {
+
+/**
+ * Policy for automatic acknowledgement of messages.
+ */
+class AckPolicy
+{
+ size_t interval;
+ size_t count;
+
+ public:
+ /**
+ *@param n: acknowledge every n messages.
+ *n==0 means no automatick acknowledgement.
+ */
+ AckPolicy(size_t n=1) : interval(n), count(n) {}
+
+ void ack(const Message& msg) {
+ if (!interval) return;
+ bool send=(--count==0);
+ msg.acknowledge(true, send);
+ if (send) count = interval;
+ }
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!QPID_CLIENT_ACKPOLICY_H*/
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index ba11ea5569..80d97b10aa 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -52,9 +52,8 @@ Connector::Connector(
}
Connector::~Connector(){
- if (receiver.id()) {
+ if (receiver.id() && receiver.id() != Thread::current().id())
receiver.join();
- }
}
void Connector::connect(const std::string& host, int port){
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index 65756f6404..fd6a18b349 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -27,6 +27,8 @@
#include "qpid/sys/BlockingQueue.h"
#include "Message.h"
+#include <boost/state_saver.hpp>
+
using qpid::framing::FrameSet;
using qpid::framing::MessageTransferBody;
using qpid::sys::Mutex;
@@ -36,23 +38,22 @@ using qpid::sys::Thread;
namespace qpid {
namespace client {
- Subscriber::Subscriber(Session_0_10& s, MessageListener* l, bool a, uint f) : session(s), listener(l), autoAck(a), ackBatchSize(f), count(0) {}
+Subscriber::Subscriber(Session_0_10& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {}
void Subscriber::received(Message& msg)
{
if (listener) {
listener->received(msg);
- if (autoAck) {
- bool send = (++count >= ackBatchSize);
- msg.acknowledge(session, true, send);
- if (send) count = 0;
- }
+ autoAck.ack(msg);
}
}
-
- Dispatcher::Dispatcher(Session_0_10& s, const std::string& q) : session(s), queue(q), running(false), stopped(false)
+Dispatcher::Dispatcher(Session_0_10& s, const std::string& q)
+ : session(s), running(false)
{
+ queue = q.empty() ?
+ session.execution().getDemux().getDefault() :
+ session.execution().getDemux().get(q);
}
void Dispatcher::start()
@@ -62,19 +63,22 @@ void Dispatcher::start()
void Dispatcher::run()
{
- Demux::QueuePtr q = queue.empty() ?
- session.execution().getDemux().getDefault() :
- session.execution().getDemux().get(queue);
-
- startRunning();
- stopped = false;
- while (!isStopped()) {
- FrameSet::shared_ptr content = q->pop();
+ Mutex::ScopedLock l(lock);
+ if (running)
+ throw Exception("Dispatcher is already running.");
+ boost::state_saver<bool> reset(running); // Reset to false on exit.
+ running = true;
+ queue->open();
+ while (!queue->isClosed()) {
+ Mutex::ScopedUnlock u(lock);
+ FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
- Message msg(*content);
+ Message msg(*content, session);
Subscriber::shared_ptr listener = find(msg.getDestination());
if (!listener) {
- QPID_LOG(error, "No message listener set: " << content->getMethod());
+ // FIXME aconway 2007-11-07: Should close session & throw here?
+ QPID_LOG(error, "No message listener for "
+ << content->getMethod());
} else {
listener->received(msg);
}
@@ -82,41 +86,23 @@ void Dispatcher::run()
if (handler.get()) {
handler->handle(*content);
} else {
+ // FIXME aconway 2007-11-07: Should close session & throw here?
QPID_LOG(error, "Unhandled method: " << content->getMethod());
}
}
}
- stopRunning();
}
void Dispatcher::stop()
{
ScopedLock<Mutex> l(lock);
- stopped = true;
-}
-
-bool Dispatcher::isStopped()
-{
- ScopedLock<Mutex> l(lock);
- return stopped;
-}
-
-/**
- * Prevent concurrent threads invoking run.
- */
-void Dispatcher::startRunning()
-{
- ScopedLock<Mutex> l(lock);
- if (running) {
- throw Exception("Dispatcher is already running.");
- }
- running = true;
+ queue->close(); // Will interrupt thread blocked in pop()
}
-void Dispatcher::stopRunning()
+void Dispatcher::setAutoStop(bool b)
{
ScopedLock<Mutex> l(lock);
- running = false;
+ autoStop = b;
}
Subscriber::shared_ptr Dispatcher::find(const std::string& name)
@@ -129,22 +115,28 @@ Subscriber::shared_ptr Dispatcher::find(const std::string& name)
return i->second;
}
-void Dispatcher::listen(MessageListener* listener, bool autoAck, uint ackBatchSize)
+void Dispatcher::listen(
+ MessageListener* listener, AckPolicy autoAck
+)
{
ScopedLock<Mutex> l(lock);
- defaultListener = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize));
+ defaultListener = Subscriber::shared_ptr(
+ new Subscriber(session, listener, autoAck));
}
-void Dispatcher::listen(const std::string& destination, MessageListener* listener, bool autoAck, uint ackBatchSize)
+void Dispatcher::listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck)
{
ScopedLock<Mutex> l(lock);
- listeners[destination] = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize));
+ listeners[destination] = Subscriber::shared_ptr(
+ new Subscriber(session, listener, autoAck));
}
void Dispatcher::cancel(const std::string& destination)
{
ScopedLock<Mutex> l(lock);
listeners.erase(destination);
+ if (autoStop && listeners.empty())
+ queue->close();
}
}}
diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h
index 550ba36ef5..ae67e61299 100644
--- a/cpp/src/qpid/client/Dispatcher.h
+++ b/cpp/src/qpid/client/Dispatcher.h
@@ -29,6 +29,7 @@
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "MessageListener.h"
+#include "AckPolicy.h"
namespace qpid {
namespace client {
@@ -39,13 +40,11 @@ class Subscriber : public MessageListener
{
Session_0_10& session;
MessageListener* const listener;
- const bool autoAck;
- const uint ackBatchSize;
- uint count;
+ AckPolicy autoAck;
public:
typedef boost::shared_ptr<Subscriber> shared_ptr;
- Subscriber(Session_0_10& session, MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1);
+ Subscriber(Session_0_10& session, MessageListener* listener, AckPolicy);
void received(Message& msg);
};
@@ -58,16 +57,14 @@ class Dispatcher : public sys::Runnable
sys::Mutex lock;
sys::Thread worker;
Session_0_10& session;
- const std::string queue;
+ Demux::QueuePtr queue;
bool running;
- bool stopped;
+ bool autoStop;
Listeners listeners;
Subscriber::shared_ptr defaultListener;
std::auto_ptr<FrameSetHandler> handler;
Subscriber::shared_ptr find(const std::string& name);
- void startRunning();
- void stopRunning();
bool isStopped();
public:
@@ -76,9 +73,10 @@ public:
void start();
void run();
void stop();
+ void setAutoStop(bool b);
- void listen(MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1);
- void listen(const std::string& destination, MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1);
+ void listen(MessageListener* listener, AckPolicy autoAck=AckPolicy());
+ void listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck=AckPolicy());
void cancel(const std::string& destination);
};
diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp
new file mode 100644
index 0000000000..09bf1e055a
--- /dev/null
+++ b/cpp/src/qpid/client/LocalQueue.cpp
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LocalQueue.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace client {
+
+using namespace framing;
+
+LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {}
+LocalQueue::~LocalQueue() {}
+
+Message LocalQueue::pop() {
+ if (!queue)
+ throw ClosedException();
+ FrameSet::shared_ptr content = queue->pop();
+ if (content->isA<MessageTransferBody>())
+ return Message(*content, session);
+ else
+ throw CommandInvalidException(
+ QPID_MSG("Unexpected method: " << content->getMethod()));
+}
+
+void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; }
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h
index 048b4c7b4d..1c910bd3ee 100644
--- a/cpp/src/qpid/client/LocalQueue.h
+++ b/cpp/src/qpid/client/LocalQueue.h
@@ -23,8 +23,8 @@
*/
#include "qpid/client/Message.h"
-#include "qpid/Exception.h"
-#include "qpid/sys/BlockingQueue.h"
+#include "qpid/client/Demux.h"
+#include "qpid/client/AckPolicy.h"
namespace qpid {
namespace client {
@@ -35,16 +35,21 @@ namespace client {
class LocalQueue
{
public:
- LocalQueue(BlockingQueue& q) : queue(q) {}
+ LocalQueue(AckPolicy=AckPolicy());
~LocalQueue();
/** Pop the next message off the queue.
*@exception ClosedException if subscription has been closed.
*/
- Message pop() { reurn queue->pop(); }
+ Message pop();
+
+ void setAckPolicy(AckPolicy);
private:
- BlockingQueue& queue;
+ friend class SubscriptionManager;
+ Session_0_10 session;
+ Demux::QueuePtr queue;
+ AckPolicy autoAck;
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h
index b2ae273813..081384974a 100644
--- a/cpp/src/qpid/client/Message.h
+++ b/cpp/src/qpid/client/Message.h
@@ -68,7 +68,14 @@ public:
session.execution().completed(id, cumulative, send);
}
- Message(const framing::FrameSet& frameset) : method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId())
+ void acknowledge(bool cumulative = true, bool send = true) const
+ {
+ const_cast<Session_0_10&>(session).execution().completed(id, cumulative, send);
+ }
+
+ /**@internal for incoming messages */
+ Message(const framing::FrameSet& frameset, Session_0_10 s) :
+ method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s)
{
populate(frameset);
}
@@ -83,10 +90,13 @@ public:
return id;
}
+ /**@internal use for incoming messages. */
+ void setSession(Session_0_10 s) { session=s; }
private:
//method and id are only set for received messages:
- const framing::MessageTransferBody method;
- const framing::SequenceNumber id;
+ framing::MessageTransferBody method;
+ framing::SequenceNumber id;
+ Session_0_10 session;
};
}}
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 02ea57fecc..14b9020282 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -116,7 +116,6 @@ void SessionCore::attaching(shared_ptr<ConnectionImpl> c) {
SessionCore::~SessionCore() {
Lock l(state);
- invariant();
detach(COMMAND_INVALID, "Session deleted");
state.waitWaiters();
}
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index bf5191e8a0..fc65843643 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -33,57 +33,54 @@ namespace qpid {
namespace client {
SubscriptionManager::SubscriptionManager(Session_0_10& s)
- : dispatcher(s), session(s), messages(1), bytes(UNLIMITED), autoStop(true)
+ : dispatcher(s), session(s),
+ messages(UNLIMITED), bytes(UNLIMITED), window(true)
{}
-std::string SubscriptionManager::uniqueTag(const std::string& tag) {
- // Make unique tag.
- int count=1;
- std::string unique=tag;
- while (subscriptions.find(tag) != subscriptions.end()) {
- std::ostringstream s;
- s << tag << "-" << count++;
- unique=s.str();
- }
- subscriptions.insert(unique);
- return tag;
-}
-
-std::string SubscriptionManager::subscribe(
+void SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& t)
{
- std::string tag=uniqueTag(t);
- using namespace arg;
+ std::string tag=t.empty() ? q:t;
session.messageSubscribe(arg::queue=q, arg::destination=tag);
- flowLimits(tag, messages, bytes);
dispatcher.listen(tag, &listener);
- return tag;
+ setFlowControl(tag, messages, bytes, window);
+}
+
+void SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const std::string& t)
+{
+ std::string tag=t.empty() ? q:t;
+ lq.session=session;
+ lq.queue=session.execution().getDemux().add(tag, ByTransferDest(tag));
+ session.messageSubscribe(arg::queue=q, arg::destination=tag);
+ setFlowControl(tag, messages, bytes, window);
}
-void SubscriptionManager::flowLimits(
- const std::string& tag, uint32_t messages, uint32_t bytes) {
+void SubscriptionManager::setFlowControl(
+ const std::string& tag, uint32_t messages, uint32_t bytes, bool window)
+{
+ session.messageFlowMode(tag, window);
session.messageFlow(tag, 0, messages);
session.messageFlow(tag, 1, bytes);
}
-void SubscriptionManager::flowLimits(uint32_t m, uint32_t b) {
- messages=m;
- bytes=b;
+void SubscriptionManager::setFlowControl(
+ uint32_t messages_, uint32_t bytes_, bool window_)
+{
+ messages=messages_;
+ bytes=bytes_;
+ window=window_;
}
void SubscriptionManager::cancel(const std::string tag)
{
- if (subscriptions.erase(tag)) {
- dispatcher.cancel(tag);
- session.messageCancel(tag);
- if (autoStop && subscriptions.empty()) stop();
- }
+ dispatcher.cancel(tag);
+ session.messageCancel(tag);
}
-void SubscriptionManager::run(bool autoStop_)
+void SubscriptionManager::run(bool autoStop)
{
- autoStop=autoStop_;
- if (autoStop && subscriptions.empty()) return;
+ dispatcher.setAutoStop(autoStop);
dispatcher.run();
}
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 985b6ce222..1a03c1a47b 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -21,55 +21,59 @@
* under the License.
*
*/
-
+#include "qpid/sys/Mutex.h"
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Session_0_10.h>
#include <qpid/client/MessageListener.h>
+#include <qpid/client/LocalQueue.h>
#include <set>
#include <sstream>
namespace qpid {
namespace client {
-struct TagNotUniqueException : public qpid::Exception {
- TagNotUniqueException() {}
-};
-
class SubscriptionManager
{
- std::set<std::string> subscriptions;
+ typedef sys::Mutex::ScopedLock Lock;
+ typedef sys::Mutex::ScopedUnlock Unlock;
+
qpid::client::Dispatcher dispatcher;
qpid::client::Session_0_10& session;
- std::string uniqueTag(const std::string&);
uint32_t messages;
uint32_t bytes;
- bool autoStop;
+ bool window;
public:
SubscriptionManager(Session_0_10& session);
/**
- * Subscribe listener to receive messages from queue.
+ * Subscribe a MessagesListener to receive messages from queue.
+ *
*@param listener Listener object to receive messages.
*@param queue Name of the queue to subscribe to.
*@param tag Unique destination tag for the listener.
- * If not specified a unique tag will be generted based on the queue name.
- *@return Destination tag.
- *@exception TagNotUniqueException if there is already a subscription
- * with the same tag.
+ * If not specified, the queue name is used.
*/
- std::string subscribe(MessageListener& listener,
- const std::string& queue,
- const std::string& tag=std::string());
+ void subscribe(MessageListener& listener,
+ const std::string& queue,
+ const std::string& tag=std::string());
+
+ /**
+ * Subscribe a LocalQueue to receive messages from queue.
+ *
+ *@param queue Name of the queue to subscribe to.
+ *@param tag Unique destination tag for the listener.
+ * If not specified, the queue name is used.
+ */
+ void subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const std::string& tag=std::string());
/** Cancel a subscription. */
void cancel(const std::string tag);
-
- qpid::client::Dispatcher& getDispatcher() { return dispatcher; }
- size_t size() { return subscriptions.size(); }
/** Deliver messages until stop() is called.
- *@param autoStop If true, return when all subscriptions are cancelled.
+ *@param autoStop If true, return when all listeners are cancelled.
*/
void run(bool autoStop=true);
@@ -78,13 +82,20 @@ public:
static const uint32_t UNLIMITED=0xFFFFFFFF;
- /** Set the flow control limits for subscriber with tag.
- * UNLIMITED means no limit.
+ /** Set the flow control for destination tag.
+ *@param tag: name of the destination.
+ *@param messages: message credit.
+ *@param bytes: byte credit.
+ *@param window: if true use window-based flow control.
*/
- void flowLimits(const std::string& tag, uint32_t messages, uint32_t bytes);
+ void setFlowControl(const std::string& tag, uint32_t messages, uint32_t bytes, bool window=true);
- /** Set the initial flow control limits for new subscribers */
- void flowLimits(uint32_t messages, uint32_t bytes);
+ /** Set the initial flow control settings to be applied to each new subscribtion.
+ *@param messages: message credit.
+ *@param bytes: byte credit.
+ *@param window: if true use window-based flow control.
+ */
+ void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true);
};
diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h
index c53949ad6f..44c95b225d 100644
--- a/cpp/src/qpid/sys/BlockingQueue.h
+++ b/cpp/src/qpid/sys/BlockingQueue.h
@@ -32,7 +32,7 @@ namespace sys {
template <class T>
class BlockingQueue
{
- sys::Waitable lock;
+ mutable sys::Waitable lock;
std::queue<T> queue;
bool closed;
@@ -95,6 +95,11 @@ public:
closed=false;
}
+ bool isClosed() const {
+ Waitable::ScopedLock l(lock);
+ return closed;
+ }
+
private:
void queueNotify(size_t ignore) {