summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/Makefile.am1
-rw-r--r--client/client_priv.h1
-rw-r--r--client/mysqlimport.c119
-rw-r--r--mysql-test/r/mysqldump.result32
-rw-r--r--mysql-test/t/mysqldump.test24
5 files changed, 160 insertions, 17 deletions
diff --git a/client/Makefile.am b/client/Makefile.am
index 849bd37eb57..6a851cf2f2c 100644
--- a/client/Makefile.am
+++ b/client/Makefile.am
@@ -49,6 +49,7 @@ mysqlbinlog_SOURCES = mysqlbinlog.cc $(top_srcdir)/mysys/mf_tempdir.c \
$(top_srcdir)/mysys/base64.c
mysqlbinlog_LDADD = $(LDADD) $(CXXLDFLAGS)
mysqlslap_LDADD = $(LDADD) $(CXXLDFLAGS) $(CLIENT_THREAD_LIBS)
+mysqlimport_LDADD = $(LDADD) $(CXXLDFLAGS) $(CLIENT_THREAD_LIBS)
mysqltestmanager_pwgen_SOURCES = mysqlmanager-pwgen.c
mysqltestmanagerc_SOURCES= mysqlmanagerc.c $(yassl_dummy_link_fix)
mysqlcheck_SOURCES= mysqlcheck.c $(yassl_dummy_link_fix)
diff --git a/client/client_priv.h b/client/client_priv.h
index 44f3ce227af..2763dabb027 100644
--- a/client/client_priv.h
+++ b/client/client_priv.h
@@ -54,6 +54,7 @@ enum options_client
OPT_MYSQL_LOCK_DIRECTORY,
OPT_MYSQL_SLAP_SLAVE,
OPT_USE_THREADS,
+ OPT_IMPORT_USE_THREADS,
OPT_MYSQL_NUMBER_OF_QUERY,
OPT_MYSQL_PRESERVE_SCHEMA,
OPT_IGNORE_TABLE,OPT_INSERT_IGNORE,OPT_SHOW_WARNINGS,OPT_DROP_DATABASE,
diff --git a/client/mysqlimport.c b/client/mysqlimport.c
index 8694093f06b..946ea645230 100644
--- a/client/mysqlimport.c
+++ b/client/mysqlimport.c
@@ -29,6 +29,10 @@
#include "client_priv.h"
#include "mysql_version.h"
+#include <my_pthread.h>
+
+/* Global Thread counter */
+int counter= 0;
static void db_error_with_table(MYSQL *mysql, char *table);
static void db_error(MYSQL *mysql);
@@ -39,6 +43,7 @@ static char *add_load_option(char *ptr,const char *object,
static my_bool verbose=0,lock_tables=0,ignore_errors=0,opt_delete=0,
replace=0,silent=0,ignore=0,opt_compress=0,
opt_low_priority= 0, tty_password= 0;
+static my_bool opt_use_threads= 0;
static uint opt_local_file=0;
static MYSQL mysql_connection;
static char *opt_password=0, *current_user=0,
@@ -138,6 +143,11 @@ static struct my_option my_long_options[] =
(gptr*) &opt_mysql_unix_port, (gptr*) &opt_mysql_unix_port, 0, GET_STR,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#include <sslopt-longopts.h>
+ {"use-threads", OPT_USE_THREADS,
+ "Parrelize the loading of files. Requires an arguement for the number \
+ threads to use for loading of data.",
+ (gptr*) &opt_use_threads, (gptr*) &opt_use_threads, 0,
+ GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#ifndef DONT_ALLOW_USER_CHANGE
{"user", 'u', "User for login if not current user.", (gptr*) &current_user,
(gptr*) &current_user, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
@@ -287,7 +297,7 @@ static int write_to_table(char *filename, MYSQL *sock)
{
if (verbose)
fprintf(stdout, "Deleting the old data from table %s\n", tablename);
- sprintf(sql_statement, "DELETE FROM %s", tablename);
+ snprintf(sql_statement, FN_REFLEN*16+256, "DELETE FROM %s", tablename);
if (mysql_query(sock, sql_statement))
{
db_error_with_table(sock, tablename);
@@ -497,12 +507,39 @@ static char *field_escape(char *to,const char *from,uint length)
}
+int
+worker_thread(char *raw_table_name)
+{
+ MYSQL *sock= 0;
+ if (!(sock= db_connect(current_host,current_db,current_user,opt_password)))
+ {
+ goto error;
+ }
+
+ if (mysql_query(sock, "set @@character_set_database=binary;"))
+ {
+ db_error(sock); /* We shall countinue here, if --force was given */
+ goto error;
+ }
+
+ /*
+ We should do something about the error
+ */
+ write_to_table(raw_table_name, sock);
+
+error:
+ if (sock)
+ db_disconnect(current_host, sock);
+
+ counter--;
+ return 0;
+}
+
int main(int argc, char **argv)
{
int exitcode=0, error=0;
char **argv_to_free;
- MYSQL *sock=0;
MY_INIT(argv[0]);
load_defaults("my",load_default_groups,&argc,&argv);
@@ -513,25 +550,73 @@ int main(int argc, char **argv)
free_defaults(argv_to_free);
return(1);
}
- if (!(sock= db_connect(current_host,current_db,current_user,opt_password)))
- {
- free_defaults(argv_to_free);
- return(1); /* purecov: deadcode */
- }
- if (mysql_query(sock, "set @@character_set_database=binary;"))
+ if (opt_use_threads)
{
- db_error(sock); /* We shall countinue here, if --force was given */
- return(1);
+ pthread_t mainthread; /* Thread descriptor */
+ pthread_attr_t attr; /* Thread attributes */
+
+ for (; *argv != NULL; argv++) // Loop through tables
+ {
+ /*
+ If we hit thread count limit we loop until some threads exit.
+ We sleep for a second, so that we don't chew up a lot of
+ CPU in the loop.
+ */
+sanity_label:
+ if (counter == opt_use_threads)
+ {
+ sleep(1);
+ goto sanity_label;
+ }
+ counter++;
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr,
+ PTHREAD_CREATE_DETACHED);
+
+ /* now create the thread */
+ if (pthread_create(&mainthread, &attr, (void *)worker_thread,
+ (void *)*argv) != 0)
+ {
+ counter--;
+ fprintf(stderr,"%s: Could not create thread\n",
+ my_progname);
+ }
+ }
+
+ /*
+ We loop until we know that all children have cleaned up.
+ */
+loop_label:
+ if (counter)
+ {
+ sleep(1);
+ goto loop_label;
+ }
}
+ else
+ {
+ MYSQL *sock= 0;
+ if (!(sock= db_connect(current_host,current_db,current_user,opt_password)))
+ {
+ free_defaults(argv_to_free);
+ return(1); /* purecov: deadcode */
+ }
- if (lock_tables)
- lock_table(sock, argc, argv);
- for (; *argv != NULL; argv++)
- if ((error=write_to_table(*argv, sock)))
- if (exitcode == 0)
- exitcode = error;
- db_disconnect(current_host, sock);
+ if (mysql_query(sock, "set @@character_set_database=binary;"))
+ {
+ db_error(sock); /* We shall countinue here, if --force was given */
+ return(1);
+ }
+
+ if (lock_tables)
+ lock_table(sock, argc, argv);
+ for (; *argv != NULL; argv++)
+ if ((error=write_to_table(*argv, sock)))
+ if (exitcode == 0)
+ exitcode = error;
+ db_disconnect(current_host, sock);
+ }
my_free(opt_password,MYF(MY_ALLOW_ZERO_PTR));
#ifdef HAVE_SMEM
my_free(shared_memory_base_name,MYF(MY_ALLOW_ZERO_PTR));
diff --git a/mysql-test/r/mysqldump.result b/mysql-test/r/mysqldump.result
index 79b22964f8a..7328a7da2eb 100644
--- a/mysql-test/r/mysqldump.result
+++ b/mysql-test/r/mysqldump.result
@@ -2650,3 +2650,35 @@ DELIMITER ;
DROP TRIGGER tr1;
DROP TABLE t1;
+create table t1 (a text , b text);
+create table t2 (a text , b text);
+insert t1 values ("Duck, Duck", "goose");
+insert t1 values ("Duck, Duck", "pidgeon");
+insert t2 values ("We the people", "in order to perform");
+insert t2 values ("a more perfect", "union");
+select * from t1;
+a b
+Duck, Duck goose
+Duck, Duck pidgeon
+select * from t2;
+a b
+We the people in order to perform
+a more perfect union
+test.t1: Records: 2 Deleted: 0 Skipped: 0 Warnings: 0
+test.t2: Records: 2 Deleted: 0 Skipped: 0 Warnings: 0
+test.t1: Records: 2 Deleted: 0 Skipped: 0 Warnings: 0
+test.t2: Records: 2 Deleted: 0 Skipped: 0 Warnings: 0
+select * from t1;
+a b
+Duck, Duck goose
+Duck, Duck pidgeon
+Duck, Duck goose
+Duck, Duck pidgeon
+select * from t2;
+a b
+We the people in order to perform
+a more perfect union
+We the people in order to perform
+a more perfect union
+drop table t1;
+drop table t2;
diff --git a/mysql-test/t/mysqldump.test b/mysql-test/t/mysqldump.test
index 592bc289d05..dea8e32869e 100644
--- a/mysql-test/t/mysqldump.test
+++ b/mysql-test/t/mysqldump.test
@@ -1048,3 +1048,27 @@ SET SQL_MODE = @old_sql_mode;
DROP TRIGGER tr1;
DROP TABLE t1;
+
+#
+# Added for use-thread option
+#
+create table t1 (a text , b text);
+create table t2 (a text , b text);
+insert t1 values ("Duck, Duck", "goose");
+insert t1 values ("Duck, Duck", "pidgeon");
+insert t2 values ("We the people", "in order to perform");
+insert t2 values ("a more perfect", "union");
+select * from t1;
+select * from t2;
+--exec $MYSQL_DUMP --tab=$MYSQLTEST_VARDIR/tmp/ test
+--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/t1.sql
+--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/t2.sql
+# The first load tests the pausing code
+--exec $MYSQL_IMPORT --use-threads=1 test $MYSQLTEST_VARDIR/tmp/t1.txt $MYSQLTEST_VARDIR/tmp/t2.txt
+# Now we test with multiple threads!
+--exec $MYSQL_IMPORT --use-threads=5 test $MYSQLTEST_VARDIR/tmp/t1.txt $MYSQLTEST_VARDIR/tmp/t2.txt
+select * from t1;
+select * from t2;
+
+drop table t1;
+drop table t2;