diff options
author | Gordon Sim <gsim@apache.org> | 2006-11-21 11:49:14 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-11-21 11:49:14 +0000 |
commit | 1e158fcf8f50975967aaa63f86aa9de18526425e (patch) | |
tree | 67ceb06bef84ec5c7f0434ca139e1f4ea2737464 /cpp | |
parent | 605362a77aea68fb728bef528130879152dfb730 (diff) | |
download | qpid-python-1e158fcf8f50975967aaa63f86aa9de18526425e.tar.gz |
Added another example for client api.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@477636 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.h | 1 | ||||
-rw-r--r-- | cpp/test/client/echo_service.cpp | 197 |
3 files changed, 207 insertions, 4 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index e00652361c..86fbdc062c 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -98,6 +98,12 @@ void Connector::writeToSocket(char* data, size_t available){ } } +void Connector::handleClosed(){ + closed = true; + socket.close(); + if(shutdownHandler) shutdownHandler->shutdown(); +} + void Connector::checkIdle(ssize_t status){ if(timeoutHandler){ int64_t now = Time::now().msecs(); @@ -106,9 +112,7 @@ void Connector::checkIdle(ssize_t status){ timeoutHandler->idleIn(); } }else if(status == Socket::SOCKET_EOF){ - closed = true; - socket.close(); - if(shutdownHandler) shutdownHandler->shutdown(); + handleClosed(); }else{ lastIn = now; } @@ -153,7 +157,7 @@ void Connector::run(){ ssize_t received = socket.recv(inbuf.start(), available); checkIdle(received); - if(received > 0){ + if(!closed && received > 0){ inbuf.move(received); inbuf.flip();//position = 0, limit = total data read @@ -168,5 +172,6 @@ void Connector::run(){ } }catch(QpidError error){ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + handleClosed(); } } diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index b8a37d3b22..08705a4eb3 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -70,6 +70,7 @@ namespace client { void setSocketTimeout(); void run(); + void handleClosed(); public: Connector(bool debug = false, u_int32_t buffer_size = 1024); diff --git a/cpp/test/client/echo_service.cpp b/cpp/test/client/echo_service.cpp new file mode 100644 index 0000000000..9d10610c83 --- /dev/null +++ b/cpp/test/client/echo_service.cpp @@ -0,0 +1,197 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <qpid/QpidError.h> +#include <qpid/client/Channel.h> +#include <qpid/client/Connection.h> +#include <qpid/client/Exchange.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/Queue.h> +#include <qpid/sys/Time.h> +#include <iostream> +#include <sstream> + +using namespace qpid::client; +using namespace qpid::sys; + +class EchoServer : public MessageListener{ + Channel* const channel; +public: + EchoServer(Channel* channel); + virtual void received(Message& msg); +}; + +class LoggingListener : public MessageListener{ +public: + virtual void received(Message& msg); +}; + +class Args{ + string host; + int port; + bool trace; + bool help; + bool client; +public: + inline Args() : host("localhost"), port(5672), trace(false), help(false), client(false){} + void parse(int argc, char** argv); + void usage(); + + inline const string& getHost() const { return host;} + inline int getPort() const { return port; } + inline bool getTrace() const { return trace; } + inline bool getHelp() const { return help; } + inline bool getClient() const { return client; } +}; + +int main(int argc, char** argv){ + const std::string echo_service("echo_service"); + Args args; + args.parse(argc, argv); + if (args.getHelp()) { + args.usage(); + } else if (args.getClient()) { + try { + //Create connection & open a channel + Connection connection(args.getTrace()); + connection.open(args.getHost(), args.getPort()); + Channel channel; + connection.openChannel(&channel); + + //Setup: declare the private 'response' queue and bind it + //to the direct exchange by its name which will be + //generated by the server + Queue response; + channel.declareQueue(response); + qpid::framing::FieldTable emptyArgs; + channel.bind(Exchange::DEFAULT_DIRECT_EXCHANGE, response, response.getName(), emptyArgs); + + //Consume from the response queue, logging all echoed message to console: + LoggingListener listener; + std::string tag; + channel.consume(response, tag, &listener); + + //Process incoming requests on a new thread + channel.start(); + + //get messages from console and send them: + std::string text; + std::cout << "Enter text to send:" << std::endl; + while (std::getline(std::cin, text)) { + std::cout << "Sending " << text << " to echo server." << std::endl; + Message msg; + msg.getHeaders().setString("RESPONSE_QUEUE", response.getName()); + msg.setData(text); + channel.publish(msg, Exchange::DEFAULT_DIRECT_EXCHANGE, echo_service); + + std::cout << "Enter text to send:" << std::endl; + } + + connection.close(); + } catch(qpid::QpidError error) { + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } + } else { + try { + //Create connection & open a channel + Connection connection(args.getTrace()); + connection.open(args.getHost(), args.getPort()); + Channel channel; + connection.openChannel(&channel); + + //Setup: declare the 'request' queue and bind it to the direct exchange with a 'well known' name + Queue request("request"); + channel.declareQueue(request); + qpid::framing::FieldTable emptyArgs; + channel.bind(Exchange::DEFAULT_DIRECT_EXCHANGE, request, echo_service, emptyArgs); + + //Consume from the request queue, echoing back all messages received to the client that sent them + EchoServer server(&channel); + std::string tag = "server_tag"; + channel.consume(request, tag, &server); + + //Process incoming requests on the main thread + channel.run(); + + connection.close(); + } catch(qpid::QpidError error) { + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } + } +} + +EchoServer::EchoServer(Channel* _channel) : channel(_channel){} + +void EchoServer::received(Message& message) +{ + //get name of response queues binding to the default direct exchange: + const std::string name = message.getHeaders().getString("RESPONSE_QUEUE"); + + if (name.empty()) { + std::cout << "Cannot echo " << message.getData() << ", no response queue specified." << std::endl; + } else { + //print message to console: + std::cout << "Echoing " << message.getData() << " back to " << name << std::endl; + + //'echo' the message back: + channel->publish(message, Exchange::DEFAULT_DIRECT_EXCHANGE, name); + } +} + +void LoggingListener::received(Message& message) +{ + //print message to console: + std::cout << "Received echo: " << message.getData() << std::endl; +} + + +void Args::parse(int argc, char** argv){ + for(int i = 1; i < argc; i++){ + string name(argv[i]); + if("-help" == name){ + help = true; + break; + }else if("-host" == name){ + host = argv[++i]; + }else if("-port" == name){ + port = atoi(argv[++i]); + }else if("-trace" == name){ + trace = true; + }else if("-client" == name){ + client = true; + }else{ + std::cout << "Warning: unrecognised option " << name << std::endl; + } + } +} + +void Args::usage(){ + std::cout << "Options:" << std::endl; + std::cout << " -help" << std::endl; + std::cout << " Prints this usage message" << std::endl; + std::cout << " -host <host>" << std::endl; + std::cout << " Specifies host to connect to (default is localhost)" << std::endl; + std::cout << " -port <port>" << std::endl; + std::cout << " Specifies port to conect to (default is 5762)" << std::endl; + std::cout << " -trace" << std::endl; + std::cout << " Indicates that the frames sent and received should be logged" << std::endl; + std::cout << " -client" << std::endl; + std::cout << " Run as a client (else will run as a server)" << std::endl; +} |