summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-11-21 11:49:14 +0000
committerGordon Sim <gsim@apache.org>2006-11-21 11:49:14 +0000
commit1e158fcf8f50975967aaa63f86aa9de18526425e (patch)
tree67ceb06bef84ec5c7f0434ca139e1f4ea2737464 /cpp
parent605362a77aea68fb728bef528130879152dfb730 (diff)
downloadqpid-python-1e158fcf8f50975967aaa63f86aa9de18526425e.tar.gz
Added another example for client api.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@477636 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/client/Connector.cpp13
-rw-r--r--cpp/src/qpid/client/Connector.h1
-rw-r--r--cpp/test/client/echo_service.cpp197
3 files changed, 207 insertions, 4 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index e00652361c..86fbdc062c 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -98,6 +98,12 @@ void Connector::writeToSocket(char* data, size_t available){
}
}
+void Connector::handleClosed(){
+ closed = true;
+ socket.close();
+ if(shutdownHandler) shutdownHandler->shutdown();
+}
+
void Connector::checkIdle(ssize_t status){
if(timeoutHandler){
int64_t now = Time::now().msecs();
@@ -106,9 +112,7 @@ void Connector::checkIdle(ssize_t status){
timeoutHandler->idleIn();
}
}else if(status == Socket::SOCKET_EOF){
- closed = true;
- socket.close();
- if(shutdownHandler) shutdownHandler->shutdown();
+ handleClosed();
}else{
lastIn = now;
}
@@ -153,7 +157,7 @@ void Connector::run(){
ssize_t received = socket.recv(inbuf.start(), available);
checkIdle(received);
- if(received > 0){
+ if(!closed && received > 0){
inbuf.move(received);
inbuf.flip();//position = 0, limit = total data read
@@ -168,5 +172,6 @@ void Connector::run(){
}
}catch(QpidError error){
std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ handleClosed();
}
}
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index b8a37d3b22..08705a4eb3 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -70,6 +70,7 @@ namespace client {
void setSocketTimeout();
void run();
+ void handleClosed();
public:
Connector(bool debug = false, u_int32_t buffer_size = 1024);
diff --git a/cpp/test/client/echo_service.cpp b/cpp/test/client/echo_service.cpp
new file mode 100644
index 0000000000..9d10610c83
--- /dev/null
+++ b/cpp/test/client/echo_service.cpp
@@ -0,0 +1,197 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/QpidError.h>
+#include <qpid/client/Channel.h>
+#include <qpid/client/Connection.h>
+#include <qpid/client/Exchange.h>
+#include <qpid/client/MessageListener.h>
+#include <qpid/client/Queue.h>
+#include <qpid/sys/Time.h>
+#include <iostream>
+#include <sstream>
+
+using namespace qpid::client;
+using namespace qpid::sys;
+
+class EchoServer : public MessageListener{
+ Channel* const channel;
+public:
+ EchoServer(Channel* channel);
+ virtual void received(Message& msg);
+};
+
+class LoggingListener : public MessageListener{
+public:
+ virtual void received(Message& msg);
+};
+
+class Args{
+ string host;
+ int port;
+ bool trace;
+ bool help;
+ bool client;
+public:
+ inline Args() : host("localhost"), port(5672), trace(false), help(false), client(false){}
+ void parse(int argc, char** argv);
+ void usage();
+
+ inline const string& getHost() const { return host;}
+ inline int getPort() const { return port; }
+ inline bool getTrace() const { return trace; }
+ inline bool getHelp() const { return help; }
+ inline bool getClient() const { return client; }
+};
+
+int main(int argc, char** argv){
+ const std::string echo_service("echo_service");
+ Args args;
+ args.parse(argc, argv);
+ if (args.getHelp()) {
+ args.usage();
+ } else if (args.getClient()) {
+ try {
+ //Create connection & open a channel
+ Connection connection(args.getTrace());
+ connection.open(args.getHost(), args.getPort());
+ Channel channel;
+ connection.openChannel(&channel);
+
+ //Setup: declare the private 'response' queue and bind it
+ //to the direct exchange by its name which will be
+ //generated by the server
+ Queue response;
+ channel.declareQueue(response);
+ qpid::framing::FieldTable emptyArgs;
+ channel.bind(Exchange::DEFAULT_DIRECT_EXCHANGE, response, response.getName(), emptyArgs);
+
+ //Consume from the response queue, logging all echoed message to console:
+ LoggingListener listener;
+ std::string tag;
+ channel.consume(response, tag, &listener);
+
+ //Process incoming requests on a new thread
+ channel.start();
+
+ //get messages from console and send them:
+ std::string text;
+ std::cout << "Enter text to send:" << std::endl;
+ while (std::getline(std::cin, text)) {
+ std::cout << "Sending " << text << " to echo server." << std::endl;
+ Message msg;
+ msg.getHeaders().setString("RESPONSE_QUEUE", response.getName());
+ msg.setData(text);
+ channel.publish(msg, Exchange::DEFAULT_DIRECT_EXCHANGE, echo_service);
+
+ std::cout << "Enter text to send:" << std::endl;
+ }
+
+ connection.close();
+ } catch(qpid::QpidError error) {
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+ } else {
+ try {
+ //Create connection & open a channel
+ Connection connection(args.getTrace());
+ connection.open(args.getHost(), args.getPort());
+ Channel channel;
+ connection.openChannel(&channel);
+
+ //Setup: declare the 'request' queue and bind it to the direct exchange with a 'well known' name
+ Queue request("request");
+ channel.declareQueue(request);
+ qpid::framing::FieldTable emptyArgs;
+ channel.bind(Exchange::DEFAULT_DIRECT_EXCHANGE, request, echo_service, emptyArgs);
+
+ //Consume from the request queue, echoing back all messages received to the client that sent them
+ EchoServer server(&channel);
+ std::string tag = "server_tag";
+ channel.consume(request, tag, &server);
+
+ //Process incoming requests on the main thread
+ channel.run();
+
+ connection.close();
+ } catch(qpid::QpidError error) {
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+ }
+}
+
+EchoServer::EchoServer(Channel* _channel) : channel(_channel){}
+
+void EchoServer::received(Message& message)
+{
+ //get name of response queues binding to the default direct exchange:
+ const std::string name = message.getHeaders().getString("RESPONSE_QUEUE");
+
+ if (name.empty()) {
+ std::cout << "Cannot echo " << message.getData() << ", no response queue specified." << std::endl;
+ } else {
+ //print message to console:
+ std::cout << "Echoing " << message.getData() << " back to " << name << std::endl;
+
+ //'echo' the message back:
+ channel->publish(message, Exchange::DEFAULT_DIRECT_EXCHANGE, name);
+ }
+}
+
+void LoggingListener::received(Message& message)
+{
+ //print message to console:
+ std::cout << "Received echo: " << message.getData() << std::endl;
+}
+
+
+void Args::parse(int argc, char** argv){
+ for(int i = 1; i < argc; i++){
+ string name(argv[i]);
+ if("-help" == name){
+ help = true;
+ break;
+ }else if("-host" == name){
+ host = argv[++i];
+ }else if("-port" == name){
+ port = atoi(argv[++i]);
+ }else if("-trace" == name){
+ trace = true;
+ }else if("-client" == name){
+ client = true;
+ }else{
+ std::cout << "Warning: unrecognised option " << name << std::endl;
+ }
+ }
+}
+
+void Args::usage(){
+ std::cout << "Options:" << std::endl;
+ std::cout << " -help" << std::endl;
+ std::cout << " Prints this usage message" << std::endl;
+ std::cout << " -host <host>" << std::endl;
+ std::cout << " Specifies host to connect to (default is localhost)" << std::endl;
+ std::cout << " -port <port>" << std::endl;
+ std::cout << " Specifies port to conect to (default is 5762)" << std::endl;
+ std::cout << " -trace" << std::endl;
+ std::cout << " Indicates that the frames sent and received should be logged" << std::endl;
+ std::cout << " -client" << std::endl;
+ std::cout << " Run as a client (else will run as a server)" << std::endl;
+}