diff options
Diffstat (limited to 'client')
-rw-r--r-- | client/Makefile.am | 5 | ||||
-rw-r--r-- | client/client_priv.h | 1 | ||||
-rw-r--r-- | client/mysqlimport.c | 134 | ||||
-rw-r--r-- | client/mysqlslap.c | 6 | ||||
-rw-r--r-- | client/mysqltest.c | 163 |
5 files changed, 197 insertions, 112 deletions
diff --git a/client/Makefile.am b/client/Makefile.am index 849bd37eb57..14ebadbfacb 100644 --- a/client/Makefile.am +++ b/client/Makefile.am @@ -49,6 +49,8 @@ mysqlbinlog_SOURCES = mysqlbinlog.cc $(top_srcdir)/mysys/mf_tempdir.c \ $(top_srcdir)/mysys/base64.c mysqlbinlog_LDADD = $(LDADD) $(CXXLDFLAGS) mysqlslap_LDADD = $(LDADD) $(CXXLDFLAGS) $(CLIENT_THREAD_LIBS) +mysqlimport_LDADD = $(LDADD) $(CXXLDFLAGS) $(CLIENT_THREAD_LIBS) \ + $(top_builddir)/mysys/libmysys.a mysqltestmanager_pwgen_SOURCES = mysqlmanager-pwgen.c mysqltestmanagerc_SOURCES= mysqlmanagerc.c $(yassl_dummy_link_fix) mysqlcheck_SOURCES= mysqlcheck.c $(yassl_dummy_link_fix) @@ -57,7 +59,8 @@ mysqlslap_SOURCES= mysqlslap.c $(top_srcdir)/mysys/my_lock.c \ $(top_srcdir)/mysys/my_alarm.c \ $(yassl_dummy_link_fix) mysqldump_SOURCES= mysqldump.c my_user.c $(yassl_dummy_link_fix) -mysqlimport_SOURCES= mysqlimport.c $(yassl_dummy_link_fix) +mysqlimport_SOURCES= mysqlimport.c \ + $(yassl_dummy_link_fix) sql_src=log_event.h mysql_priv.h log_event.cc my_decimal.h my_decimal.cc strings_src=decimal.c diff --git a/client/client_priv.h b/client/client_priv.h index 44f3ce227af..2763dabb027 100644 --- a/client/client_priv.h +++ b/client/client_priv.h @@ -54,6 +54,7 @@ enum options_client OPT_MYSQL_LOCK_DIRECTORY, OPT_MYSQL_SLAP_SLAVE, OPT_USE_THREADS, + OPT_IMPORT_USE_THREADS, OPT_MYSQL_NUMBER_OF_QUERY, OPT_MYSQL_PRESERVE_SCHEMA, OPT_IGNORE_TABLE,OPT_INSERT_IGNORE,OPT_SHOW_WARNINGS,OPT_DROP_DATABASE, diff --git a/client/mysqlimport.c b/client/mysqlimport.c index 8694093f06b..afc1724b87b 100644 --- a/client/mysqlimport.c +++ b/client/mysqlimport.c @@ -29,6 +29,12 @@ #include "client_priv.h" #include "mysql_version.h" +#include <my_pthread.h> + + +/* Global Thread counter */ +int counter= 0; +pthread_mutex_t counter_mutex; static void db_error_with_table(MYSQL *mysql, char *table); static void db_error(MYSQL *mysql); @@ -39,6 +45,7 @@ static char *add_load_option(char *ptr,const char *object, static my_bool verbose=0,lock_tables=0,ignore_errors=0,opt_delete=0, replace=0,silent=0,ignore=0,opt_compress=0, opt_low_priority= 0, tty_password= 0; +static my_bool opt_use_threads= 0; static uint opt_local_file=0; static MYSQL mysql_connection; static char *opt_password=0, *current_user=0, @@ -108,8 +115,9 @@ static struct my_option my_long_options[] = REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"local", 'L', "Read all files through the client.", (gptr*) &opt_local_file, (gptr*) &opt_local_file, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, - {"lock-tables", 'l', "Lock all tables for write.", (gptr*) &lock_tables, - (gptr*) &lock_tables, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"lock-tables", 'l', "Lock all tables for write (this disables threads).", + (gptr*) &lock_tables, (gptr*) &lock_tables, 0, GET_BOOL, NO_ARG, + 0, 0, 0, 0, 0, 0}, {"low-priority", OPT_LOW_PRIORITY, "Use LOW_PRIORITY when updating the table.", (gptr*) &opt_low_priority, (gptr*) &opt_low_priority, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, @@ -138,6 +146,11 @@ static struct my_option my_long_options[] = (gptr*) &opt_mysql_unix_port, (gptr*) &opt_mysql_unix_port, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, #include <sslopt-longopts.h> + {"use-threads", OPT_USE_THREADS, + "Load files in parallel. The argument is the number " + "of threads to use for loading data.", + (gptr*) &opt_use_threads, (gptr*) &opt_use_threads, 0, + GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, #ifndef DONT_ALLOW_USER_CHANGE {"user", 'u', "User for login if not current user.", (gptr*) ¤t_user, (gptr*) ¤t_user, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, @@ -287,7 +300,7 @@ static int write_to_table(char *filename, MYSQL *sock) { if (verbose) fprintf(stdout, "Deleting the old data from table %s\n", tablename); - sprintf(sql_statement, "DELETE FROM %s", tablename); + snprintf(sql_statement, FN_REFLEN*16+256, "DELETE FROM %s", tablename); if (mysql_query(sock, sql_statement)) { db_error_with_table(sock, tablename); @@ -497,12 +510,41 @@ static char *field_escape(char *to,const char *from,uint length) } +pthread_handler_t worker_thread(void *arg) +{ + char *raw_table_name= (char *)arg; + MYSQL *sock= 0; + if (!(sock= db_connect(current_host,current_db,current_user,opt_password))) + { + goto error; + } + + if (mysql_query(sock, "set @@character_set_database=binary;")) + { + db_error(sock); /* We shall countinue here, if --force was given */ + goto error; + } + + /* + We should do something about the error + */ + write_to_table(raw_table_name, sock); + +error: + if (sock) + db_disconnect(current_host, sock); + + pthread_mutex_lock(&counter_mutex); + counter--; + pthread_mutex_unlock(&counter_mutex); + return 0; +} + int main(int argc, char **argv) { int exitcode=0, error=0; char **argv_to_free; - MYSQL *sock=0; MY_INIT(argv[0]); load_defaults("my",load_default_groups,&argc,&argv); @@ -513,25 +555,79 @@ int main(int argc, char **argv) free_defaults(argv_to_free); return(1); } - if (!(sock= db_connect(current_host,current_db,current_user,opt_password))) - { - free_defaults(argv_to_free); - return(1); /* purecov: deadcode */ - } - if (mysql_query(sock, "set @@character_set_database=binary;")) + if (opt_use_threads && !lock_tables) { - db_error(sock); /* We shall countinue here, if --force was given */ - return(1); + pthread_t mainthread; /* Thread descriptor */ + pthread_attr_t attr; /* Thread attributes */ + VOID(pthread_mutex_init(&counter_mutex, NULL)); + + for (; *argv != NULL; argv++) /* Loop through tables */ + { + /* + If we hit thread count limit we loop until some threads exit. + We sleep for a second, so that we don't chew up a lot of + CPU in the loop. + */ +sanity_label: + if (counter == opt_use_threads) + { + sleep(1); + goto sanity_label; + } + pthread_mutex_lock(&counter_mutex); + counter++; + pthread_mutex_unlock(&counter_mutex); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, + PTHREAD_CREATE_DETACHED); + + /* now create the thread */ + if (pthread_create(&mainthread, &attr, worker_thread, + (void *)*argv) != 0) + { + pthread_mutex_lock(&counter_mutex); + counter--; + pthread_mutex_unlock(&counter_mutex); + fprintf(stderr,"%s: Could not create thread\n", + my_progname); + } + } + + /* + We loop until we know that all children have cleaned up. + */ +loop_label: + if (counter) + { + sleep(1); + goto loop_label; + } + VOID(pthread_mutex_destroy(&counter_mutex)); } + else + { + MYSQL *sock= 0; + if (!(sock= db_connect(current_host,current_db,current_user,opt_password))) + { + free_defaults(argv_to_free); + return(1); /* purecov: deadcode */ + } - if (lock_tables) - lock_table(sock, argc, argv); - for (; *argv != NULL; argv++) - if ((error=write_to_table(*argv, sock))) - if (exitcode == 0) - exitcode = error; - db_disconnect(current_host, sock); + if (mysql_query(sock, "set @@character_set_database=binary;")) + { + db_error(sock); /* We shall countinue here, if --force was given */ + return(1); + } + + if (lock_tables) + lock_table(sock, argc, argv); + for (; *argv != NULL; argv++) + if ((error=write_to_table(*argv, sock))) + if (exitcode == 0) + exitcode = error; + db_disconnect(current_host, sock); + } my_free(opt_password,MYF(MY_ALLOW_ZERO_PTR)); #ifdef HAVE_SMEM my_free(shared_memory_base_name,MYF(MY_ALLOW_ZERO_PTR)); diff --git a/client/mysqlslap.c b/client/mysqlslap.c index b80150a8d67..c0986c2d867 100644 --- a/client/mysqlslap.c +++ b/client/mysqlslap.c @@ -55,7 +55,7 @@ load statements, and then run all the queries in the query file with five clients (five times each): - mysqlslap --drop-schema --concurrency=5 \ + mysqlslap --concurrency=5 \ --iterations=5 --query=query.sql --create=create.sql \ --delimiter=";" @@ -425,12 +425,12 @@ static struct my_option my_long_options[] = (gptr*) &lock_directory, (gptr*) &lock_directory, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"number-char-cols", 'x', - "Number of INT columns to create table with if specifying --sql-generate-sql.", + "Number of INT columns to create table with if specifying --auto-generate-sql.", (gptr*) &num_char_cols, (gptr*) &num_char_cols, 0, GET_UINT, REQUIRED_ARG, 1, 0, 0, 0, 0, 0}, {"number-int-cols", 'y', "Number of VARCHAR columns to create table with if specifying " - "--sql-generate-sql.", (gptr*) &num_int_cols, (gptr*) &num_int_cols, 0, + "--auto-generate-sql.", (gptr*) &num_int_cols, (gptr*) &num_int_cols, 0, GET_UINT, REQUIRED_ARG, 1, 0, 0, 0, 0, 0}, {"number-of-queries", OPT_MYSQL_NUMBER_OF_QUERY, "Limit each client to this number of queries (this is not exact).", diff --git a/client/mysqltest.c b/client/mysqltest.c index 98097b1896e..ce1a64ec653 100644 --- a/client/mysqltest.c +++ b/client/mysqltest.c @@ -3605,6 +3605,59 @@ static void replace_dynstr_append(DYNAMIC_STRING *ds, const char *val) /* + Append the result for one field to the dynamic string ds +*/ + +static void append_field(DYNAMIC_STRING *ds, uint col_idx, MYSQL_FIELD* field, + const char* val, ulonglong len, bool is_null) +{ + + char buf[256]; + if (col_idx < max_replace_column && replace_column[col_idx]) + { + val= replace_column[col_idx]; + len= strlen(val); + } + else if (is_null) + { + val= "NULL"; + len= 4; + } +#ifdef __WIN__ + else if ((field->type == MYSQL_TYPE_DOUBLE || + field->type == MYSQL_TYPE_FLOAT ) && + field->decimals >= 31) + { + /* Convert 1.2e+018 to 1.2e+18 and 1.2e-018 to 1.2e-18 */ + char *start= strchr(val, 'e'); + if (start && strlen(start) >= 5 && + (start[1] == '-' || start[1] == '+') && start[2] == '0') + { + start+=2; /* Now points at first '0' */ + /* Move all chars after the first '0' one step left */ + memmove(start, start + 1, strlen(start)); + len--; + } + } +#endif + + if (!display_result_vertically) + { + if (col_idx) + dynstr_append_mem(ds, "\t", 1); + replace_dynstr_append_mem(ds, val, (int)len); + } + else + { + dynstr_append(ds, field->name); + dynstr_append_mem(ds, "\t", 1); + replace_dynstr_append_mem(ds, val, (int)len); + dynstr_append_mem(ds, "\n", 1); + } +} + + +/* Append all results to the dynamic string separated with '\t' Values may be converted with 'replace_column' */ @@ -3613,41 +3666,16 @@ static void append_result(DYNAMIC_STRING *ds, MYSQL_RES *res) { MYSQL_ROW row; uint num_fields= mysql_num_fields(res); - MYSQL_FIELD *fields= !display_result_vertically ? 0 : mysql_fetch_fields(res); + MYSQL_FIELD *fields= mysql_fetch_fields(res); ulong *lengths; + while ((row = mysql_fetch_row(res))) { uint i; lengths = mysql_fetch_lengths(res); for (i = 0; i < num_fields; i++) - { - const char *val= row[i]; - ulonglong len= lengths[i]; - - if (i < max_replace_column && replace_column[i]) - { - val= replace_column[i]; - len= strlen(val); - } - if (!val) - { - val= "NULL"; - len= 4; - } - if (!display_result_vertically) - { - if (i) - dynstr_append_mem(ds, "\t", 1); - replace_dynstr_append_mem(ds, val, (int)len); - } - else - { - dynstr_append(ds, fields[i].name); - dynstr_append_mem(ds, "\t", 1); - replace_dynstr_append_mem(ds, val, (int)len); - dynstr_append_mem(ds, "\n", 1); - } - } + append_field(ds, i, &fields[i], + (const char*)row[i], lengths[i], !row[i]); if (!display_result_vertically) dynstr_append_mem(ds, "\n", 1); } @@ -3661,13 +3689,12 @@ static void append_result(DYNAMIC_STRING *ds, MYSQL_RES *res) */ static void append_stmt_result(DYNAMIC_STRING *ds, MYSQL_STMT *stmt, - MYSQL_FIELD *field, uint num_fields) + MYSQL_FIELD *fields, uint num_fields) { MYSQL_BIND *bind; my_bool *is_null; ulong *length; - ulonglong num_rows; - uint col_idx, row_idx; + uint i; /* Allocate array with bind structs, lengths and NULL flags */ bind= (MYSQL_BIND*) my_malloc(num_fields * sizeof(MYSQL_BIND), @@ -3677,71 +3704,29 @@ static void append_stmt_result(DYNAMIC_STRING *ds, MYSQL_STMT *stmt, is_null= (my_bool*) my_malloc(num_fields * sizeof(my_bool), MYF(MY_WME | MY_FAE)); - for (col_idx= 0; col_idx < num_fields; col_idx++) + /* Allocate data for the result of each field */ + for (i= 0; i < num_fields; i++) { - /* Allocate data for output */ - uint max_length= field[col_idx].max_length + 1; - char *str_data= (char *) my_malloc(max_length, MYF(MY_WME | MY_FAE)); - - bind[col_idx].buffer_type= MYSQL_TYPE_STRING; - bind[col_idx].buffer= (char *)str_data; - bind[col_idx].buffer_length= max_length; - bind[col_idx].is_null= &is_null[col_idx]; - bind[col_idx].length= &length[col_idx]; + uint max_length= fields[i].max_length + 1; + bind[i].buffer_type= MYSQL_TYPE_STRING; + bind[i].buffer= (char *)my_malloc(max_length, MYF(MY_WME | MY_FAE)); + bind[i].buffer_length= max_length; + bind[i].is_null= &is_null[i]; + bind[i].length= &length[i]; DBUG_PRINT("bind", ("col[%d]: buffer_type: %d, buffer_length: %d", - col_idx, - bind[col_idx].buffer_type, - bind[col_idx].buffer_length)); + i, bind[i].buffer_type, bind[i].buffer_length)); } - /* Fill in the data into the structures created above */ if (mysql_stmt_bind_result(stmt, bind)) die("mysql_stmt_bind_result failed: %d: %s", mysql_stmt_errno(stmt), mysql_stmt_error(stmt)); - /* Read result from each row */ - num_rows= mysql_stmt_num_rows(stmt); - for (row_idx= 0; row_idx < num_rows; row_idx++) + while (mysql_stmt_fetch(stmt) == 0) { - if (mysql_stmt_fetch(stmt)) - die("mysql_stmt_fetch failed: %d %s", - mysql_stmt_errno(stmt), mysql_stmt_error(stmt)); - - /* Read result from each column */ - for (col_idx= 0; col_idx < num_fields; col_idx++) - { - const char *val; - ulonglong len; - if (col_idx < max_replace_column && replace_column[col_idx]) - { - val= replace_column[col_idx]; - len= strlen(val); - } - else if (*bind[col_idx].is_null) - { - val= "NULL"; - len= 4; - } - else - { - val= (const char *) bind[col_idx].buffer; - len= *bind[col_idx].length; - } - if (!display_result_vertically) - { - if (col_idx) /* No tab before first col */ - dynstr_append_mem(ds, "\t", 1); - replace_dynstr_append_mem(ds, val, (int)len); - } - else - { - dynstr_append(ds, field[col_idx].name); - dynstr_append_mem(ds, "\t", 1); - replace_dynstr_append_mem(ds, val, (int)len); - dynstr_append_mem(ds, "\n", 1); - } - } + for (i= 0; i < num_fields; i++) + append_field(ds, i, &fields[i], (const char *) bind[i].buffer, + *bind[i].length, *bind[i].is_null); if (!display_result_vertically) dynstr_append_mem(ds, "\n", 1); } @@ -3752,10 +3737,10 @@ static void append_stmt_result(DYNAMIC_STRING *ds, MYSQL_STMT *stmt, free_replace_column(); - for (col_idx= 0; col_idx < num_fields; col_idx++) + for (i= 0; i < num_fields; i++) { /* Free data for output */ - my_free((gptr)bind[col_idx].buffer, MYF(MY_WME | MY_FAE)); + my_free((gptr)bind[i].buffer, MYF(MY_WME | MY_FAE)); } /* Free array with bind structs, lengths and NULL flags */ my_free((gptr)bind , MYF(MY_WME | MY_FAE)); |