diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/AckPolicy.h | 54 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 80 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.h | 18 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.cpp | 47 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.h | 15 | ||||
-rw-r--r-- | cpp/src/qpid/client/Message.h | 16 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 61 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 61 | ||||
-rw-r--r-- | cpp/src/qpid/sys/BlockingQueue.h | 7 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/perftest.cpp | 380 |
14 files changed, 402 insertions, 346 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 681b8ed8ed..a0f01fefe8 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -208,6 +208,7 @@ libqpidclient_la_SOURCES = \ qpid/client/Connector.cpp \ qpid/client/Demux.cpp \ qpid/client/Dispatcher.cpp \ + qpid/client/LocalQueue.cpp \ qpid/client/MessageListener.cpp \ qpid/client/Correlator.cpp \ qpid/client/CompletionTracker.cpp \ @@ -307,6 +308,7 @@ nobase_include_HEADERS = \ qpid/client/Exchange.h \ qpid/client/Message.h \ qpid/client/Queue.h \ + qpid/client/AckPolicy.h \ qpid/client/Completion.h \ qpid/client/CompletionTracker.h \ qpid/client/Connection.h \ @@ -316,6 +318,7 @@ nobase_include_HEADERS = \ qpid/client/Correlator.h \ qpid/client/Demux.h \ qpid/client/Dispatcher.h \ + qpid/client/LocalQueue.h \ qpid/client/Execution.h \ qpid/client/ExecutionHandler.h \ qpid/client/Future.h \ diff --git a/cpp/src/qpid/client/AckPolicy.h b/cpp/src/qpid/client/AckPolicy.h new file mode 100644 index 0000000000..2a52a8c746 --- /dev/null +++ b/cpp/src/qpid/client/AckPolicy.h @@ -0,0 +1,54 @@ +#ifndef QPID_CLIENT_ACKPOLICY_H +#define QPID_CLIENT_ACKPOLICY_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qpid { +namespace client { + +/** + * Policy for automatic acknowledgement of messages. + */ +class AckPolicy +{ + size_t interval; + size_t count; + + public: + /** + *@param n: acknowledge every n messages. + *n==0 means no automatick acknowledgement. + */ + AckPolicy(size_t n=1) : interval(n), count(n) {} + + void ack(const Message& msg) { + if (!interval) return; + bool send=(--count==0); + msg.acknowledge(true, send); + if (send) count = interval; + } +}; + +}} // namespace qpid::client + + + +#endif /*!QPID_CLIENT_ACKPOLICY_H*/ diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index ba11ea5569..80d97b10aa 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -52,9 +52,8 @@ Connector::Connector( } Connector::~Connector(){ - if (receiver.id()) { + if (receiver.id() && receiver.id() != Thread::current().id()) receiver.join(); - } } void Connector::connect(const std::string& host, int port){ diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 65756f6404..fd6a18b349 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -27,6 +27,8 @@ #include "qpid/sys/BlockingQueue.h" #include "Message.h" +#include <boost/state_saver.hpp> + using qpid::framing::FrameSet; using qpid::framing::MessageTransferBody; using qpid::sys::Mutex; @@ -36,23 +38,22 @@ using qpid::sys::Thread; namespace qpid { namespace client { - Subscriber::Subscriber(Session_0_10& s, MessageListener* l, bool a, uint f) : session(s), listener(l), autoAck(a), ackBatchSize(f), count(0) {} +Subscriber::Subscriber(Session_0_10& s, MessageListener* l, AckPolicy a) : session(s), listener(l), autoAck(a) {} void Subscriber::received(Message& msg) { if (listener) { listener->received(msg); - if (autoAck) { - bool send = (++count >= ackBatchSize); - msg.acknowledge(session, true, send); - if (send) count = 0; - } + autoAck.ack(msg); } } - - Dispatcher::Dispatcher(Session_0_10& s, const std::string& q) : session(s), queue(q), running(false), stopped(false) +Dispatcher::Dispatcher(Session_0_10& s, const std::string& q) + : session(s), running(false) { + queue = q.empty() ? + session.execution().getDemux().getDefault() : + session.execution().getDemux().get(q); } void Dispatcher::start() @@ -62,19 +63,22 @@ void Dispatcher::start() void Dispatcher::run() { - Demux::QueuePtr q = queue.empty() ? - session.execution().getDemux().getDefault() : - session.execution().getDemux().get(queue); - - startRunning(); - stopped = false; - while (!isStopped()) { - FrameSet::shared_ptr content = q->pop(); + Mutex::ScopedLock l(lock); + if (running) + throw Exception("Dispatcher is already running."); + boost::state_saver<bool> reset(running); // Reset to false on exit. + running = true; + queue->open(); + while (!queue->isClosed()) { + Mutex::ScopedUnlock u(lock); + FrameSet::shared_ptr content = queue->pop(); if (content->isA<MessageTransferBody>()) { - Message msg(*content); + Message msg(*content, session); Subscriber::shared_ptr listener = find(msg.getDestination()); if (!listener) { - QPID_LOG(error, "No message listener set: " << content->getMethod()); + // FIXME aconway 2007-11-07: Should close session & throw here? + QPID_LOG(error, "No message listener for " + << content->getMethod()); } else { listener->received(msg); } @@ -82,41 +86,23 @@ void Dispatcher::run() if (handler.get()) { handler->handle(*content); } else { + // FIXME aconway 2007-11-07: Should close session & throw here? QPID_LOG(error, "Unhandled method: " << content->getMethod()); } } } - stopRunning(); } void Dispatcher::stop() { ScopedLock<Mutex> l(lock); - stopped = true; -} - -bool Dispatcher::isStopped() -{ - ScopedLock<Mutex> l(lock); - return stopped; -} - -/** - * Prevent concurrent threads invoking run. - */ -void Dispatcher::startRunning() -{ - ScopedLock<Mutex> l(lock); - if (running) { - throw Exception("Dispatcher is already running."); - } - running = true; + queue->close(); // Will interrupt thread blocked in pop() } -void Dispatcher::stopRunning() +void Dispatcher::setAutoStop(bool b) { ScopedLock<Mutex> l(lock); - running = false; + autoStop = b; } Subscriber::shared_ptr Dispatcher::find(const std::string& name) @@ -129,22 +115,28 @@ Subscriber::shared_ptr Dispatcher::find(const std::string& name) return i->second; } -void Dispatcher::listen(MessageListener* listener, bool autoAck, uint ackBatchSize) +void Dispatcher::listen( + MessageListener* listener, AckPolicy autoAck +) { ScopedLock<Mutex> l(lock); - defaultListener = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize)); + defaultListener = Subscriber::shared_ptr( + new Subscriber(session, listener, autoAck)); } -void Dispatcher::listen(const std::string& destination, MessageListener* listener, bool autoAck, uint ackBatchSize) +void Dispatcher::listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck) { ScopedLock<Mutex> l(lock); - listeners[destination] = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize)); + listeners[destination] = Subscriber::shared_ptr( + new Subscriber(session, listener, autoAck)); } void Dispatcher::cancel(const std::string& destination) { ScopedLock<Mutex> l(lock); listeners.erase(destination); + if (autoStop && listeners.empty()) + queue->close(); } }} diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h index 550ba36ef5..ae67e61299 100644 --- a/cpp/src/qpid/client/Dispatcher.h +++ b/cpp/src/qpid/client/Dispatcher.h @@ -29,6 +29,7 @@ #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "MessageListener.h" +#include "AckPolicy.h" namespace qpid { namespace client { @@ -39,13 +40,11 @@ class Subscriber : public MessageListener { Session_0_10& session; MessageListener* const listener; - const bool autoAck; - const uint ackBatchSize; - uint count; + AckPolicy autoAck; public: typedef boost::shared_ptr<Subscriber> shared_ptr; - Subscriber(Session_0_10& session, MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1); + Subscriber(Session_0_10& session, MessageListener* listener, AckPolicy); void received(Message& msg); }; @@ -58,16 +57,14 @@ class Dispatcher : public sys::Runnable sys::Mutex lock; sys::Thread worker; Session_0_10& session; - const std::string queue; + Demux::QueuePtr queue; bool running; - bool stopped; + bool autoStop; Listeners listeners; Subscriber::shared_ptr defaultListener; std::auto_ptr<FrameSetHandler> handler; Subscriber::shared_ptr find(const std::string& name); - void startRunning(); - void stopRunning(); bool isStopped(); public: @@ -76,9 +73,10 @@ public: void start(); void run(); void stop(); + void setAutoStop(bool b); - void listen(MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1); - void listen(const std::string& destination, MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1); + void listen(MessageListener* listener, AckPolicy autoAck=AckPolicy()); + void listen(const std::string& destination, MessageListener* listener, AckPolicy autoAck=AckPolicy()); void cancel(const std::string& destination); }; diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp new file mode 100644 index 0000000000..09bf1e055a --- /dev/null +++ b/cpp/src/qpid/client/LocalQueue.cpp @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "LocalQueue.h" +#include "qpid/Exception.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace client { + +using namespace framing; + +LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {} +LocalQueue::~LocalQueue() {} + +Message LocalQueue::pop() { + if (!queue) + throw ClosedException(); + FrameSet::shared_ptr content = queue->pop(); + if (content->isA<MessageTransferBody>()) + return Message(*content, session); + else + throw CommandInvalidException( + QPID_MSG("Unexpected method: " << content->getMethod())); +} + +void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; } + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h index 048b4c7b4d..1c910bd3ee 100644 --- a/cpp/src/qpid/client/LocalQueue.h +++ b/cpp/src/qpid/client/LocalQueue.h @@ -23,8 +23,8 @@ */ #include "qpid/client/Message.h" -#include "qpid/Exception.h" -#include "qpid/sys/BlockingQueue.h" +#include "qpid/client/Demux.h" +#include "qpid/client/AckPolicy.h" namespace qpid { namespace client { @@ -35,16 +35,21 @@ namespace client { class LocalQueue { public: - LocalQueue(BlockingQueue& q) : queue(q) {} + LocalQueue(AckPolicy=AckPolicy()); ~LocalQueue(); /** Pop the next message off the queue. *@exception ClosedException if subscription has been closed. */ - Message pop() { reurn queue->pop(); } + Message pop(); + + void setAckPolicy(AckPolicy); private: - BlockingQueue& queue; + friend class SubscriptionManager; + Session_0_10 session; + Demux::QueuePtr queue; + AckPolicy autoAck; }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h index b2ae273813..081384974a 100644 --- a/cpp/src/qpid/client/Message.h +++ b/cpp/src/qpid/client/Message.h @@ -68,7 +68,14 @@ public: session.execution().completed(id, cumulative, send); } - Message(const framing::FrameSet& frameset) : method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()) + void acknowledge(bool cumulative = true, bool send = true) const + { + const_cast<Session_0_10&>(session).execution().completed(id, cumulative, send); + } + + /**@internal for incoming messages */ + Message(const framing::FrameSet& frameset, Session_0_10 s) : + method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()), session(s) { populate(frameset); } @@ -83,10 +90,13 @@ public: return id; } + /**@internal use for incoming messages. */ + void setSession(Session_0_10 s) { session=s; } private: //method and id are only set for received messages: - const framing::MessageTransferBody method; - const framing::SequenceNumber id; + framing::MessageTransferBody method; + framing::SequenceNumber id; + Session_0_10 session; }; }} diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 02ea57fecc..14b9020282 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -116,7 +116,6 @@ void SessionCore::attaching(shared_ptr<ConnectionImpl> c) { SessionCore::~SessionCore() { Lock l(state); - invariant(); detach(COMMAND_INVALID, "Session deleted"); state.waitWaiters(); } diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index bf5191e8a0..fc65843643 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -33,57 +33,54 @@ namespace qpid { namespace client { SubscriptionManager::SubscriptionManager(Session_0_10& s) - : dispatcher(s), session(s), messages(1), bytes(UNLIMITED), autoStop(true) + : dispatcher(s), session(s), + messages(UNLIMITED), bytes(UNLIMITED), window(true) {} -std::string SubscriptionManager::uniqueTag(const std::string& tag) { - // Make unique tag. - int count=1; - std::string unique=tag; - while (subscriptions.find(tag) != subscriptions.end()) { - std::ostringstream s; - s << tag << "-" << count++; - unique=s.str(); - } - subscriptions.insert(unique); - return tag; -} - -std::string SubscriptionManager::subscribe( +void SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const std::string& t) { - std::string tag=uniqueTag(t); - using namespace arg; + std::string tag=t.empty() ? q:t; session.messageSubscribe(arg::queue=q, arg::destination=tag); - flowLimits(tag, messages, bytes); dispatcher.listen(tag, &listener); - return tag; + setFlowControl(tag, messages, bytes, window); +} + +void SubscriptionManager::subscribe( + LocalQueue& lq, const std::string& q, const std::string& t) +{ + std::string tag=t.empty() ? q:t; + lq.session=session; + lq.queue=session.execution().getDemux().add(tag, ByTransferDest(tag)); + session.messageSubscribe(arg::queue=q, arg::destination=tag); + setFlowControl(tag, messages, bytes, window); } -void SubscriptionManager::flowLimits( - const std::string& tag, uint32_t messages, uint32_t bytes) { +void SubscriptionManager::setFlowControl( + const std::string& tag, uint32_t messages, uint32_t bytes, bool window) +{ + session.messageFlowMode(tag, window); session.messageFlow(tag, 0, messages); session.messageFlow(tag, 1, bytes); } -void SubscriptionManager::flowLimits(uint32_t m, uint32_t b) { - messages=m; - bytes=b; +void SubscriptionManager::setFlowControl( + uint32_t messages_, uint32_t bytes_, bool window_) +{ + messages=messages_; + bytes=bytes_; + window=window_; } void SubscriptionManager::cancel(const std::string tag) { - if (subscriptions.erase(tag)) { - dispatcher.cancel(tag); - session.messageCancel(tag); - if (autoStop && subscriptions.empty()) stop(); - } + dispatcher.cancel(tag); + session.messageCancel(tag); } -void SubscriptionManager::run(bool autoStop_) +void SubscriptionManager::run(bool autoStop) { - autoStop=autoStop_; - if (autoStop && subscriptions.empty()) return; + dispatcher.setAutoStop(autoStop); dispatcher.run(); } diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 985b6ce222..1a03c1a47b 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -21,55 +21,59 @@ * under the License. * */ - +#include "qpid/sys/Mutex.h" #include <qpid/client/Dispatcher.h> #include <qpid/client/Session_0_10.h> #include <qpid/client/MessageListener.h> +#include <qpid/client/LocalQueue.h> #include <set> #include <sstream> namespace qpid { namespace client { -struct TagNotUniqueException : public qpid::Exception { - TagNotUniqueException() {} -}; - class SubscriptionManager { - std::set<std::string> subscriptions; + typedef sys::Mutex::ScopedLock Lock; + typedef sys::Mutex::ScopedUnlock Unlock; + qpid::client::Dispatcher dispatcher; qpid::client::Session_0_10& session; - std::string uniqueTag(const std::string&); uint32_t messages; uint32_t bytes; - bool autoStop; + bool window; public: SubscriptionManager(Session_0_10& session); /** - * Subscribe listener to receive messages from queue. + * Subscribe a MessagesListener to receive messages from queue. + * *@param listener Listener object to receive messages. *@param queue Name of the queue to subscribe to. *@param tag Unique destination tag for the listener. - * If not specified a unique tag will be generted based on the queue name. - *@return Destination tag. - *@exception TagNotUniqueException if there is already a subscription - * with the same tag. + * If not specified, the queue name is used. */ - std::string subscribe(MessageListener& listener, - const std::string& queue, - const std::string& tag=std::string()); + void subscribe(MessageListener& listener, + const std::string& queue, + const std::string& tag=std::string()); + + /** + * Subscribe a LocalQueue to receive messages from queue. + * + *@param queue Name of the queue to subscribe to. + *@param tag Unique destination tag for the listener. + * If not specified, the queue name is used. + */ + void subscribe(LocalQueue& localQueue, + const std::string& queue, + const std::string& tag=std::string()); /** Cancel a subscription. */ void cancel(const std::string tag); - - qpid::client::Dispatcher& getDispatcher() { return dispatcher; } - size_t size() { return subscriptions.size(); } /** Deliver messages until stop() is called. - *@param autoStop If true, return when all subscriptions are cancelled. + *@param autoStop If true, return when all listeners are cancelled. */ void run(bool autoStop=true); @@ -78,13 +82,20 @@ public: static const uint32_t UNLIMITED=0xFFFFFFFF; - /** Set the flow control limits for subscriber with tag. - * UNLIMITED means no limit. + /** Set the flow control for destination tag. + *@param tag: name of the destination. + *@param messages: message credit. + *@param bytes: byte credit. + *@param window: if true use window-based flow control. */ - void flowLimits(const std::string& tag, uint32_t messages, uint32_t bytes); + void setFlowControl(const std::string& tag, uint32_t messages, uint32_t bytes, bool window=true); - /** Set the initial flow control limits for new subscribers */ - void flowLimits(uint32_t messages, uint32_t bytes); + /** Set the initial flow control settings to be applied to each new subscribtion. + *@param messages: message credit. + *@param bytes: byte credit. + *@param window: if true use window-based flow control. + */ + void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true); }; diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h index c53949ad6f..44c95b225d 100644 --- a/cpp/src/qpid/sys/BlockingQueue.h +++ b/cpp/src/qpid/sys/BlockingQueue.h @@ -32,7 +32,7 @@ namespace sys { template <class T> class BlockingQueue { - sys::Waitable lock; + mutable sys::Waitable lock; std::queue<T> queue; bool closed; @@ -95,6 +95,11 @@ public: closed=false; } + bool isClosed() const { + Waitable::ScopedLock l(lock); + return closed; + } + private: void queueNotify(size_t ignore) { diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index ed3d733c20..369477131c 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -48,7 +48,7 @@ struct DummyListener : public MessageListener void listen() { - dispatcher.listen(name, this, true, 1); + dispatcher.listen(name, this); dispatcher.run(); } diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index bc816f6597..80157da7f4 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -21,44 +21,48 @@ #include "TestOptions.h" -#include "qpid/client/Channel.h" -#include "qpid/client/Exchange.h" -#include "qpid/client/Queue.h" +#include "qpid/client/Session_0_10.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/client/Connection.h" -#include "qpid/client/MessageListener.h" #include "qpid/client/Message.h" -#include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include <iostream> -#include <cstdlib> -#include <iomanip> -#include <time.h> -#include <unistd.h> +#include <sstream> - -using namespace qpid; -using namespace qpid::client; -using namespace qpid::sys; using namespace std; +using namespace qpid; +using namespace client; +using namespace sys; struct Opts : public TestOptions { bool listen; bool publish; int count; - bool durable; + int size; + bool durable; + int consumers; + std::string mode; - Opts() : listen(false), publish(false), count(500000) { + Opts() : + listen(false), publish(false), count(500000), size(64), consumers(1), + mode("shared") + { addOptions() ("listen", optValue(listen), "Consume messages.") ("publish", optValue(publish), "Produce messages.") - ("count", optValue(count, "N"), "Messages to send/receive.") - ("durable", optValue(durable, "N"), "Publish messages as durable."); + ("count", optValue(count, "N"), "Messages to send.") + ("size", optValue(size, "BYTES"), "Size of messages.") + ("durable", optValue(durable, "N"), "Publish messages as durable.") + ("consumers", optValue(consumers, "N"), "Number of consumers.") + ("mode", optValue(mode, "shared|fanout|topic"), "consume mode"); } }; Opts opts; +enum Mode { SHARED, FANOUT, TOPIC }; +Mode mode; struct ListenThread : public Runnable { Thread thread; void run(); }; struct PublishThread : public Runnable { Thread thread; void run(); }; @@ -66,16 +70,22 @@ struct PublishThread : public Runnable { Thread thread; void run(); }; int main(int argc, char** argv) { try { opts.parse(argc, argv); + if (opts.mode=="shared") mode=SHARED; + else if (opts.mode=="fanout") mode = FANOUT; + else if (opts.mode=="topic") mode = TOPIC; + else throw Exception("Invalid mode"); if (!opts.listen && !opts.publish) opts.listen = opts.publish = true; - ListenThread listen; + std::vector<ListenThread> listen(opts.consumers); PublishThread publish; - if (opts.listen) - listen.thread=Thread(listen); + if (opts.listen) + for (int i = 0; i < opts.consumers; ++i) + listen[i].thread=Thread(listen[i]); if (opts.publish) publish.thread=Thread(publish); if (opts.listen) - listen.thread.join(); + for (int i = 0; i < opts.consumers; ++i) + listen[i].thread.join(); if (opts.publish) publish.thread.join(); } @@ -84,223 +94,149 @@ int main(int argc, char** argv) { } } -// ================================================================ -// Publish client -// +double secs(Duration d) { return double(d)/TIME_SEC; } +double secs(AbsTime start, AbsTime finish) { return secs(Duration(start,finish)); } -struct timespec operator-(const struct timespec& lhs, const struct timespec& rhs) { - timespec r; - r.tv_nsec = lhs.tv_nsec - rhs.tv_nsec; - r.tv_sec = lhs.tv_sec - rhs.tv_sec; - if (r.tv_nsec < 0) { - r.tv_nsec += 1000000000; - r.tv_sec -= 1; - } - return r; -} -ostream& operator<<(ostream& o, const struct timespec& ts) { - o << ts.tv_sec << "." << setw(9) << setfill('0') << right << ts.tv_nsec; - return o; -} +void expect(string actual, string expect) { + if (expect != actual) + throw Exception("Expecting "+expect+" but received "+actual); -double toDouble(const struct timespec& ts) { - return double(ts.tv_nsec)/1000000000 + ts.tv_sec; } -class PublishListener : public MessageListener { - - void set_time() { - timespec ts; - if (::clock_gettime(CLOCK_REALTIME, &ts)) - throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); - startTime = ts; - } - - void print_time() { - timespec ts; - if (::clock_gettime(CLOCK_REALTIME, &ts)) - throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); - cout << "Total Time:" << ts-startTime << endl; - double rate = messageCount*2/toDouble(ts-startTime); - cout << "returned Messages:" << messageCount << endl; - cout << "round trip Rate:" << rate << endl; - } - - struct timespec startTime; - int messageCount; - bool done; - Monitor lock; - - public: - - PublishListener(int mcount): messageCount(mcount), done(false) { - set_time(); - } - - void received(Message& msg) { - print_time(); - QPID_LOG(info, "Publisher: received: " << msg.getData()); - Mutex::ScopedLock l(lock); - QPID_LOG(info, "Publisher: done."); - done = true; - lock.notify(); +const char* exchange() { + switch (mode) { + case SHARED: return ""; // Deafult exchange. + case FANOUT: return "amq.fanout"; + case TOPIC: return "amq.topic"; } - - void wait() { - Mutex::ScopedLock l(lock); - while (!done) - lock.wait(); - } -}; - + assert(0); + return 0; +} void PublishThread::run() { - Connection connection; - Channel channel; - Message msg; - opts.open(connection); - connection.openChannel(channel); - channel.start(); - - cout << "Started publisher." << endl; - string queueControl = "control"; - Queue response(queueControl); - channel.declareQueue(response); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); + try { + Connection connection; + opts.open(connection); + Session_0_10 session = connection.newSession(); + + session.queueDeclare(arg::queue="control"); // Control queue + session.queuePurge(arg::queue="control"); + if (mode==SHARED) { + session.queueDeclare(arg::queue="perftest"); // Shared data queue + session.queuePurge(arg::queue="perftest"); + } - string queueName ="queue01"; - string queueNameC =queueName+"-1"; - - // create publish queue - Queue publish(queueName); - channel.declareQueue(publish); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, publish, queueName); - - // create completion queue - Queue completion(queueNameC); - channel.declareQueue(completion); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); - - // pass queue name - msg.setData(queueName); - channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueControl); - - QPID_LOG(info, "Publisher: setup return queue: "<< queueNameC); - - int count = opts.count; - PublishListener listener(count); - channel.consume(completion, queueNameC, &listener); - QPID_LOG(info, "Publisher setup consumer: "<< queueNameC); - - struct timespec startTime; - if (::clock_gettime(CLOCK_REALTIME, &startTime)) - throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); - - bool durable = opts.durable; - if (durable) + // Wait for consumers. + SubscriptionManager subs(session); + LocalQueue control; + subs.subscribe(control, "control"); + for (int i = 0; i < opts.consumers; ++i) + expect(control.pop().getData(), "ready"); + + // Create test message + size_t msgSize=max(opts.size, 32); + Message msg(string(msgSize, 'X'), "perftest"); + char* msgBuf = const_cast<char*>(msg.getData().data()); + if (opts.durable) msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + // Time sending message. + AbsTime start=now(); + cout << "Publishing " << opts.count << " messages " << flush; + for (int i=0; i<opts.count; i++) { + sprintf(msgBuf, "%d", i); + session.messageTransfer(arg::destination=exchange(), + arg::content=msg); + if ((i%10000)==0) cout << "." << flush; + } + cout << " done." << endl; + msg.setData("done"); // Send done messages. + if (mode==SHARED) + for (int i = 0; i < opts.consumers; ++i) + session.messageTransfer(arg::destination=exchange(), arg::content=msg); + else + session.messageTransfer(arg::destination=exchange(), arg::content=msg); + AbsTime end=now(); + + // Report + cout << endl; + cout << "publish count:" << opts.count << endl; + cout << "publish secs:" << secs(start,end) << endl; + cout << "publish rate:" << (opts.count)/secs(start,end) << endl; + + // Wait for consumer(s) to finish. + for (int i = 0; i < opts.consumers; ++i) { + string report=control.pop().getData(); + if (report.find("consume") != 0) + throw Exception("Expected consumer report, got: "+report); + cout << endl << report; + } + end=now(); + + // Count total transfers from publisher and to subscribers. + int transfers; + if (mode==SHARED) // each message sent/receivd once. + transfers=2*opts.count; + else // sent once, received N times. + transfers=opts.count*(opts.consumers + 1); + + cout << endl + << "total transfers:" << transfers << endl + << "total secs:" << secs(start, end) << endl + << "total transfers/sec:" << transfers/secs(start, end) << endl; - for (int i=0; i<count; i++) { - msg.setData("Message 0123456789 "); - channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName); + connection.close(); + } + catch (const std::exception& e) { + cout << "PublishThread exception: " << e.what() << endl; } - - struct timespec endTime; - if (::clock_gettime(CLOCK_REALTIME, &endTime)) - throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); - - cout << "publish Time:" << endTime-startTime << endl; - double rate = count/toDouble(endTime-startTime); - cout << "publish Messages:" << count << endl; - cout << "publish Rate:" << rate << endl; - - msg.setData(queueName); // last message to queue. - channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName); - - listener.wait(); - - channel.close(); - connection.close(); } - - -// ================================================================ -// Listen client -// - -class Listener : public MessageListener{ - string queueName; - Monitor lock; - bool done; - - public: - Listener (string& _queueName): queueName(_queueName), done(false) {}; - - void received(Message& msg) { - if (msg.getData() == queueName) - { - Mutex::ScopedLock l(lock); - QPID_LOG(info, "Listener: done. " << queueName); - done = true; - lock.notify(); +void ListenThread::run() { + try { + Connection connection; + opts.open(connection); + Session_0_10 session = connection.newSession(); + + string consumeQueue; + switch (mode) { + case SHARED: + consumeQueue="perftest"; + session.queueDeclare(arg::queue="perftest"); + break; + case FANOUT: + case TOPIC: + consumeQueue=session.getId().str(); // Unique + session.queueDeclare(arg::queue=consumeQueue, + arg::exclusive=true, + arg::autoDelete=true); + session.queueBind(arg::queue=consumeQueue, + arg::exchange=exchange(), + arg::routingKey="perftest"); } + // Notify publisher we are ready. + session.queueDeclare(arg::queue="control"); // Control queue + session.messageTransfer(arg::content=Message("ready", "control")); + + SubscriptionManager subs(session); + LocalQueue consume; + subs.subscribe(consume, consumeQueue); + int consumed=0; + AbsTime start=now(); + while (consume.pop().getData() != "done") + ++consumed; + AbsTime end=now(); + + // Report to publisher. + ostringstream report; + report << "consume count: " << consumed << endl + << "consume secs: " << secs(start, end) << endl + << "consume rate: " << consumed/secs(start,end) << endl; + session.messageTransfer(arg::content=Message(report.str(), "control")); + connection.close(); } - - void wait() { - Mutex::ScopedLock l(lock); - while (!done) - lock.wait(); - } -}; - -void ListenThread::run() { - Connection connection; - Channel channel; - Message msg; - Message msg1; - cout << "Started listener." << endl;; - opts.open(connection); - connection.openChannel(channel); - channel.start(); - - string queueControl = "control"; - Queue response(queueControl); - channel.declareQueue(response); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); - while (!channel.get(msg, response, AUTO_ACK)) { - QPID_LOG(info, "Listener: waiting for queue name."); - sleep(1); + catch (const std::exception& e) { + cout << "PublishThread exception: " << e.what() << endl; } - string queueName =msg.getData(); - string queueNameC =queueName+ "-1"; - - QPID_LOG(info, "Listener: Using Queue:" << queueName); - QPID_LOG(info, "Listener: Reply Queue:" << queueNameC); - // create consume queue - Queue consume(queueName); - channel.declareQueue(consume); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, consume, queueName); - - // create completion queue - Queue completion(queueNameC); - channel.declareQueue(completion); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); - - Listener listener(queueName); - channel.consume(consume, queueName, &listener); - QPID_LOG(info, "Listener: consuming..."); - - listener.wait(); - - QPID_LOG(info, "Listener: send final message."); - // complete. - msg1.setData(queueName); - channel.publish(msg1, Exchange::STANDARD_TOPIC_EXCHANGE, queueNameC); - - channel.close(); - connection.close(); } - |