diff options
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 26 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 4 | ||||
-rw-r--r-- | cpp/src/tests/topic_listener.cpp | 71 | ||||
-rw-r--r-- | cpp/src/tests/topic_publisher.cpp | 59 |
5 files changed, 95 insertions, 70 deletions
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 37fada45fb..7ffcd676a3 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -69,18 +69,22 @@ void Dispatcher::run() boost::state_saver<bool> reset(running); // Reset to false on exit. running = true; queue->open(); - while (!queue->isClosed()) { - Mutex::ScopedUnlock u(lock); - FrameSet::shared_ptr content = queue->pop(); - if (content->isA<MessageTransferBody>()) { - Message msg(*content, session); - Subscriber::shared_ptr listener = find(msg.getDestination()); - assert(listener); - listener->received(msg); - } else { - assert (handler.get()); - handler->handle(*content); + try { + while (!queue->isClosed()) { + Mutex::ScopedUnlock u(lock); + FrameSet::shared_ptr content = queue->pop(); + if (content->isA<MessageTransferBody>()) { + Message msg(*content, session); + Subscriber::shared_ptr listener = find(msg.getDestination()); + assert(listener); + listener->received(msg); + } else { + assert (handler.get()); + handler->handle(*content); + } } + } catch (const ClosedException&) { + //ignore it and return } } diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index a758dc1341..438a73ec6f 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -102,6 +102,11 @@ void SubscriptionManager::stop() dispatcher.stop(); } +void SubscriptionManager::start() +{ + dispatcher.start(); +} + }} // namespace qpid::client #endif diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index ad5ddfa000..fb1a726bff 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -82,9 +82,13 @@ public: */ void run(bool autoStop=true); + /** Deliver messages in another thread. */ + void start(); + /** Cause run() to return */ void stop(); + static const uint32_t UNLIMITED=0xFFFFFFFF; /** Set the flow control for destination tag. diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index 5aef16354e..fca50e3941 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -33,11 +33,10 @@ */ #include "TestOptions.h" -#include "qpid/client/Channel.h" #include "qpid/client/Connection.h" -#include "qpid/client/Exchange.h" #include "qpid/client/MessageListener.h" -#include "qpid/client/Queue.h" +#include "qpid/client/Session_0_10.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/sys/Time.h" #include "qpid/framing/FieldValue.h" #include <iostream> @@ -54,7 +53,8 @@ using namespace std; * defined. */ class Listener : public MessageListener{ - Channel* const channel; + Session_0_10& session; + SubscriptionManager& mgr; const string responseQueue; const bool transactional; bool init; @@ -64,7 +64,7 @@ class Listener : public MessageListener{ void shutdown(); void report(); public: - Listener(Channel* channel, const string& reponseQueue, bool tx); + Listener(Session_0_10& session, SubscriptionManager& mgr, const string& reponseQueue, bool tx); virtual void received(Message& msg); }; @@ -72,14 +72,14 @@ public: * A utility class for managing the options passed in. */ struct Args : public qpid::TestOptions { - int ackmode; + int ack; bool transactional; int prefetch; - Args() : ackmode(NO_ACK), transactional(false), prefetch(1000) { + Args() : ack(0), transactional(false), prefetch(0) { addOptions() - ("ack", optValue(ackmode, "MODE"), "Ack mode: 0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK") + ("ack", optValue(ack, "MODE"), "Ack frequency in messages (defaults to half the prefetch value)") ("transactional", optValue(transactional), "Use transactions") - ("prefetch", optValue(prefetch, "N"), "prefetch count"); + ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)"); } }; @@ -98,24 +98,35 @@ int main(int argc, char** argv){ else { cout << "topic_listener: Started." << endl; Connection connection(args.trace); - connection.open(args.host, args.port, args.username, args.password, args.virtualhost); - Channel channel(args.transactional, args.prefetch); - connection.openChannel(channel); - + args.open(connection); + Session_0_10 session = connection.newSession(); + if (args.transactional) { + session.txSelect(); + } + //declare exchange, queue and bind them: - Queue response("response"); - channel.declareQueue(response); - - Queue control; - channel.declareQueue(control); - qpid::framing::FieldTable bindArgs; - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "topic_control", bindArgs); + session.queueDeclare(arg::queue="response"); + std::string control = "control_" + session.getId().str(); + session.queueDeclare(arg::queue=control); + session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control"); + //set up listener - Listener listener(&channel, response.getName(), args.transactional); - channel.consume(control, "c1", &listener, AckMode(args.ackmode)); + SubscriptionManager mgr(session); + Listener listener(session, mgr, "response", args.transactional); + if (args.prefetch) { + mgr.setAckPolicy(AckPolicy(args.ack ? args.ack : (args.prefetch / 2))); + mgr.setFlowControl(args.prefetch, SubscriptionManager::UNLIMITED, true); + } else { + mgr.setConfirmMode(false); + mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); + } + mgr.subscribe(listener, control); + cout << "topic_listener: Consuming." << endl; - channel.run(); - cout << "topic_listener: run returned, closing connection" << endl; + mgr.run(); + cout << "topic_listener: run returned, closing session" << endl; + session.close(); + cout << "closing connection" << endl; connection.close(); cout << "topic_listener: normal exit" << endl; } @@ -126,8 +137,8 @@ int main(int argc, char** argv){ return 1; } -Listener::Listener(Channel* _channel, const string& _responseq, bool tx) : - channel(_channel), responseQueue(_responseq), transactional(tx), init(false), count(0){} +Listener::Listener(Session_0_10& s, SubscriptionManager& m, const string& _responseq, bool tx) : + session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){} void Listener::received(Message& message){ if(!init){ @@ -149,7 +160,7 @@ void Listener::received(Message& message){ } void Listener::shutdown(){ - channel->close(); + mgr.stop(); } void Listener::report(){ @@ -158,11 +169,11 @@ void Listener::report(){ stringstream reportstr; reportstr << "Received " << count << " messages in " << time/TIME_MSEC << " ms."; - Message msg(reportstr.str()); + Message msg(reportstr.str(), responseQueue); msg.getHeaders().setString("TYPE", "REPORT"); - channel->publish(msg, string(), responseQueue); + session.messageTransfer(arg::content=msg); if(transactional){ - channel->commit(); + session.txCommit(); } } diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index 1c5b51309b..1354dc1435 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -35,11 +35,10 @@ */ #include "TestOptions.h" -#include "qpid/client/Channel.h" #include "qpid/client/Connection.h" -#include "qpid/client/Exchange.h" #include "qpid/client/MessageListener.h" -#include "qpid/client/Queue.h" +#include "qpid/client/Session_0_10.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/sys/Monitor.h" #include <unistd.h> #include "qpid/sys/Time.h" @@ -57,7 +56,7 @@ using namespace std; * back by the subscribers. */ class Publisher : public MessageListener{ - Channel* const channel; + Session_0_10& session; const string controlTopic; const bool transactional; Monitor monitor; @@ -67,7 +66,7 @@ class Publisher : public MessageListener{ string generateData(int size); public: - Publisher(Channel* channel, const string& controlTopic, bool tx); + Publisher(Session_0_10& session, const string& controlTopic, bool tx); virtual void received(Message& msg); int64_t publish(int msgs, int listeners, int size); void terminate(); @@ -79,7 +78,7 @@ public: struct Args : public TestOptions { int messages; int subscribers; - int ackmode; + int ack; bool transactional; int prefetch; int batches; @@ -87,13 +86,13 @@ struct Args : public TestOptions { int size; Args() : messages(1000), subscribers(1), - ackmode(NO_ACK), transactional(false), prefetch(1000), + ack(500), transactional(false), prefetch(1000), batches(1), delay(0), size(256) { addOptions() ("messages", optValue(messages, "N"), "how many messages to send") ("subscribers", optValue(subscribers, "N"), "how many subscribers to expect reports from") - ("ackmode", optValue(ackmode, "MODE"), "Acknowledgement mode:0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK") + ("ack", optValue(ack, "MODE"), "Acknowledgement mode:0=NO_ACK, 1=AUTO_ACK, 2=LAZY_ACK") ("transactional", optValue(transactional), "client should use transactions") ("prefetch", optValue(prefetch, "N"), "prefetch count") ("batches", optValue(batches, "N"), "how many batches to run") @@ -110,18 +109,21 @@ int main(int argc, char** argv) { cout << args << endl; else { Connection connection(args.trace); - connection.open(args.host, args.port, args.username, args.password, args.virtualhost); - Channel channel(args.transactional, args.prefetch); - connection.openChannel(channel); + args.open(connection); + Session_0_10 session = connection.newSession(); + if (args.transactional) { + session.txSelect(); + } + //declare queue (relying on default binding): - Queue response("response"); - channel.declareQueue(response); + session.queueDeclare(arg::queue="response"); //set up listener - Publisher publisher(&channel, "topic_control", args.transactional); - channel.consume(response, "mytag", &publisher, AckMode(args.ackmode)); - channel.start(); + SubscriptionManager mgr(session); + Publisher publisher(session, "topic_control", args.transactional); + mgr.subscribe(publisher, "response"); + mgr.start(); int batchSize(args.batches); int64_t max(0); @@ -140,12 +142,13 @@ int main(int argc, char** argv) { << " in " << msecs << "ms" << endl; } publisher.terminate(); + mgr.stop(); int64_t avg = sum / batchSize; if(batchSize > 1){ cout << batchSize << " batches completed. avg=" << avg << ", max=" << max << ", min=" << min << endl; } - channel.close(); + session.close(); connection.close(); } return 0; @@ -155,8 +158,8 @@ int main(int argc, char** argv) { return 1; } -Publisher::Publisher(Channel* _channel, const string& _controlTopic, bool tx) : - channel(_channel), controlTopic(_controlTopic), transactional(tx){} +Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx) : + session(_session), controlTopic(_controlTopic), transactional(tx){} void Publisher::received(Message& ){ //count responses and when all are received end the current batch @@ -172,21 +175,19 @@ void Publisher::waitForCompletion(int msgs){ } int64_t Publisher::publish(int msgs, int listeners, int size){ - Message msg; - msg.setData(generateData(size)); + Message msg(generateData(size), controlTopic); AbsTime start = now(); { Monitor::ScopedLock l(monitor); for(int i = 0; i < msgs; i++){ - channel->publish( - msg, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); + session.messageTransfer(arg::content=msg, arg::destination="amq.topic"); } //send report request - Message reportRequest; + Message reportRequest("", controlTopic); reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); - channel->publish(reportRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); + session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic"); if(transactional){ - channel->commit(); + session.txCommit(); } waitForCompletion(listeners); @@ -206,11 +207,11 @@ string Publisher::generateData(int size){ void Publisher::terminate(){ //send termination request - Message terminationRequest; + Message terminationRequest("", controlTopic); terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); - channel->publish(terminationRequest, Exchange::STANDARD_TOPIC_EXCHANGE, controlTopic); + session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic"); if(transactional){ - channel->commit(); + session.txCommit(); } } |