diff options
author | Michael Goulish <mgoulish@apache.org> | 2009-08-03 18:07:12 +0000 |
---|---|---|
committer | Michael Goulish <mgoulish@apache.org> | 2009-08-03 18:07:12 +0000 |
commit | 80e1b83ae763f2840dd6ff0b0c4b5b00a45c3b6f (patch) | |
tree | 9bd5ab65aaaec69b83fee02a271b19c9f3bfd587 /cpp/src/tests/qrsh_server.cpp | |
parent | 946bf3f4ea756513556c04d7fd9bd8cbdd9f382e (diff) | |
download | qpid-python-80e1b83ae763f2840dd6ff0b0c4b5b00a45c3b6f.tar.gz |
qrsh initial checkin.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@800481 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/qrsh_server.cpp')
-rw-r--r-- | cpp/src/tests/qrsh_server.cpp | 1062 |
1 files changed, 1062 insertions, 0 deletions
diff --git a/cpp/src/tests/qrsh_server.cpp b/cpp/src/tests/qrsh_server.cpp new file mode 100644 index 0000000000..ab715801c9 --- /dev/null +++ b/cpp/src/tests/qrsh_server.cpp @@ -0,0 +1,1062 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include <unistd.h> +#include <cstdlib> +#include <iostream> +#include <map> +#include <dirent.h> + +#include <sys/stat.h> +#include <sys/types.h> +#include <sys/wait.h> + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/Message.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> + + +using namespace qpid::client; +using namespace qpid::framing; +using namespace std; + + + + +int +mrand ( int max_desired_val ) +{ + double zero_to_one = (double) rand() / (double) RAND_MAX; + return (int) (zero_to_one * (double) max_desired_val); +} + + + +char * +file2str ( char const * file_name ) +{ + FILE * fp = fopen ( file_name, "r" ); + if(! fp) + { + fprintf ( stderr, "file2str error: can't open file |%s|.\n", file_name ); + return 0; + } + + fseek ( fp, 0, SEEK_END ); + size_t file_len = (size_t) ftell ( fp ); + rewind ( fp ); + char * content = (char *) malloc ( file_len + 1 ); + + if ( ! content ) + { + fprintf ( stderr, + "file2str error: can't malloc %d bytes.\n", + (int)file_len + ); + return 0; + } + + size_t items_read = fread ( content, file_len, 1, fp ); + + if ( 1 != items_read ) + { + fprintf ( stderr, "file2str error: read failed.\n" ); + free ( content ); + return 0; + } + + fclose ( fp ); + content[file_len] = 0; + + return content; +} + + + + + +class QrshServer : public MessageListener +{ + public: + + QrshServer ( SubscriptionManager & subscriptions, + char const * name, + char const * qrsh_run_path, + char const * host, + int port + ); + + virtual void received ( Message & message); + + + private: + + set<string> all_server_names; + + stringstream data_dir; + + SubscriptionManager & subscriptions; + + // Is this message addressed to me? + bool myMessage ( Message const & message ); + + /* ---------------------------------------------- + * Special Commands + * These are commands that the qrsh_server executes + * directly, rather than through a child process + * instance of qrsh_run. + */ + void runCommand ( Message const & message ); + void execute ( Message const & message ); + void wait ( Message const & message ); + void exited ( Message const & message ); + void get ( Message const & message ); + void rememberIntroduction ( Message const & message ); + void getStraw ( Message const & message ); + void addAlias ( Message const & message ); + + void start ( ); + void sayHello ( ); + void sayName ( ); + // end Special Commands ------------------------ + + + void saveCommand ( Message const & message ); + + void send ( string const & content ); + + void drawStraws ( ); + void getNames ( ); + void runSavedCommand ( ); + + char ** getArgs ( char const * s ); + bool isProcessName ( char const * s ); + int string_countWords ( char const * s ); + char const * skipWord ( char const * s ); + + + void string_replaceAll ( string & str, + string & target, + string & replacement + ); + + + string name, + qrsh_run_path, + host; + + vector<string *> aliases; + + int port; + + map < char *, int > abstract_name_map; + + set < string > myFellowBrokers; + + bool saidHello; + + Message savedCommand; + + vector < int > straws; + int myStraw; + +}; + + + +QrshServer::QrshServer ( SubscriptionManager & subs, + char const * name, + char const * qrsh_run_path, + char const * host, + int port + ) + : subscriptions ( subs ), + name ( name ), + qrsh_run_path ( qrsh_run_path ), + host ( host ), + port ( port ), + saidHello ( false ), + myStraw ( 0 ) +{ + data_dir << "/tmp/qrsh_" + << getpid(); + + if(mkdir ( data_dir.str().c_str(), 0777 ) ) + { + fprintf ( stderr, + "QrshServer::QrshServer error: can't mkdir |%s|\n", + data_dir.str().c_str() + ); + exit ( 1 ); + } +} + + + +void +QrshServer::saveCommand ( Message const & message ) +{ + savedCommand = message; +} + + + +void +QrshServer::runSavedCommand ( ) +{ + runCommand ( savedCommand ); +} + + + +void +QrshServer::start ( ) +{ + stringstream announcement_data; + announcement_data << "hello_my_name_is " + << name; + + send ( announcement_data.str() ); + + saidHello = true; +} + + + + +void +QrshServer::send ( string const & content ) +{ + try + { + Message message; + message.setData ( content ); + + Connection connection; + connection.open ( host, port ); + Session session = connection.newSession ( ); + session.messageTransfer ( arg::content = message, + arg::destination = "amq.fanout" + ); + session.close(); + connection.close(); + } + catch ( exception const & e ) + { + fprintf ( stderr, "QrshServer::send error: |%s|\n", e.what() ); + } +} + + + + +void +QrshServer::sayHello ( ) +{ + if ( saidHello ) + return; + + stringstream ss; + + ss << "hello_my_name_is " + << name; + + send ( ss.str() ); + saidHello = true; +} + + + +void +QrshServer::sayName ( ) +{ + fprintf ( stderr, "My name is: |%s|\n", name.c_str() ); +} + + + + +void +QrshServer::drawStraws ( ) +{ + myStraw = mrand ( 1000000000 ); + stringstream ss; + ss << "straw " + << name + << ' ' + << myStraw; + send ( ss.str() ); +} + + + +void +QrshServer::getStraw ( Message const & message ) +{ + int straw; + + char brokerName[1000]; + sscanf ( message.getData().c_str(), "%*s%s", brokerName ); + + if ( ! strcmp ( brokerName, name.c_str() ) ) + return; + + sscanf ( message.getData().c_str(), "%*s%*s%d", & straw ); + straws.push_back ( straw ); + + bool i_win = true; + int ties = 0; + + if ( straws.size() >= myFellowBrokers.size() ) + { + // All votes are in! Let's see if I win! + for ( unsigned int i = 0; i < straws.size(); ++ i ) + { + if ( straws[i] == myStraw ) + ++ ties; + else + if ( straws[i] > myStraw ) + { + i_win = false; + break; + } + } + + if ( i_win && (ties <= 0) ) + { + myStraw = 0; + straws.clear(); + runSavedCommand ( ); + } + else + if ( i_win && (ties > 0) ) + { + fprintf ( stderr, "MDEBUG oh no! drawStraws error: server %s tied with straw %d!\n", name.c_str(), straw ); + } + } +} + + + + +/* + * "APB" command (all-points-bullitens (commands that are not addressed + * specifically to any server)) are handled directly, here. + * Because if I return simply "true", the normal command processing code + * will misinterpret the command. + */ +bool +QrshServer::myMessage ( Message const & message ) +{ + int const maxlen = 100; + char head[maxlen]; + char first_word [ maxlen + 1 ]; + strncpy ( head, message.getData().c_str(), maxlen ); + sscanf ( head, "%s", first_word ); + + if ( ! strcmp ( name.c_str(), first_word ) ) + { + return true; + } + else + { + // Is the given name one of my aliases? + char possibleAlias[1000]; + if(1 == sscanf ( message.getData().c_str(), "%s", possibleAlias )) + { + for ( unsigned int i = 0; i < aliases.size(); ++ i ) + { + + if ( ! strcmp ( possibleAlias, aliases[i]->c_str() )) + { + return true; + } + } + } + } + + if ( ! strcmp ( first_word, "hello_my_name_is" ) ) + { + rememberIntroduction ( message ); + sayHello ( ); + return false; + } + else + if ( ! strcmp ( first_word, "straw" ) ) + { + getStraw ( message ); + return false; + } + else + if ( ! strcmp ( first_word, "all" ) ) + { + return true; + } + else + if ( ! strcmp ( first_word, "any" ) ) + { + straws.clear(); + usleep ( 200000 ); + saveCommand ( message ); + drawStraws ( ); + return false; + } + else + return false; +} + + + + +void +QrshServer::rememberIntroduction ( Message const & message ) +{ + char brokerName [ 1000 ]; + sscanf ( message.getData().c_str(), "%*s%s", brokerName ); + + if ( strcmp ( brokerName, name.c_str() ) ) + myFellowBrokers.insert ( string ( brokerName ) ); +} + + + + +void +QrshServer::addAlias ( Message const & message ) +{ + char alias[1000]; + sscanf ( message.getData().c_str(), "%*s%*s%s", alias ); + aliases.push_back ( new string(alias) ); +} + + + + +void +QrshServer::getNames ( ) +{ + abstract_name_map.clear(); + + DIR * dir = opendir ( data_dir.str().c_str() ); + + if ( ! dir ) + { + fprintf ( stderr, + "QrshServer::getNames error: could not open dir |%s|.\n", + data_dir.str().c_str() + ); + return; + } + + struct dirent * file; + while ( (file = readdir ( dir ) ) ) + { + if ( '.' != file->d_name[0] ) + { + stringstream pid_file_name; + pid_file_name << data_dir.str() + << '/' + << file->d_name + << "/pid"; + + int pid = 0; + FILE * fp; + if ( (fp = fopen ( pid_file_name.str().c_str(), "r" ) ) ) + { + fscanf ( fp, "%d", & pid ); + fclose ( fp ); + abstract_name_map.insert(pair<char*, int>(strdup(file->d_name), pid)); + } + else + { + /* + * Fail silently. The non-existence of this file + * is not necessarily an error. + */ + } + } + } + closedir ( dir ); +} + + + +void +QrshServer::string_replaceAll ( string & str, + string & target, + string & replacement + ) +{ + int target_size = target.size(); + int found_pos = 0; + + while ( 0 <= (found_pos = str.find ( target ) ) ) + str.replace ( found_pos, target_size, replacement ); +} + + + + +bool +QrshServer::isProcessName ( char const * str ) +{ + getNames(); + map<char *, int>::iterator it; + for ( it = abstract_name_map.begin(); it != abstract_name_map.end(); ++ it ) + { + if ( ! strcmp ( str, it->first ) ) + return true; + } + + return false; +} + + + + + +int +QrshServer::string_countWords ( char const * s1 ) +{ + int count = 0; + char const * s2 = s1 + 1; + + if ( ! isspace(* s1) ) + { + ++ count; + } + + for ( ; * s2; ++ s1, ++ s2 ) + { + // count space-to-word transitions. + if ( isspace(*s1) && (! isspace(*s2)) ) + ++ count; + } + + return count; +} + + + + +void +QrshServer::execute ( Message const & message ) +{ + // First, gather all the symbolic names we know. + getNames(); + + // Now make a copy of the command, that I can alter. + string command ( message.getData() ); + + + // Replace each occurrence of every abstract name with its pid. + char pid_str[100]; + map<char *, int>::iterator it; + for ( it = abstract_name_map.begin(); it != abstract_name_map.end(); ++ it ) + { + sprintf ( pid_str, "%d", it->second ); + string target ( it->first ), + replacement ( pid_str ); + string_replaceAll ( command, target, replacement ); + } + + + char const * truncated_command = skipWord(skipWord(command.c_str())); + + if ( truncated_command ) + system ( truncated_command ); +} + + + + + +void +QrshServer::get ( Message const & request_message ) +{ + char * file_content; + + /* + * Get the contents of the requested file. + */ + char file_or_process_name[1000]; + sscanf ( request_message.getData().c_str(), "%*s%*s%s", file_or_process_name ); + + if ( isProcessName ( file_or_process_name ) ) + { + stringstream desired_file_name; + desired_file_name << data_dir.str() + << '/' + << file_or_process_name + << '/'; + char requested_output_stream[1000]; + if(1 != sscanf ( request_message.getData().c_str(), + "%*s%*s%*s%s", + requested_output_stream + ) + ) + { + fprintf ( stderr, + "QrshServer::get error: Can't read requested data file name from this message: |%s|\n", + request_message.getData().c_str() + ); + return; + } + desired_file_name << requested_output_stream; + file_content = file2str ( desired_file_name.str().c_str() ); + } + else + { + file_content = file2str ( file_or_process_name ); + } + + stringstream reply_data ; + reply_data << "get_response " + << file_content; + /* + * Send a response-message to the server who is waiting. + */ + send ( reply_data.str() ); +} + + + + + + +void +QrshServer::exited ( Message const & message ) +{ + int exit_code = -1; + + // First, gather all the symbolic names we know. + getNames(); + + // Now make a copy of the command, that I can alter. + string edited_command ( message.getData() ); + + // Replace each occurrence of every abstract name with its pid. + char pid_str[100]; + map<char *, int>::iterator it; + for ( it = abstract_name_map.begin(); it != abstract_name_map.end(); ++ it ) + { + sprintf ( pid_str, "%d", it->second ); + string target ( it->first ), + replacement ( pid_str ); + string_replaceAll ( edited_command, target, replacement ); + } + + // Skip the service name. That is not used by the child. + char const * truncated_command = skipWord(edited_command.c_str()); + + if ( truncated_command ) + { + stringstream ss; + ss << qrsh_run_path + << ' ' + << data_dir.str() + << ' ' + << truncated_command; + + int child_pid; + if ( ! (child_pid = fork() ) ) + { + // This is the child. + + char ** argv = getArgs ( ss.str().c_str() ); + execv ( qrsh_run_path.c_str(), argv ); + + perror ( "qrsh_server: execv error: " ); + exit ( 1 ); + } + else + { + // This is the parent. + pid_t awaited_pid; + while ( 0 == (awaited_pid = waitpid ( child_pid, & exit_code, WNOHANG)) ) + { + fprintf ( stderr, "qrsh_server info: parent: waiting for child...\n" ); + sleep(1); + } + + if ( -1 == awaited_pid ) + { + fprintf ( stderr, "qrsh_server error awaiting child!\n" ); + exit ( 1 ); + } + + exit_code >>= 8; + + stringstream data; + data << "wait_response " + << exit_code; + + send ( data.str() ); + } + } +} + + + + +void +QrshServer::wait ( Message const & message ) +{ + bool pre_existing = false; + if ( 3 == string_countWords ( message.getData().c_str() ) ) + { + // The first word is the name of this service. + // The second word is "exec_wait". + // The third word is the symbolic name of the command to wait for. + // The fact that there are exactly three words means that this + // must be a command that has already been named and started -- + // we just need to find its pid and wait on it. + pre_existing = true; + } + + + int exit_code = -1; + + // First, gather all the symbolic names we know. + getNames(); + + // Now make a copy of the command, that I can alter. + string edited_command ( message.getData() ); + + // Replace each occurrence of every abstract name with its pid. + char pid_str[100]; + map<char *, int>::iterator it; + for ( it = abstract_name_map.begin(); it != abstract_name_map.end(); ++ it ) + { + sprintf ( pid_str, "%d", it->second ); + string target ( it->first ), + replacement ( pid_str ); + string_replaceAll ( edited_command, target, replacement ); + } + + // Skip the service name. That is not used by the child. + char const * truncated_command = skipWord(edited_command.c_str()); + + if ( truncated_command ) + { + stringstream ss; + ss << qrsh_run_path + << ' ' + << data_dir.str() + << ' ' + << truncated_command; + + int child_pid; + if ( ! (child_pid = fork() ) ) + { + // This is the child. + + char ** argv = getArgs ( ss.str().c_str() ); + execv ( qrsh_run_path.c_str(), argv ); + + perror ( "qrsh_server: execv error: " ); + exit ( 1 ); + } + else + { + // This is the parent. + pid_t awaited_pid; + while ( 0 == (awaited_pid = waitpid ( child_pid, & exit_code, WNOHANG)) ) + { + fprintf ( stderr, "qrsh_server info: parent: waiting for child...\n" ); + sleep(1); + } + + if ( -1 == awaited_pid ) + { + fprintf ( stderr, "qrsh_server error awaiting child!\n" ); + exit ( 1 ); + } + } + + exit_code >>= 8; + + stringstream data; + data << "wait_response " + << exit_code; + + send ( data.str() ); + } +} + + + + + +char const * +QrshServer::skipWord ( char const * s ) +{ + if(! (s && *s) ) + return 0; + + // skip past initial white space + while ( isspace(*s) ) + { + ++ s; + if(! *s) + return 0; + } + + // skip past first word + while ( ! isspace(*s) ) + { + ++ s; + if(! *s) + return 0; + } + + return s; +} + + + + + +char ** +QrshServer::getArgs ( char const * str ) +{ + char const * s = str; + + char ** argv = 0; + vector<int> start_positions, + lengths; + + int pos = 0; + int arg_len = 0; + + int n_args = 0; + while ( 1 ) + { + // advance over whitespace. + while ( isspace ( *s ) ) + { + ++ s; ++ pos; + if(! *s) + { + goto done; + } + } + + ++ n_args; + start_positions.push_back ( pos ); + arg_len = 0; + + // advance over non-whitespace. + while ( ! isspace ( *s ) ) + { + ++ s; ++ pos; ++ arg_len; + if(! *s) + { + lengths.push_back ( arg_len ); + arg_len = 0; + goto done; + } + } + + lengths.push_back ( arg_len ); + arg_len = 0; + } + + done: + + if ( arg_len > 0 ) + lengths.push_back ( arg_len ); + + // Alloc the array. + argv = (char **) malloc ( sizeof(char *) * ( n_args + 1 ) ); + argv[n_args] = 0; // mull-term the array. + + for ( int i = 0; i < n_args; ++ i ) + { + argv[i] = ( char *) malloc ( lengths[i] + 1 ); + strncpy ( argv[i], + str + start_positions[i], + lengths[i] + ); + argv[i][lengths[i]] = 0; + } + + return argv; +} + + + +void +QrshServer::runCommand ( Message const & message ) +{ + char const * s = message.getData().c_str(); + + /* + * Skip the first word, which is this server's name. + */ + while ( isspace(*s) ) // go to start of first word. + ++ s; + + while ( ! isspace(*s) ) // go to end of first word. + ++ s; + + while ( isspace(*s) ) // go to start of second word. + ++ s; + + char command_name[1000]; + sscanf ( s, "%s", command_name ); + + if ( ! strcmp ( "get", command_name ) ) + { + get ( message ); + } + else + if ( ! strcmp ( "exited", command_name ) ) + { + exited ( message ); + } + else + if ( ! strcmp ( "exec_wait", command_name ) ) + { + wait ( message ); + } + else + if ( ! strcmp ( "exec", command_name ) ) + { + execute ( message ); + } + else + if ( ! strcmp ( "start", command_name ) ) + { + start ( ); + } + else + if ( ! strcmp ( "alias", command_name ) ) + { + addAlias ( message ); + } + else + if ( ! strcmp ( "sayName", command_name ) ) + { + sayName ( ); + } + else + { + /* + * If the command is not any of the "special" commands + * above, then it's a "normal" command. + * That means we run it with a child process instance of + * qrsh_run, which will save all its data in the qrsh dir. + */ + stringstream ss; + ss << qrsh_run_path + << ' ' + << data_dir.str() + << ' ' + << s; + + if ( ! fork() ) + { + char ** argv = getArgs ( ss.str().c_str() ); + execv ( qrsh_run_path.c_str(), argv ); + perror ( "qrsh_server: execv error: " ); + } + } +} + + + +void +QrshServer::received ( Message & message ) +{ + if ( myMessage ( message ) ) + runCommand ( message ); +} + + + + + +/* + * fixme mick Mon Aug 3 10:29:26 EDT 2009 + * argv[1] server name + * argv[2] qrsh exe path + * argv[3] host + * argv[4] port + */ +int +main ( int /*argc*/, char** argv ) +{ + const char* host = argv[3]; + int port = atoi(argv[4]); + Connection connection; + Message msg; + + srand ( getpid() ); + + try + { + connection.open ( host, port ); + Session session = connection.newSession(); + + + // Declare queues. + string myQueue = session.getId().getName(); + session.queueDeclare ( arg::queue=myQueue, + arg::exclusive=true, + arg::autoDelete=true); + + session.exchangeBind ( arg::exchange="amq.fanout", + arg::queue=myQueue, + arg::bindingKey="my-key"); + + // Create a server and subscribe it to my queue. + SubscriptionManager subscriptions ( session ); + QrshServer server ( subscriptions, + argv[1], // server name + argv[2], // qrsh exe path + host, + port + ); + subscriptions.subscribe ( server, myQueue ); + + // Receive messages until the subscription is cancelled + // by QrshServer::received() + subscriptions.run(); + + connection.close(); + } + catch(const exception& error) + { + cout << error.what() << endl; + return 1; + } + + return 0; +} + + + + |