diff options
Diffstat (limited to 'qpid/cpp/src/tests/echo_service.cpp')
-rw-r--r-- | qpid/cpp/src/tests/echo_service.cpp | 229 |
1 files changed, 229 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/echo_service.cpp b/qpid/cpp/src/tests/echo_service.cpp new file mode 100644 index 0000000000..c3569d5fd4 --- /dev/null +++ b/qpid/cpp/src/tests/echo_service.cpp @@ -0,0 +1,229 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * This class provides an example of using AMQP for a request-response + * style system. 'Requests' are messages sent to a well known + * destination. A 'service' process consumes these message and + * responds by echoing the message back to the sender on a + * sender-specified private queue. + */ + +#include "qpid/client/Channel.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Exchange.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/Queue.h" +#include "qpid/sys/Time.h" +#include <iostream> +#include <sstream> + +using namespace qpid::client; +using namespace qpid::sys; +using std::string; + + +/** + * A message listener implementation representing the 'service', this + * will 'echo' any requests received. + */ +class EchoServer : public MessageListener{ + Channel* const channel; +public: + EchoServer(Channel* channel); + virtual void received(Message& msg); +}; + +/** + * A message listener implementation that merely prints received + * messages to the console. Used to report on 'echo' responses. + */ +class LoggingListener : public MessageListener{ +public: + virtual void received(Message& msg); +}; + +/** + * A utility class that manages the command line options needed to run + * the example confirgurably. + */ +class Args{ + string host; + int port; + bool trace; + bool help; + bool client; +public: + inline Args() : host("localhost"), port(5672), trace(false), help(false), client(false){} + void parse(int argc, char** argv); + void usage(); + + inline const string& getHost() const { return host;} + inline int getPort() const { return port; } + inline bool getTrace() const { return trace; } + inline bool getHelp() const { return help; } + inline bool getClient() const { return client; } +}; + +/** + * The main test path. There are two basic modes: 'client' and + * 'service'. First one or more services are started, then one or more + * clients are started and messages can be sent. + */ +int main(int argc, char** argv){ + const std::string echo_service("echo_service"); + Args args; + args.parse(argc, argv); + if (args.getHelp()) { + args.usage(); + } else if (args.getClient()) { + //we have been started in 'client' mode, i.e. we will send an + //echo requests and print responses received. + try { + //Create connection & open a channel + Connection connection(args.getTrace()); + connection.open(args.getHost(), args.getPort()); + Channel channel; + connection.openChannel(channel); + + //Setup: declare the private 'response' queue and bind it + //to the direct exchange by its name which will be + //generated by the server + Queue response; + channel.declareQueue(response); + qpid::framing::FieldTable emptyArgs; + channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, response, response.getName(), emptyArgs); + + //Consume from the response queue, logging all echoed message to console: + LoggingListener listener; + std::string tag; + channel.consume(response, tag, &listener); + + //Process incoming requests on a new thread + channel.start(); + + //get messages from console and send them: + std::string text; + std::cout << "Enter text to send:" << std::endl; + while (std::getline(std::cin, text)) { + std::cout << "Sending " << text << " to echo server." << std::endl; + Message msg; + msg.getHeaders().setString("RESPONSE_QUEUE", response.getName()); + msg.setData(text); + channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service); + + std::cout << "Enter text to send:" << std::endl; + } + + connection.close(); + } catch(std::exception& error) { + std::cout << error.what() << std::endl; + } + } else { + // we are in 'service' mode, i.e. we will consume messages + // from the request queue and echo each request back to the + // senders own private response queue. + try { + //Create connection & open a channel + Connection connection(args.getTrace()); + connection.open(args.getHost(), args.getPort()); + Channel channel; + connection.openChannel(channel); + + //Setup: declare the 'request' queue and bind it to the direct exchange with a 'well known' name + Queue request("request"); + channel.declareQueue(request); + qpid::framing::FieldTable emptyArgs; + channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, request, echo_service, emptyArgs); + + //Consume from the request queue, echoing back all messages received to the client that sent them + EchoServer server(&channel); + std::string tag = "server_tag"; + channel.consume(request, tag, &server); + + //Process incoming requests on the main thread + channel.run(); + + connection.close(); + } catch(std::exception& error) { + std::cout << error.what() << std::endl; + } + } +} + +EchoServer::EchoServer(Channel* _channel) : channel(_channel){} + +void EchoServer::received(Message& message) +{ + //get name of response queues binding to the default direct exchange: + const std::string name = message.getHeaders().getString("RESPONSE_QUEUE"); + + if (name.empty()) { + std::cout << "Cannot echo " << message.getData() << ", no response queue specified." << std::endl; + } else { + //print message to console: + std::cout << "Echoing " << message.getData() << " back to " << name << std::endl; + + //'echo' the message back: + channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name); + } +} + +void LoggingListener::received(Message& message) +{ + //print message to console: + std::cout << "Received echo: " << message.getData() << std::endl; +} + + +void Args::parse(int argc, char** argv){ + for(int i = 1; i < argc; i++){ + string name(argv[i]); + if("-help" == name){ + help = true; + break; + }else if("-host" == name){ + host = argv[++i]; + }else if("-port" == name){ + port = atoi(argv[++i]); + }else if("-trace" == name){ + trace = true; + }else if("-client" == name){ + client = true; + }else{ + std::cout << "Warning: unrecognised option " << name << std::endl; + } + } +} + +void Args::usage(){ + std::cout << "Options:" << std::endl; + std::cout << " -help" << std::endl; + std::cout << " Prints this usage message" << std::endl; + std::cout << " -host <host>" << std::endl; + std::cout << " Specifies host to connect to (default is localhost)" << std::endl; + std::cout << " -port <port>" << std::endl; + std::cout << " Specifies port to conect to (default is 5762)" << std::endl; + std::cout << " -trace" << std::endl; + std::cout << " Indicates that the frames sent and received should be logged" << std::endl; + std::cout << " -client" << std::endl; + std::cout << " Run as a client (else will run as a server)" << std::endl; +} |