summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am3
-rw-r--r--cpp/src/qpid/client/AckPolicy.h54
-rw-r--r--cpp/src/qpid/client/Connector.cpp3
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp80
-rw-r--r--cpp/src/qpid/client/Dispatcher.h18
-rw-r--r--cpp/src/qpid/client/LocalQueue.cpp47
-rw-r--r--cpp/src/qpid/client/LocalQueue.h15
-rw-r--r--cpp/src/qpid/client/Message.h16
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp1
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp61
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h61
-rw-r--r--cpp/src/qpid/sys/BlockingQueue.h7
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp2
-rw-r--r--cpp/src/tests/perftest.cpp380
14 files changed, 402 insertions, 346 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 681b8ed8ed..a0f01fefe8 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -208,6 +208,7 @@ libqpidclient_la_SOURCES = \
qpid/client/Connector.cpp \
qpid/client/Demux.cpp \
qpid/client/Dispatcher.cpp \
+ qpid/client/LocalQueue.cpp \
qpid/client/MessageListener.cpp \
qpid/client/Correlator.cpp \
qpid/client/CompletionTracker.cpp \
@@ -307,6 +308,7 @@ nobase_include_HEADERS = \
qpid/client/Exchange.h \
qpid/client/Message.h \
qpid/client/Queue.h \
+ qpid/client/AckPolicy.h \
qpid/client/Completion.h \
qpid/client/CompletionTracker.h \
qpid/client/Connection.h \
@@ -316,6 +318,7 @@ nobase_include_HEADERS = \
qpid/client/Correlator.h \
qpid/client/Demux.h \
qpid/client/Dispatcher.h \
+ qpid/client/LocalQueue.h \
qpid/client/Execution.h \
qpid/client/ExecutionHandler.h \
qpid/client/Future.h \
diff --git a/cpp/src/qpid/client/AckPolicy.h b/cpp/src/qpid/client/AckPolicy.h
new file mode 100644
index 0000000000..2a52a8c746
--- /dev/null
+++ b/cpp/src/qpid/client/AckPolicy.h
@@ -0,0 +1,54 @@
+#ifndef QPID_CLIENT_ACKPOLICY_H
+#define QPID_CLIENT_ACKPOLICY_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace client {
+
+/**
+ * Policy for automatic acknowledgement of messages.
+ */
+class AckPolicy
+{
+ size_t interval;
+ size_t count;
+
+ public:
+ /**
+ *@param n: acknowledge every n messages.
+ *n==0 means no automatick acknowledgement.
+ */
+ AckPolicy(size_t n=1) : interval(n), count(n) {}
+
+ void ack(const Message& msg) {
+ if (!interval) return;
+ bool send=(--count==0);
+ msg.acknowledge(true, send);
+ if (send) count = interval;
+ }
+};
+
+}} // namespace qpid::client
+
+
+
+#endif /*!QPID_CLIENT_ACKPOLICY_H*/
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index ba11ea5569..80d97b10aa 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -52,9 +52,8 @@ Connector::Connector(
}
Connector::~Connector(){
- if (receiver.id()) {
+ if (receiver.id() && receiver.id() != Thread::current().id())
receiver.join();
- }
}
void Connector::connect(const std::string& host, int port){
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index 65756f6404..fd6a18b349 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -27,6 +27,8 @@
#include "qpid/sys/BlockingQueue.h"
#include "Message.h"
+#include <boost/state_saver.hpp>
+
using qpid::framing::FrameSet;
using qpid::framing::MessageTransferBody;
using qpid::sys::Mutex;
@@ -36,23 +38,22 @@ using qpid::sys::Thread;
namespace qpid {
namespace client {
- Subscriber::Subscriber(Session_0_10& s, MessageListener* l, bool a, uint f) : session(s), listener(l), autoAck(a), ackBatchSize(f), count(0) {}
+Subscriber::Subscriber(Session_0_10& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {}
void Subscriber::received(Message& msg)
{
if (listener) {
listener->received(msg);
- if (autoAck) {
- bool send = (++count >= ackBatchSize);
- msg.acknowledge(session, true, send);
- if (send) count = 0;
- }
+ autoAck.ack(msg);
}
}
-
- Dispatcher::Dispatcher(Session_0_10& s, const std::string& q) : session(s), queue(q), running(false), stopped(false)
+Dispatcher::Dispatcher(Session_0_10& s, const std::string& q)
+ : session(s), running(false)
{
+ queue = q.empty() ?
+ session.execution().getDemux().getDefault() :
+ session.execution().getDemux().get(q);
}
void Dispatcher::start()
@@ -62,19 +63,22 @@ void Dispatcher::start()
void Dispatcher::run()
{
- Demux::QueuePtr q = queue.empty() ?
- session.execution().getDemux().getDefault() :
- session.execution().getDemux().get(queue);
-
- startRunning();
- stopped = false;
- while (!isStopped()) {
- FrameSet::shared_ptr content = q->pop();
+ Mutex::ScopedLock l(lock);
+ if (running)
+ throw Exception("Dispatcher is already running.");
+ boost::state_saver<bool> reset(running); // Reset to false on exit.
+ running = true;
+ queue->open();
+ while (!queue->isClosed()) {
+ Mutex::ScopedUnlock u(lock);
+ FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
- Message msg(*content);
+ Message msg(*content, session);
Subscriber::shared_ptr listener = find(msg.getDestination());
if (!listener) {
- QPID_LOG(error, "No message listener set: " << content->getMethod());
+ // FIXME aconway 2007-11-07: Should close session & throw here?
+ QPID_LOG(error, "No message listener for "
+ << content->getMethod());
} else {
listener->received(msg);
}
@@ -82,41 +86,23 @@ void Dispatcher::run()
if (handler.get()) {
handler->handle(*content);
} else {
+ // FIXME aconway 2007-11-07: Should close session & throw here?
QPID_LOG(error, "Unhandled method: " << content->getMethod());
}
}
}
- stopRunning();
}
void Dispatcher::stop()
{
ScopedLock<Mutex> l(lock);
- stopped = true;
-}
-
-bool Dispatcher::isStopped()
-{
- ScopedLock<Mutex> l(lock);
- return stopped;
-}
-
-/**
- * Prevent concurrent threads invoking run.
- */
-void Dispatcher::startRunning()
-{
- ScopedLock<Mutex> l(lock);
- if (running) {
- throw Exception("Dispatcher is already running.");
- }
- running = true;
+ queue->close(); // Will interrupt thread blocked in pop()
}
-void Dispatcher::stopRunning()
+void Dispatcher::setAutoStop(bool b)
{
ScopedLock<Mutex> l(lock);
- running = false;
+ autoStop = b;
}
Subscriber::shared_ptr Dispatcher::find(const std::string& name)
@@ -129,22 +115,28 @@ Subscriber::shared_ptr Dispatcher::find(const std::string& name)
return i->second;
}
-void Dispatcher::listen(MessageListener* listener, bool autoAck, uint ackBatchSize)
+void Dispatcher::listen(
+ MessageListener* listener, AckPolicy autoAck
+)
{
ScopedLock<Mutex> l(lock);
- defaultListener = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize));
+ defaultListener = Subscriber::shared_ptr(
+ new Subscriber(session, listener, autoAck));
}
-void Dispatcher::listen(const std::string& destination, MessageListener* listener, bool autoAck, uint ackBatchSize)
+void Dispatcher::listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck)
{
ScopedLock<Mutex> l(lock);
- listeners[destination] = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize));
+ listeners[destination] = Subscriber::shared_ptr(
+ new Subscriber(session, listener, autoAck));
}
void Dispatcher::cancel(const std::string& destination)
{
ScopedLock<Mutex> l(lock);
listeners.erase(destination);
+ if (autoStop && listeners.empty())
+ queue->close();
}
}}
diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h
index 550ba36ef5..ae67e61299 100644
--- a/cpp/src/qpid/client/Dispatcher.h
+++ b/cpp/src/qpid/client/Dispatcher.h
@@ -29,6 +29,7 @@
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Thread.h"
#include "MessageListener.h"
+#include "AckPolicy.h"
namespace qpid {
namespace client {
@@ -39,13 +40,11 @@ class Subscriber : public MessageListener
{
Session_0_10& session;
MessageListener* const listener;
- const bool autoAck;
- const uint ackBatchSize;
- uint count;
+ AckPolicy autoAck;
public:
typedef boost::shared_ptr<Subscriber> shared_ptr;
- Subscriber(Session_0_10& session, MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1);
+ Subscriber(Session_0_10& session, MessageListener* listener, AckPolicy);
void received(Message& msg);
};
@@ -58,16 +57,14 @@ class Dispatcher : public sys::Runnable
sys::Mutex lock;
sys::Thread worker;
Session_0_10& session;
- const std::string queue;
+ Demux::QueuePtr queue;
bool running;
- bool stopped;
+ bool autoStop;
Listeners listeners;
Subscriber::shared_ptr defaultListener;
std::auto_ptr<FrameSetHandler> handler;
Subscriber::shared_ptr find(const std::string& name);
- void startRunning();
- void stopRunning();
bool isStopped();
public:
@@ -76,9 +73,10 @@ public:
void start();
void run();
void stop();
+ void setAutoStop(bool b);
- void listen(MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1);
- void listen(const std::string& destination, MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1);
+ void listen(MessageListener* listener, AckPolicy autoAck=AckPolicy());
+ void listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck=AckPolicy());
void cancel(const std::string& destination);
};
diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp
new file mode 100644
index 0000000000..09bf1e055a
--- /dev/null
+++ b/cpp/src/qpid/client/LocalQueue.cpp
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LocalQueue.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace client {
+
+using namespace framing;
+
+LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {}
+LocalQueue::~LocalQueue() {}
+
+Message LocalQueue::pop() {
+ if (!queue)
+ throw ClosedException();
+ FrameSet::shared_ptr content = queue->pop();
+ if (content->isA<MessageTransferBody>())
+ return Message(*content, session);
+ else
+ throw CommandInvalidException(
+ QPID_MSG("Unexpected method: " << content->getMethod()));
+}
+
+void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; }
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h
index 048b4c7b4d..1c910bd3ee 100644
--- a/cpp/src/qpid/client/LocalQueue.h
+++ b/cpp/src/qpid/client/LocalQueue.h
@@ -23,8 +23,8 @@
*/
#include "qpid/client/Message.h"
-#include "qpid/Exception.h"
-#include "qpid/sys/BlockingQueue.h"
+#include "qpid/client/Demux.h"
+#include "qpid/client/AckPolicy.h"
namespace qpid {
namespace client {
@@ -35,16 +35,21 @@ namespace client {
class LocalQueue
{
public:
- LocalQueue(BlockingQueue& q) : queue(q) {}
+ LocalQueue(AckPolicy=AckPolicy());
~LocalQueue();
/** Pop the next message off the queue.
*@exception ClosedException if subscription has been closed.
*/
- Message pop() { reurn queue->pop(); }
+ Message pop();
+
+ void setAckPolicy(AckPolicy);
private:
- BlockingQueue& queue;
+ friend class SubscriptionManager;
+ Session_0_10 session;
+ Demux::QueuePtr queue;
+ AckPolicy autoAck;
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h
index b2ae273813..081384974a 100644
--- a/cpp/src/qpid/client/Message.h
+++ b/cpp/src/qpid/client/Message.h
@@ -68,7 +68,14 @@ public:
session.execution().completed(id, cumulative, send);
}
- Message(const framing::FrameSet& frameset) : method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId())
+ void acknowledge(bool cumulative = true, bool send = true) const
+ {
+ const_cast<Session_0_10&>(session).execution().completed(id, cumulative, send);
+ }
+
+ /**@internal for incoming messages */
+ Message(const framing::FrameSet& frameset, Session_0_10 s) :
+ method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s)
{
populate(frameset);
}
@@ -83,10 +90,13 @@ public:
return id;
}
+ /**@internal use for incoming messages. */
+ void setSession(Session_0_10 s) { session=s; }
private:
//method and id are only set for received messages:
- const framing::MessageTransferBody method;
- const framing::SequenceNumber id;
+ framing::MessageTransferBody method;
+ framing::SequenceNumber id;
+ Session_0_10 session;
};
}}
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 02ea57fecc..14b9020282 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -116,7 +116,6 @@ void SessionCore::attaching(shared_ptr<ConnectionImpl> c) {
SessionCore::~SessionCore() {
Lock l(state);
- invariant();
detach(COMMAND_INVALID, "Session deleted");
state.waitWaiters();
}
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index bf5191e8a0..fc65843643 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -33,57 +33,54 @@ namespace qpid {
namespace client {
SubscriptionManager::SubscriptionManager(Session_0_10& s)
- : dispatcher(s), session(s), messages(1), bytes(UNLIMITED), autoStop(true)
+ : dispatcher(s), session(s),
+ messages(UNLIMITED), bytes(UNLIMITED), window(true)
{}
-std::string SubscriptionManager::uniqueTag(const std::string& tag) {
- // Make unique tag.
- int count=1;
- std::string unique=tag;
- while (subscriptions.find(tag) != subscriptions.end()) {
- std::ostringstream s;
- s << tag << "-" << count++;
- unique=s.str();
- }
- subscriptions.insert(unique);
- return tag;
-}
-
-std::string SubscriptionManager::subscribe(
+void SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& t)
{
- std::string tag=uniqueTag(t);
- using namespace arg;
+ std::string tag=t.empty() ? q:t;
session.messageSubscribe(arg::queue=q, arg::destination=tag);
- flowLimits(tag, messages, bytes);
dispatcher.listen(tag, &listener);
- return tag;
+ setFlowControl(tag, messages, bytes, window);
+}
+
+void SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const std::string& t)
+{
+ std::string tag=t.empty() ? q:t;
+ lq.session=session;
+ lq.queue=session.execution().getDemux().add(tag, ByTransferDest(tag));
+ session.messageSubscribe(arg::queue=q, arg::destination=tag);
+ setFlowControl(tag, messages, bytes, window);
}
-void SubscriptionManager::flowLimits(
- const std::string& tag, uint32_t messages, uint32_t bytes) {
+void SubscriptionManager::setFlowControl(
+ const std::string& tag, uint32_t messages, uint32_t bytes, bool window)
+{
+ session.messageFlowMode(tag, window);
session.messageFlow(tag, 0, messages);
session.messageFlow(tag, 1, bytes);
}
-void SubscriptionManager::flowLimits(uint32_t m, uint32_t b) {
- messages=m;
- bytes=b;
+void SubscriptionManager::setFlowControl(
+ uint32_t messages_, uint32_t bytes_, bool window_)
+{
+ messages=messages_;
+ bytes=bytes_;
+ window=window_;
}
void SubscriptionManager::cancel(const std::string tag)
{
- if (subscriptions.erase(tag)) {
- dispatcher.cancel(tag);
- session.messageCancel(tag);
- if (autoStop && subscriptions.empty()) stop();
- }
+ dispatcher.cancel(tag);
+ session.messageCancel(tag);
}
-void SubscriptionManager::run(bool autoStop_)
+void SubscriptionManager::run(bool autoStop)
{
- autoStop=autoStop_;
- if (autoStop && subscriptions.empty()) return;
+ dispatcher.setAutoStop(autoStop);
dispatcher.run();
}
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 985b6ce222..1a03c1a47b 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -21,55 +21,59 @@
* under the License.
*
*/
-
+#include "qpid/sys/Mutex.h"
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Session_0_10.h>
#include <qpid/client/MessageListener.h>
+#include <qpid/client/LocalQueue.h>
#include <set>
#include <sstream>
namespace qpid {
namespace client {
-struct TagNotUniqueException : public qpid::Exception {
- TagNotUniqueException() {}
-};
-
class SubscriptionManager
{
- std::set<std::string> subscriptions;
+ typedef sys::Mutex::ScopedLock Lock;
+ typedef sys::Mutex::ScopedUnlock Unlock;
+
qpid::client::Dispatcher dispatcher;
qpid::client::Session_0_10& session;
- std::string uniqueTag(const std::string&);
uint32_t messages;
uint32_t bytes;
- bool autoStop;
+ bool window;
public:
SubscriptionManager(Session_0_10& session);
/**
- * Subscribe listener to receive messages from queue.
+ * Subscribe a MessagesListener to receive messages from queue.
+ *
*@param listener Listener object to receive messages.
*@param queue Name of the queue to subscribe to.
*@param tag Unique destination tag for the listener.
- * If not specified a unique tag will be generted based on the queue name.
- *@return Destination tag.
- *@exception TagNotUniqueException if there is already a subscription
- * with the same tag.
+ * If not specified, the queue name is used.
*/
- std::string subscribe(MessageListener& listener,
- const std::string& queue,
- const std::string& tag=std::string());
+ void subscribe(MessageListener& listener,
+ const std::string& queue,
+ const std::string& tag=std::string());
+
+ /**
+ * Subscribe a LocalQueue to receive messages from queue.
+ *
+ *@param queue Name of the queue to subscribe to.
+ *@param tag Unique destination tag for the listener.
+ * If not specified, the queue name is used.
+ */
+ void subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const std::string& tag=std::string());
/** Cancel a subscription. */
void cancel(const std::string tag);
-
- qpid::client::Dispatcher& getDispatcher() { return dispatcher; }
- size_t size() { return subscriptions.size(); }
/** Deliver messages until stop() is called.
- *@param autoStop If true, return when all subscriptions are cancelled.
+ *@param autoStop If true, return when all listeners are cancelled.
*/
void run(bool autoStop=true);
@@ -78,13 +82,20 @@ public:
static const uint32_t UNLIMITED=0xFFFFFFFF;
- /** Set the flow control limits for subscriber with tag.
- * UNLIMITED means no limit.
+ /** Set the flow control for destination tag.
+ *@param tag: name of the destination.
+ *@param messages: message credit.
+ *@param bytes: byte credit.
+ *@param window: if true use window-based flow control.
*/
- void flowLimits(const std::string& tag, uint32_t messages, uint32_t bytes);
+ void setFlowControl(const std::string& tag, uint32_t messages, uint32_t bytes, bool window=true);
- /** Set the initial flow control limits for new subscribers */
- void flowLimits(uint32_t messages, uint32_t bytes);
+ /** Set the initial flow control settings to be applied to each new subscribtion.
+ *@param messages: message credit.
+ *@param bytes: byte credit.
+ *@param window: if true use window-based flow control.
+ */
+ void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true);
};
diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h
index c53949ad6f..44c95b225d 100644
--- a/cpp/src/qpid/sys/BlockingQueue.h
+++ b/cpp/src/qpid/sys/BlockingQueue.h
@@ -32,7 +32,7 @@ namespace sys {
template <class T>
class BlockingQueue
{
- sys::Waitable lock;
+ mutable sys::Waitable lock;
std::queue<T> queue;
bool closed;
@@ -95,6 +95,11 @@ public:
closed=false;
}
+ bool isClosed() const {
+ Waitable::ScopedLock l(lock);
+ return closed;
+ }
+
private:
void queueNotify(size_t ignore) {
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index ed3d733c20..369477131c 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -48,7 +48,7 @@ struct DummyListener : public MessageListener
void listen()
{
- dispatcher.listen(name, this, true, 1);
+ dispatcher.listen(name, this);
dispatcher.run();
}
diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp
index bc816f6597..80157da7f4 100644
--- a/cpp/src/tests/perftest.cpp
+++ b/cpp/src/tests/perftest.cpp
@@ -21,44 +21,48 @@
#include "TestOptions.h"
-#include "qpid/client/Channel.h"
-#include "qpid/client/Exchange.h"
-#include "qpid/client/Queue.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/MessageListener.h"
#include "qpid/client/Message.h"
-#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include <iostream>
-#include <cstdlib>
-#include <iomanip>
-#include <time.h>
-#include <unistd.h>
+#include <sstream>
-
-using namespace qpid;
-using namespace qpid::client;
-using namespace qpid::sys;
using namespace std;
+using namespace qpid;
+using namespace client;
+using namespace sys;
struct Opts : public TestOptions {
bool listen;
bool publish;
int count;
- bool durable;
+ int size;
+ bool durable;
+ int consumers;
+ std::string mode;
- Opts() : listen(false), publish(false), count(500000) {
+ Opts() :
+ listen(false), publish(false), count(500000), size(64), consumers(1),
+ mode("shared")
+ {
addOptions()
("listen", optValue(listen), "Consume messages.")
("publish", optValue(publish), "Produce messages.")
- ("count", optValue(count, "N"), "Messages to send/receive.")
- ("durable", optValue(durable, "N"), "Publish messages as durable.");
+ ("count", optValue(count, "N"), "Messages to send.")
+ ("size", optValue(size, "BYTES"), "Size of messages.")
+ ("durable", optValue(durable, "N"), "Publish messages as durable.")
+ ("consumers", optValue(consumers, "N"), "Number of consumers.")
+ ("mode", optValue(mode, "shared|fanout|topic"), "consume mode");
}
};
Opts opts;
+enum Mode { SHARED, FANOUT, TOPIC };
+Mode mode;
struct ListenThread : public Runnable { Thread thread; void run(); };
struct PublishThread : public Runnable { Thread thread; void run(); };
@@ -66,16 +70,22 @@ struct PublishThread : public Runnable { Thread thread; void run(); };
int main(int argc, char** argv) {
try {
opts.parse(argc, argv);
+ if (opts.mode=="shared") mode=SHARED;
+ else if (opts.mode=="fanout") mode = FANOUT;
+ else if (opts.mode=="topic") mode = TOPIC;
+ else throw Exception("Invalid mode");
if (!opts.listen && !opts.publish)
opts.listen = opts.publish = true;
- ListenThread listen;
+ std::vector<ListenThread> listen(opts.consumers);
PublishThread publish;
- if (opts.listen)
- listen.thread=Thread(listen);
+ if (opts.listen)
+ for (int i = 0; i < opts.consumers; ++i)
+ listen[i].thread=Thread(listen[i]);
if (opts.publish)
publish.thread=Thread(publish);
if (opts.listen)
- listen.thread.join();
+ for (int i = 0; i < opts.consumers; ++i)
+ listen[i].thread.join();
if (opts.publish)
publish.thread.join();
}
@@ -84,223 +94,149 @@ int main(int argc, char** argv) {
}
}
-// ================================================================
-// Publish client
-//
+double secs(Duration d) { return double(d)/TIME_SEC; }
+double secs(AbsTime start, AbsTime finish) { return secs(Duration(start,finish)); }
-struct timespec operator-(const struct timespec& lhs, const struct timespec& rhs) {
- timespec r;
- r.tv_nsec = lhs.tv_nsec - rhs.tv_nsec;
- r.tv_sec = lhs.tv_sec - rhs.tv_sec;
- if (r.tv_nsec < 0) {
- r.tv_nsec += 1000000000;
- r.tv_sec -= 1;
- }
- return r;
-}
-ostream& operator<<(ostream& o, const struct timespec& ts) {
- o << ts.tv_sec << "." << setw(9) << setfill('0') << right << ts.tv_nsec;
- return o;
-}
+void expect(string actual, string expect) {
+ if (expect != actual)
+ throw Exception("Expecting "+expect+" but received "+actual);
-double toDouble(const struct timespec& ts) {
- return double(ts.tv_nsec)/1000000000 + ts.tv_sec;
}
-class PublishListener : public MessageListener {
-
- void set_time() {
- timespec ts;
- if (::clock_gettime(CLOCK_REALTIME, &ts))
- throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
- startTime = ts;
- }
-
- void print_time() {
- timespec ts;
- if (::clock_gettime(CLOCK_REALTIME, &ts))
- throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
- cout << "Total Time:" << ts-startTime << endl;
- double rate = messageCount*2/toDouble(ts-startTime);
- cout << "returned Messages:" << messageCount << endl;
- cout << "round trip Rate:" << rate << endl;
- }
-
- struct timespec startTime;
- int messageCount;
- bool done;
- Monitor lock;
-
- public:
-
- PublishListener(int mcount): messageCount(mcount), done(false) {
- set_time();
- }
-
- void received(Message& msg) {
- print_time();
- QPID_LOG(info, "Publisher: received: " << msg.getData());
- Mutex::ScopedLock l(lock);
- QPID_LOG(info, "Publisher: done.");
- done = true;
- lock.notify();
+const char* exchange() {
+ switch (mode) {
+ case SHARED: return ""; // Deafult exchange.
+ case FANOUT: return "amq.fanout";
+ case TOPIC: return "amq.topic";
}
-
- void wait() {
- Mutex::ScopedLock l(lock);
- while (!done)
- lock.wait();
- }
-};
-
+ assert(0);
+ return 0;
+}
void PublishThread::run() {
- Connection connection;
- Channel channel;
- Message msg;
- opts.open(connection);
- connection.openChannel(channel);
- channel.start();
-
- cout << "Started publisher." << endl;
- string queueControl = "control";
- Queue response(queueControl);
- channel.declareQueue(response);
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl);
+ try {
+ Connection connection;
+ opts.open(connection);
+ Session_0_10 session = connection.newSession();
+
+ session.queueDeclare(arg::queue="control"); // Control queue
+ session.queuePurge(arg::queue="control");
+ if (mode==SHARED) {
+ session.queueDeclare(arg::queue="perftest"); // Shared data queue
+ session.queuePurge(arg::queue="perftest");
+ }
- string queueName ="queue01";
- string queueNameC =queueName+"-1";
-
- // create publish queue
- Queue publish(queueName);
- channel.declareQueue(publish);
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, publish, queueName);
-
- // create completion queue
- Queue completion(queueNameC);
- channel.declareQueue(completion);
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC);
-
- // pass queue name
- msg.setData(queueName);
- channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueControl);
-
- QPID_LOG(info, "Publisher: setup return queue: "<< queueNameC);
-
- int count = opts.count;
- PublishListener listener(count);
- channel.consume(completion, queueNameC, &listener);
- QPID_LOG(info, "Publisher setup consumer: "<< queueNameC);
-
- struct timespec startTime;
- if (::clock_gettime(CLOCK_REALTIME, &startTime))
- throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
-
- bool durable = opts.durable;
- if (durable)
+ // Wait for consumers.
+ SubscriptionManager subs(session);
+ LocalQueue control;
+ subs.subscribe(control, "control");
+ for (int i = 0; i < opts.consumers; ++i)
+ expect(control.pop().getData(), "ready");
+
+ // Create test message
+ size_t msgSize=max(opts.size, 32);
+ Message msg(string(msgSize, 'X'), "perftest");
+ char* msgBuf = const_cast<char*>(msg.getData().data());
+ if (opts.durable)
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ // Time sending message.
+ AbsTime start=now();
+ cout << "Publishing " << opts.count << " messages " << flush;
+ for (int i=0; i<opts.count; i++) {
+ sprintf(msgBuf, "%d", i);
+ session.messageTransfer(arg::destination=exchange(),
+ arg::content=msg);
+ if ((i%10000)==0) cout << "." << flush;
+ }
+ cout << " done." << endl;
+ msg.setData("done"); // Send done messages.
+ if (mode==SHARED)
+ for (int i = 0; i < opts.consumers; ++i)
+ session.messageTransfer(arg::destination=exchange(), arg::content=msg);
+ else
+ session.messageTransfer(arg::destination=exchange(), arg::content=msg);
+ AbsTime end=now();
+
+ // Report
+ cout << endl;
+ cout << "publish count:" << opts.count << endl;
+ cout << "publish secs:" << secs(start,end) << endl;
+ cout << "publish rate:" << (opts.count)/secs(start,end) << endl;
+
+ // Wait for consumer(s) to finish.
+ for (int i = 0; i < opts.consumers; ++i) {
+ string report=control.pop().getData();
+ if (report.find("consume") != 0)
+ throw Exception("Expected consumer report, got: "+report);
+ cout << endl << report;
+ }
+ end=now();
+
+ // Count total transfers from publisher and to subscribers.
+ int transfers;
+ if (mode==SHARED) // each message sent/receivd once.
+ transfers=2*opts.count;
+ else // sent once, received N times.
+ transfers=opts.count*(opts.consumers + 1);
+
+ cout << endl
+ << "total transfers:" << transfers << endl
+ << "total secs:" << secs(start, end) << endl
+ << "total transfers/sec:" << transfers/secs(start, end) << endl;
- for (int i=0; i<count; i++) {
- msg.setData("Message 0123456789 ");
- channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName);
+ connection.close();
+ }
+ catch (const std::exception& e) {
+ cout << "PublishThread exception: " << e.what() << endl;
}
-
- struct timespec endTime;
- if (::clock_gettime(CLOCK_REALTIME, &endTime))
- throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
-
- cout << "publish Time:" << endTime-startTime << endl;
- double rate = count/toDouble(endTime-startTime);
- cout << "publish Messages:" << count << endl;
- cout << "publish Rate:" << rate << endl;
-
- msg.setData(queueName); // last message to queue.
- channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName);
-
- listener.wait();
-
- channel.close();
- connection.close();
}
-
-
-// ================================================================
-// Listen client
-//
-
-class Listener : public MessageListener{
- string queueName;
- Monitor lock;
- bool done;
-
- public:
- Listener (string& _queueName): queueName(_queueName), done(false) {};
-
- void received(Message& msg) {
- if (msg.getData() == queueName)
- {
- Mutex::ScopedLock l(lock);
- QPID_LOG(info, "Listener: done. " << queueName);
- done = true;
- lock.notify();
+void ListenThread::run() {
+ try {
+ Connection connection;
+ opts.open(connection);
+ Session_0_10 session = connection.newSession();
+
+ string consumeQueue;
+ switch (mode) {
+ case SHARED:
+ consumeQueue="perftest";
+ session.queueDeclare(arg::queue="perftest");
+ break;
+ case FANOUT:
+ case TOPIC:
+ consumeQueue=session.getId().str(); // Unique
+ session.queueDeclare(arg::queue=consumeQueue,
+ arg::exclusive=true,
+ arg::autoDelete=true);
+ session.queueBind(arg::queue=consumeQueue,
+ arg::exchange=exchange(),
+ arg::routingKey="perftest");
}
+ // Notify publisher we are ready.
+ session.queueDeclare(arg::queue="control"); // Control queue
+ session.messageTransfer(arg::content=Message("ready", "control"));
+
+ SubscriptionManager subs(session);
+ LocalQueue consume;
+ subs.subscribe(consume, consumeQueue);
+ int consumed=0;
+ AbsTime start=now();
+ while (consume.pop().getData() != "done")
+ ++consumed;
+ AbsTime end=now();
+
+ // Report to publisher.
+ ostringstream report;
+ report << "consume count: " << consumed << endl
+ << "consume secs: " << secs(start, end) << endl
+ << "consume rate: " << consumed/secs(start,end) << endl;
+ session.messageTransfer(arg::content=Message(report.str(), "control"));
+ connection.close();
}
-
- void wait() {
- Mutex::ScopedLock l(lock);
- while (!done)
- lock.wait();
- }
-};
-
-void ListenThread::run() {
- Connection connection;
- Channel channel;
- Message msg;
- Message msg1;
- cout << "Started listener." << endl;;
- opts.open(connection);
- connection.openChannel(channel);
- channel.start();
-
- string queueControl = "control";
- Queue response(queueControl);
- channel.declareQueue(response);
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl);
- while (!channel.get(msg, response, AUTO_ACK)) {
- QPID_LOG(info, "Listener: waiting for queue name.");
- sleep(1);
+ catch (const std::exception& e) {
+ cout << "PublishThread exception: " << e.what() << endl;
}
- string queueName =msg.getData();
- string queueNameC =queueName+ "-1";
-
- QPID_LOG(info, "Listener: Using Queue:" << queueName);
- QPID_LOG(info, "Listener: Reply Queue:" << queueNameC);
- // create consume queue
- Queue consume(queueName);
- channel.declareQueue(consume);
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, consume, queueName);
-
- // create completion queue
- Queue completion(queueNameC);
- channel.declareQueue(completion);
- channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC);
-
- Listener listener(queueName);
- channel.consume(consume, queueName, &listener);
- QPID_LOG(info, "Listener: consuming...");
-
- listener.wait();
-
- QPID_LOG(info, "Listener: send final message.");
- // complete.
- msg1.setData(queueName);
- channel.publish(msg1, Exchange::STANDARD_TOPIC_EXCHANGE, queueNameC);
-
- channel.close();
- connection.close();
}
-