diff options
author | unknown <brian@zim.tangent.org> | 2006-02-13 00:45:59 -0800 |
---|---|---|
committer | unknown <brian@zim.tangent.org> | 2006-02-13 00:45:59 -0800 |
commit | bee0941d401168fd1688036900b9b8d1a613db0b (patch) | |
tree | a1e383dff70acb11f347b245f59fabed944ac3ce /client/mysqlimport.c | |
parent | 033b29eee0cad528c890cfa53246f6d7f04b5189 (diff) | |
download | mariadb-git-bee0941d401168fd1688036900b9b8d1a613db0b.tar.gz |
Adding thread support for mysqlimport. You can now specify a number of threads to use and it will thread the loading of the database. Anyone who has had to go through the pain of loading the database will immediatly get the reason for this.
client/Makefile.am:
Adding client_thread_libs for mysqlimport (aka it gets pthreads)
client/client_priv.h:
New option
client/mysqlimport.c:
Reworked logic to allow someone to use threads.
mysql-test/r/mysqldump.result:
New results
mysql-test/t/mysqldump.test:
Added tests for threads.
Diffstat (limited to 'client/mysqlimport.c')
-rw-r--r-- | client/mysqlimport.c | 119 |
1 files changed, 102 insertions, 17 deletions
diff --git a/client/mysqlimport.c b/client/mysqlimport.c index 8694093f06b..946ea645230 100644 --- a/client/mysqlimport.c +++ b/client/mysqlimport.c @@ -29,6 +29,10 @@ #include "client_priv.h" #include "mysql_version.h" +#include <my_pthread.h> + +/* Global Thread counter */ +int counter= 0; static void db_error_with_table(MYSQL *mysql, char *table); static void db_error(MYSQL *mysql); @@ -39,6 +43,7 @@ static char *add_load_option(char *ptr,const char *object, static my_bool verbose=0,lock_tables=0,ignore_errors=0,opt_delete=0, replace=0,silent=0,ignore=0,opt_compress=0, opt_low_priority= 0, tty_password= 0; +static my_bool opt_use_threads= 0; static uint opt_local_file=0; static MYSQL mysql_connection; static char *opt_password=0, *current_user=0, @@ -138,6 +143,11 @@ static struct my_option my_long_options[] = (gptr*) &opt_mysql_unix_port, (gptr*) &opt_mysql_unix_port, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, #include <sslopt-longopts.h> + {"use-threads", OPT_USE_THREADS, + "Parrelize the loading of files. Requires an arguement for the number \ + threads to use for loading of data.", + (gptr*) &opt_use_threads, (gptr*) &opt_use_threads, 0, + GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, #ifndef DONT_ALLOW_USER_CHANGE {"user", 'u', "User for login if not current user.", (gptr*) ¤t_user, (gptr*) ¤t_user, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, @@ -287,7 +297,7 @@ static int write_to_table(char *filename, MYSQL *sock) { if (verbose) fprintf(stdout, "Deleting the old data from table %s\n", tablename); - sprintf(sql_statement, "DELETE FROM %s", tablename); + snprintf(sql_statement, FN_REFLEN*16+256, "DELETE FROM %s", tablename); if (mysql_query(sock, sql_statement)) { db_error_with_table(sock, tablename); @@ -497,12 +507,39 @@ static char *field_escape(char *to,const char *from,uint length) } +int +worker_thread(char *raw_table_name) +{ + MYSQL *sock= 0; + if (!(sock= db_connect(current_host,current_db,current_user,opt_password))) + { + goto error; + } + + if (mysql_query(sock, "set @@character_set_database=binary;")) + { + db_error(sock); /* We shall countinue here, if --force was given */ + goto error; + } + + /* + We should do something about the error + */ + write_to_table(raw_table_name, sock); + +error: + if (sock) + db_disconnect(current_host, sock); + + counter--; + return 0; +} + int main(int argc, char **argv) { int exitcode=0, error=0; char **argv_to_free; - MYSQL *sock=0; MY_INIT(argv[0]); load_defaults("my",load_default_groups,&argc,&argv); @@ -513,25 +550,73 @@ int main(int argc, char **argv) free_defaults(argv_to_free); return(1); } - if (!(sock= db_connect(current_host,current_db,current_user,opt_password))) - { - free_defaults(argv_to_free); - return(1); /* purecov: deadcode */ - } - if (mysql_query(sock, "set @@character_set_database=binary;")) + if (opt_use_threads) { - db_error(sock); /* We shall countinue here, if --force was given */ - return(1); + pthread_t mainthread; /* Thread descriptor */ + pthread_attr_t attr; /* Thread attributes */ + + for (; *argv != NULL; argv++) // Loop through tables + { + /* + If we hit thread count limit we loop until some threads exit. + We sleep for a second, so that we don't chew up a lot of + CPU in the loop. + */ +sanity_label: + if (counter == opt_use_threads) + { + sleep(1); + goto sanity_label; + } + counter++; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, + PTHREAD_CREATE_DETACHED); + + /* now create the thread */ + if (pthread_create(&mainthread, &attr, (void *)worker_thread, + (void *)*argv) != 0) + { + counter--; + fprintf(stderr,"%s: Could not create thread\n", + my_progname); + } + } + + /* + We loop until we know that all children have cleaned up. + */ +loop_label: + if (counter) + { + sleep(1); + goto loop_label; + } } + else + { + MYSQL *sock= 0; + if (!(sock= db_connect(current_host,current_db,current_user,opt_password))) + { + free_defaults(argv_to_free); + return(1); /* purecov: deadcode */ + } - if (lock_tables) - lock_table(sock, argc, argv); - for (; *argv != NULL; argv++) - if ((error=write_to_table(*argv, sock))) - if (exitcode == 0) - exitcode = error; - db_disconnect(current_host, sock); + if (mysql_query(sock, "set @@character_set_database=binary;")) + { + db_error(sock); /* We shall countinue here, if --force was given */ + return(1); + } + + if (lock_tables) + lock_table(sock, argc, argv); + for (; *argv != NULL; argv++) + if ((error=write_to_table(*argv, sock))) + if (exitcode == 0) + exitcode = error; + db_disconnect(current_host, sock); + } my_free(opt_password,MYF(MY_ALLOW_ZERO_PTR)); #ifdef HAVE_SMEM my_free(shared_memory_base_name,MYF(MY_ALLOW_ZERO_PTR)); |