summaryrefslogtreecommitdiff
path: root/client
diff options
context:
space:
mode:
Diffstat (limited to 'client')
-rw-r--r--client/Makefile.am5
-rw-r--r--client/client_priv.h1
-rw-r--r--client/mysqlimport.c134
-rw-r--r--client/mysqlslap.c6
-rw-r--r--client/mysqltest.c163
5 files changed, 197 insertions, 112 deletions
diff --git a/client/Makefile.am b/client/Makefile.am
index 849bd37eb57..14ebadbfacb 100644
--- a/client/Makefile.am
+++ b/client/Makefile.am
@@ -49,6 +49,8 @@ mysqlbinlog_SOURCES = mysqlbinlog.cc $(top_srcdir)/mysys/mf_tempdir.c \
$(top_srcdir)/mysys/base64.c
mysqlbinlog_LDADD = $(LDADD) $(CXXLDFLAGS)
mysqlslap_LDADD = $(LDADD) $(CXXLDFLAGS) $(CLIENT_THREAD_LIBS)
+mysqlimport_LDADD = $(LDADD) $(CXXLDFLAGS) $(CLIENT_THREAD_LIBS) \
+ $(top_builddir)/mysys/libmysys.a
mysqltestmanager_pwgen_SOURCES = mysqlmanager-pwgen.c
mysqltestmanagerc_SOURCES= mysqlmanagerc.c $(yassl_dummy_link_fix)
mysqlcheck_SOURCES= mysqlcheck.c $(yassl_dummy_link_fix)
@@ -57,7 +59,8 @@ mysqlslap_SOURCES= mysqlslap.c $(top_srcdir)/mysys/my_lock.c \
$(top_srcdir)/mysys/my_alarm.c \
$(yassl_dummy_link_fix)
mysqldump_SOURCES= mysqldump.c my_user.c $(yassl_dummy_link_fix)
-mysqlimport_SOURCES= mysqlimport.c $(yassl_dummy_link_fix)
+mysqlimport_SOURCES= mysqlimport.c \
+ $(yassl_dummy_link_fix)
sql_src=log_event.h mysql_priv.h log_event.cc my_decimal.h my_decimal.cc
strings_src=decimal.c
diff --git a/client/client_priv.h b/client/client_priv.h
index 44f3ce227af..2763dabb027 100644
--- a/client/client_priv.h
+++ b/client/client_priv.h
@@ -54,6 +54,7 @@ enum options_client
OPT_MYSQL_LOCK_DIRECTORY,
OPT_MYSQL_SLAP_SLAVE,
OPT_USE_THREADS,
+ OPT_IMPORT_USE_THREADS,
OPT_MYSQL_NUMBER_OF_QUERY,
OPT_MYSQL_PRESERVE_SCHEMA,
OPT_IGNORE_TABLE,OPT_INSERT_IGNORE,OPT_SHOW_WARNINGS,OPT_DROP_DATABASE,
diff --git a/client/mysqlimport.c b/client/mysqlimport.c
index 8694093f06b..afc1724b87b 100644
--- a/client/mysqlimport.c
+++ b/client/mysqlimport.c
@@ -29,6 +29,12 @@
#include "client_priv.h"
#include "mysql_version.h"
+#include <my_pthread.h>
+
+
+/* Global Thread counter */
+int counter= 0;
+pthread_mutex_t counter_mutex;
static void db_error_with_table(MYSQL *mysql, char *table);
static void db_error(MYSQL *mysql);
@@ -39,6 +45,7 @@ static char *add_load_option(char *ptr,const char *object,
static my_bool verbose=0,lock_tables=0,ignore_errors=0,opt_delete=0,
replace=0,silent=0,ignore=0,opt_compress=0,
opt_low_priority= 0, tty_password= 0;
+static my_bool opt_use_threads= 0;
static uint opt_local_file=0;
static MYSQL mysql_connection;
static char *opt_password=0, *current_user=0,
@@ -108,8 +115,9 @@ static struct my_option my_long_options[] =
REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"local", 'L', "Read all files through the client.", (gptr*) &opt_local_file,
(gptr*) &opt_local_file, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
- {"lock-tables", 'l', "Lock all tables for write.", (gptr*) &lock_tables,
- (gptr*) &lock_tables, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
+ {"lock-tables", 'l', "Lock all tables for write (this disables threads).",
+ (gptr*) &lock_tables, (gptr*) &lock_tables, 0, GET_BOOL, NO_ARG,
+ 0, 0, 0, 0, 0, 0},
{"low-priority", OPT_LOW_PRIORITY,
"Use LOW_PRIORITY when updating the table.", (gptr*) &opt_low_priority,
(gptr*) &opt_low_priority, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
@@ -138,6 +146,11 @@ static struct my_option my_long_options[] =
(gptr*) &opt_mysql_unix_port, (gptr*) &opt_mysql_unix_port, 0, GET_STR,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#include <sslopt-longopts.h>
+ {"use-threads", OPT_USE_THREADS,
+ "Load files in parallel. The argument is the number "
+ "of threads to use for loading data.",
+ (gptr*) &opt_use_threads, (gptr*) &opt_use_threads, 0,
+ GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#ifndef DONT_ALLOW_USER_CHANGE
{"user", 'u', "User for login if not current user.", (gptr*) &current_user,
(gptr*) &current_user, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
@@ -287,7 +300,7 @@ static int write_to_table(char *filename, MYSQL *sock)
{
if (verbose)
fprintf(stdout, "Deleting the old data from table %s\n", tablename);
- sprintf(sql_statement, "DELETE FROM %s", tablename);
+ snprintf(sql_statement, FN_REFLEN*16+256, "DELETE FROM %s", tablename);
if (mysql_query(sock, sql_statement))
{
db_error_with_table(sock, tablename);
@@ -497,12 +510,41 @@ static char *field_escape(char *to,const char *from,uint length)
}
+pthread_handler_t worker_thread(void *arg)
+{
+ char *raw_table_name= (char *)arg;
+ MYSQL *sock= 0;
+ if (!(sock= db_connect(current_host,current_db,current_user,opt_password)))
+ {
+ goto error;
+ }
+
+ if (mysql_query(sock, "set @@character_set_database=binary;"))
+ {
+ db_error(sock); /* We shall countinue here, if --force was given */
+ goto error;
+ }
+
+ /*
+ We should do something about the error
+ */
+ write_to_table(raw_table_name, sock);
+
+error:
+ if (sock)
+ db_disconnect(current_host, sock);
+
+ pthread_mutex_lock(&counter_mutex);
+ counter--;
+ pthread_mutex_unlock(&counter_mutex);
+ return 0;
+}
+
int main(int argc, char **argv)
{
int exitcode=0, error=0;
char **argv_to_free;
- MYSQL *sock=0;
MY_INIT(argv[0]);
load_defaults("my",load_default_groups,&argc,&argv);
@@ -513,25 +555,79 @@ int main(int argc, char **argv)
free_defaults(argv_to_free);
return(1);
}
- if (!(sock= db_connect(current_host,current_db,current_user,opt_password)))
- {
- free_defaults(argv_to_free);
- return(1); /* purecov: deadcode */
- }
- if (mysql_query(sock, "set @@character_set_database=binary;"))
+ if (opt_use_threads && !lock_tables)
{
- db_error(sock); /* We shall countinue here, if --force was given */
- return(1);
+ pthread_t mainthread; /* Thread descriptor */
+ pthread_attr_t attr; /* Thread attributes */
+ VOID(pthread_mutex_init(&counter_mutex, NULL));
+
+ for (; *argv != NULL; argv++) /* Loop through tables */
+ {
+ /*
+ If we hit thread count limit we loop until some threads exit.
+ We sleep for a second, so that we don't chew up a lot of
+ CPU in the loop.
+ */
+sanity_label:
+ if (counter == opt_use_threads)
+ {
+ sleep(1);
+ goto sanity_label;
+ }
+ pthread_mutex_lock(&counter_mutex);
+ counter++;
+ pthread_mutex_unlock(&counter_mutex);
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr,
+ PTHREAD_CREATE_DETACHED);
+
+ /* now create the thread */
+ if (pthread_create(&mainthread, &attr, worker_thread,
+ (void *)*argv) != 0)
+ {
+ pthread_mutex_lock(&counter_mutex);
+ counter--;
+ pthread_mutex_unlock(&counter_mutex);
+ fprintf(stderr,"%s: Could not create thread\n",
+ my_progname);
+ }
+ }
+
+ /*
+ We loop until we know that all children have cleaned up.
+ */
+loop_label:
+ if (counter)
+ {
+ sleep(1);
+ goto loop_label;
+ }
+ VOID(pthread_mutex_destroy(&counter_mutex));
}
+ else
+ {
+ MYSQL *sock= 0;
+ if (!(sock= db_connect(current_host,current_db,current_user,opt_password)))
+ {
+ free_defaults(argv_to_free);
+ return(1); /* purecov: deadcode */
+ }
- if (lock_tables)
- lock_table(sock, argc, argv);
- for (; *argv != NULL; argv++)
- if ((error=write_to_table(*argv, sock)))
- if (exitcode == 0)
- exitcode = error;
- db_disconnect(current_host, sock);
+ if (mysql_query(sock, "set @@character_set_database=binary;"))
+ {
+ db_error(sock); /* We shall countinue here, if --force was given */
+ return(1);
+ }
+
+ if (lock_tables)
+ lock_table(sock, argc, argv);
+ for (; *argv != NULL; argv++)
+ if ((error=write_to_table(*argv, sock)))
+ if (exitcode == 0)
+ exitcode = error;
+ db_disconnect(current_host, sock);
+ }
my_free(opt_password,MYF(MY_ALLOW_ZERO_PTR));
#ifdef HAVE_SMEM
my_free(shared_memory_base_name,MYF(MY_ALLOW_ZERO_PTR));
diff --git a/client/mysqlslap.c b/client/mysqlslap.c
index b80150a8d67..c0986c2d867 100644
--- a/client/mysqlslap.c
+++ b/client/mysqlslap.c
@@ -55,7 +55,7 @@
load statements, and then run all the queries in the query file
with five clients (five times each):
- mysqlslap --drop-schema --concurrency=5 \
+ mysqlslap --concurrency=5 \
--iterations=5 --query=query.sql --create=create.sql \
--delimiter=";"
@@ -425,12 +425,12 @@ static struct my_option my_long_options[] =
(gptr*) &lock_directory, (gptr*) &lock_directory, 0, GET_STR,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"number-char-cols", 'x',
- "Number of INT columns to create table with if specifying --sql-generate-sql.",
+ "Number of INT columns to create table with if specifying --auto-generate-sql.",
(gptr*) &num_char_cols, (gptr*) &num_char_cols, 0, GET_UINT, REQUIRED_ARG,
1, 0, 0, 0, 0, 0},
{"number-int-cols", 'y',
"Number of VARCHAR columns to create table with if specifying "
- "--sql-generate-sql.", (gptr*) &num_int_cols, (gptr*) &num_int_cols, 0,
+ "--auto-generate-sql.", (gptr*) &num_int_cols, (gptr*) &num_int_cols, 0,
GET_UINT, REQUIRED_ARG, 1, 0, 0, 0, 0, 0},
{"number-of-queries", OPT_MYSQL_NUMBER_OF_QUERY,
"Limit each client to this number of queries (this is not exact).",
diff --git a/client/mysqltest.c b/client/mysqltest.c
index 98097b1896e..ce1a64ec653 100644
--- a/client/mysqltest.c
+++ b/client/mysqltest.c
@@ -3605,6 +3605,59 @@ static void replace_dynstr_append(DYNAMIC_STRING *ds, const char *val)
/*
+ Append the result for one field to the dynamic string ds
+*/
+
+static void append_field(DYNAMIC_STRING *ds, uint col_idx, MYSQL_FIELD* field,
+ const char* val, ulonglong len, bool is_null)
+{
+
+ char buf[256];
+ if (col_idx < max_replace_column && replace_column[col_idx])
+ {
+ val= replace_column[col_idx];
+ len= strlen(val);
+ }
+ else if (is_null)
+ {
+ val= "NULL";
+ len= 4;
+ }
+#ifdef __WIN__
+ else if ((field->type == MYSQL_TYPE_DOUBLE ||
+ field->type == MYSQL_TYPE_FLOAT ) &&
+ field->decimals >= 31)
+ {
+ /* Convert 1.2e+018 to 1.2e+18 and 1.2e-018 to 1.2e-18 */
+ char *start= strchr(val, 'e');
+ if (start && strlen(start) >= 5 &&
+ (start[1] == '-' || start[1] == '+') && start[2] == '0')
+ {
+ start+=2; /* Now points at first '0' */
+ /* Move all chars after the first '0' one step left */
+ memmove(start, start + 1, strlen(start));
+ len--;
+ }
+ }
+#endif
+
+ if (!display_result_vertically)
+ {
+ if (col_idx)
+ dynstr_append_mem(ds, "\t", 1);
+ replace_dynstr_append_mem(ds, val, (int)len);
+ }
+ else
+ {
+ dynstr_append(ds, field->name);
+ dynstr_append_mem(ds, "\t", 1);
+ replace_dynstr_append_mem(ds, val, (int)len);
+ dynstr_append_mem(ds, "\n", 1);
+ }
+}
+
+
+/*
Append all results to the dynamic string separated with '\t'
Values may be converted with 'replace_column'
*/
@@ -3613,41 +3666,16 @@ static void append_result(DYNAMIC_STRING *ds, MYSQL_RES *res)
{
MYSQL_ROW row;
uint num_fields= mysql_num_fields(res);
- MYSQL_FIELD *fields= !display_result_vertically ? 0 : mysql_fetch_fields(res);
+ MYSQL_FIELD *fields= mysql_fetch_fields(res);
ulong *lengths;
+
while ((row = mysql_fetch_row(res)))
{
uint i;
lengths = mysql_fetch_lengths(res);
for (i = 0; i < num_fields; i++)
- {
- const char *val= row[i];
- ulonglong len= lengths[i];
-
- if (i < max_replace_column && replace_column[i])
- {
- val= replace_column[i];
- len= strlen(val);
- }
- if (!val)
- {
- val= "NULL";
- len= 4;
- }
- if (!display_result_vertically)
- {
- if (i)
- dynstr_append_mem(ds, "\t", 1);
- replace_dynstr_append_mem(ds, val, (int)len);
- }
- else
- {
- dynstr_append(ds, fields[i].name);
- dynstr_append_mem(ds, "\t", 1);
- replace_dynstr_append_mem(ds, val, (int)len);
- dynstr_append_mem(ds, "\n", 1);
- }
- }
+ append_field(ds, i, &fields[i],
+ (const char*)row[i], lengths[i], !row[i]);
if (!display_result_vertically)
dynstr_append_mem(ds, "\n", 1);
}
@@ -3661,13 +3689,12 @@ static void append_result(DYNAMIC_STRING *ds, MYSQL_RES *res)
*/
static void append_stmt_result(DYNAMIC_STRING *ds, MYSQL_STMT *stmt,
- MYSQL_FIELD *field, uint num_fields)
+ MYSQL_FIELD *fields, uint num_fields)
{
MYSQL_BIND *bind;
my_bool *is_null;
ulong *length;
- ulonglong num_rows;
- uint col_idx, row_idx;
+ uint i;
/* Allocate array with bind structs, lengths and NULL flags */
bind= (MYSQL_BIND*) my_malloc(num_fields * sizeof(MYSQL_BIND),
@@ -3677,71 +3704,29 @@ static void append_stmt_result(DYNAMIC_STRING *ds, MYSQL_STMT *stmt,
is_null= (my_bool*) my_malloc(num_fields * sizeof(my_bool),
MYF(MY_WME | MY_FAE));
- for (col_idx= 0; col_idx < num_fields; col_idx++)
+ /* Allocate data for the result of each field */
+ for (i= 0; i < num_fields; i++)
{
- /* Allocate data for output */
- uint max_length= field[col_idx].max_length + 1;
- char *str_data= (char *) my_malloc(max_length, MYF(MY_WME | MY_FAE));
-
- bind[col_idx].buffer_type= MYSQL_TYPE_STRING;
- bind[col_idx].buffer= (char *)str_data;
- bind[col_idx].buffer_length= max_length;
- bind[col_idx].is_null= &is_null[col_idx];
- bind[col_idx].length= &length[col_idx];
+ uint max_length= fields[i].max_length + 1;
+ bind[i].buffer_type= MYSQL_TYPE_STRING;
+ bind[i].buffer= (char *)my_malloc(max_length, MYF(MY_WME | MY_FAE));
+ bind[i].buffer_length= max_length;
+ bind[i].is_null= &is_null[i];
+ bind[i].length= &length[i];
DBUG_PRINT("bind", ("col[%d]: buffer_type: %d, buffer_length: %d",
- col_idx,
- bind[col_idx].buffer_type,
- bind[col_idx].buffer_length));
+ i, bind[i].buffer_type, bind[i].buffer_length));
}
- /* Fill in the data into the structures created above */
if (mysql_stmt_bind_result(stmt, bind))
die("mysql_stmt_bind_result failed: %d: %s",
mysql_stmt_errno(stmt), mysql_stmt_error(stmt));
- /* Read result from each row */
- num_rows= mysql_stmt_num_rows(stmt);
- for (row_idx= 0; row_idx < num_rows; row_idx++)
+ while (mysql_stmt_fetch(stmt) == 0)
{
- if (mysql_stmt_fetch(stmt))
- die("mysql_stmt_fetch failed: %d %s",
- mysql_stmt_errno(stmt), mysql_stmt_error(stmt));
-
- /* Read result from each column */
- for (col_idx= 0; col_idx < num_fields; col_idx++)
- {
- const char *val;
- ulonglong len;
- if (col_idx < max_replace_column && replace_column[col_idx])
- {
- val= replace_column[col_idx];
- len= strlen(val);
- }
- else if (*bind[col_idx].is_null)
- {
- val= "NULL";
- len= 4;
- }
- else
- {
- val= (const char *) bind[col_idx].buffer;
- len= *bind[col_idx].length;
- }
- if (!display_result_vertically)
- {
- if (col_idx) /* No tab before first col */
- dynstr_append_mem(ds, "\t", 1);
- replace_dynstr_append_mem(ds, val, (int)len);
- }
- else
- {
- dynstr_append(ds, field[col_idx].name);
- dynstr_append_mem(ds, "\t", 1);
- replace_dynstr_append_mem(ds, val, (int)len);
- dynstr_append_mem(ds, "\n", 1);
- }
- }
+ for (i= 0; i < num_fields; i++)
+ append_field(ds, i, &fields[i], (const char *) bind[i].buffer,
+ *bind[i].length, *bind[i].is_null);
if (!display_result_vertically)
dynstr_append_mem(ds, "\n", 1);
}
@@ -3752,10 +3737,10 @@ static void append_stmt_result(DYNAMIC_STRING *ds, MYSQL_STMT *stmt,
free_replace_column();
- for (col_idx= 0; col_idx < num_fields; col_idx++)
+ for (i= 0; i < num_fields; i++)
{
/* Free data for output */
- my_free((gptr)bind[col_idx].buffer, MYF(MY_WME | MY_FAE));
+ my_free((gptr)bind[i].buffer, MYF(MY_WME | MY_FAE));
}
/* Free array with bind structs, lengths and NULL flags */
my_free((gptr)bind , MYF(MY_WME | MY_FAE));