diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/tests/client_test.cpp | 157 |
1 files changed, 83 insertions, 74 deletions
diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp index e4fd57824c..eb145272ca 100644 --- a/cpp/src/tests/client_test.cpp +++ b/cpp/src/tests/client_test.cpp @@ -29,106 +29,115 @@ #include <iostream> #include "TestOptions.h" -#include "qpid/client/Channel.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" -#include "qpid/client/MessageListener.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/Time.h" +#include "qpid/client/Session_0_10.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/MessageTransferBody.h" +using namespace qpid; using namespace qpid::client; -using namespace qpid::sys; +using qpid::framing::FrameSet; +using qpid::framing::MessageTransferBody; using std::string; +struct Args : public qpid::TestOptions { + uint msgSize; + uint maxFrameSize; -/** - * A simple message listener implementation that prints out the - * message content then notifies a montitor allowing the test to - * complete. - */ -class SimpleListener : public virtual MessageListener{ - Monitor* monitor; - bool verbose; + Args() : msgSize(26), maxFrameSize(65535) + { + addOptions() + ("size", optValue(msgSize, "N"), "message size") + ("max-frame-size", optValue(maxFrameSize, "N"), "max frame size"); + } +}; -public: - inline SimpleListener(Monitor* _monitor, bool debug) : monitor(_monitor), verbose(debug) {} +const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); - inline virtual void received(Message& msg){ - if (verbose) - std::cout << "Received message " << msg.getData() << std::endl; - monitor->notify(); +std::string generateData(uint size) +{ + if (size < chars.length()) { + return chars.substr(0, size); + } + std::string data; + for (uint i = 0; i < (size / chars.length()); i++) { + data += chars; } -}; + data += chars.substr(0, size % chars.length()); + return data; +} + +void print(const std::string& text, const Message& msg) +{ + std::cout << text; + if (msg.getData().size() > 16) { + std::cout << msg.getData().substr(0, 16) << "..."; + } else { + std::cout << msg.getData(); + } + std::cout << std::endl; +} int main(int argc, char** argv) { try { - qpid::TestOptions opts; + Args opts; opts.parse(argc, argv); - - //Use a custom exchange - Exchange exchange("MyExchange", Exchange::TOPIC_EXCHANGE); - //Use a named, temporary queue - Queue queue("MyQueue", true); - - - Connection con(opts.trace); - con.open(opts.host, opts.port, opts.username, opts.password, opts.virtualhost); - if (opts.trace) - std::cout << "Opened connection." << std::endl; - - //Create and open a channel on the connection through which - //most functionality is exposed - Channel channel; - con.openChannel(channel); - if (opts.trace) std::cout << "Opened channel." << std::endl; + + //Connect to the broker: + Connection connection(opts.trace, opts.maxFrameSize); + opts.open(connection); + if (opts.trace) std::cout << "Opened connection." << std::endl; + + //Create and open a session on the connection through which + //most functionality is exposed: + Session_0_10 session = connection.newSession(); + if (opts.trace) std::cout << "Opened session." << std::endl; + //'declare' the exchange and the queue, which will create them //as they don't exist - channel.declareExchange(exchange); + session.exchangeDeclare(arg::exchange="MyExchange", arg::type="direct"); if (opts.trace) std::cout << "Declared exchange." << std::endl; - channel.declareQueue(queue); + session.queueDeclare(arg::queue="MyQueue", arg::autoDelete=true, arg::exclusive=true); if (opts.trace) std::cout << "Declared queue." << std::endl; //now bind the queue to the exchange - channel.bind(exchange, queue, "MyTopic"); + session.queueBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::routingKey="MyKey"); if (opts.trace) std::cout << "Bound queue to exchange." << std::endl; - //Set up a message listener to receive any messages that - //arrive in our queue on the broker. We only expect one, and - //as it will be received on another thread, we create a - //montior to use to notify the main thread when that message - //is received. - Monitor monitor; - SimpleListener listener(&monitor, opts.trace); - string tag("MyTag"); - channel.consume(queue, tag, &listener); - if (opts.trace) std::cout << "Registered consumer." << std::endl; - - //we need to enable the message dispatching for this channel - //and we want that to occur on another thread so we call - //start(). - channel.start(); - - //Now we create and publish a message to our exchange with a - //routing key that will cause it to be routed to our queue - Message msg; - string data("MyMessage"); - msg.setData(data); - channel.publish(msg, exchange, "MyTopic"); - if (opts.trace) std::cout << "Published message: " << data << std::endl; - - { - Monitor::ScopedLock l(monitor); - //now we wait until we receive notification that the - //message was received - monitor.wait(); + //create and send a message to the exchange using the routing + //key we bound our queue with: + Message msgOut(generateData(opts.msgSize)); + msgOut.getDeliveryProperties().setRoutingKey("MyKey"); + session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut); + if (opts.trace) print("Published message: ", msgOut); + + //subscribe to the queue, add sufficient credit and then get + //incoming 'frameset', check that its a message transfer and + //then convert it to a message (see Dispatcher and + //SubscriptionManager utilties for common reusable patterns at + //a higher level) + session.messageSubscribe(arg::queue="MyQueue", arg::destination="MyId"); + session.messageFlow(arg::destination="MyId", arg::unit=0, arg::value=1); //credit for one message + session.messageFlow(arg::destination="MyId", arg::unit=1, arg::value=0xFFFFFFFF); //credit for infinite bytes + if (opts.trace) std::cout << "Subscribed to queue." << std::endl; + FrameSet::shared_ptr incoming = session.get(); + if (incoming->isA<MessageTransferBody>()) { + Message msgIn(*incoming, session); + if (msgIn.getData() == msgOut.getData()) { + if (opts.trace) std::cout << "Received the exepected message." << std::endl; + msgIn.acknowledge(); + } else { + print("Received an unexepected message: ", msgIn); + } } - //close the channel & connection - channel.close(); - if (opts.trace) std::cout << "Closed channel." << std::endl; - con.close(); + //close the session & connection + session.close(); + if (opts.trace) std::cout << "Closed session." << std::endl; + connection.close(); if (opts.trace) std::cout << "Closed connection." << std::endl; return 0; } catch(const std::exception& e) { |