summaryrefslogtreecommitdiff
path: root/client/mysqlimport.c
diff options
context:
space:
mode:
Diffstat (limited to 'client/mysqlimport.c')
-rw-r--r--client/mysqlimport.c58
1 files changed, 48 insertions, 10 deletions
diff --git a/client/mysqlimport.c b/client/mysqlimport.c
index 37807a4eea5..688789ec436 100644
--- a/client/mysqlimport.c
+++ b/client/mysqlimport.c
@@ -37,6 +37,7 @@
/* Global Thread counter */
uint counter= 0;
+pthread_mutex_t init_mutex;
pthread_mutex_t counter_mutex;
pthread_cond_t count_threshhold;
@@ -422,8 +423,19 @@ static MYSQL *db_connect(char *host, char *database,
my_bool reconnect;
if (verbose)
fprintf(stdout, "Connecting to %s\n", host ? host : "localhost");
- if (!(mysql= mysql_init(NULL)))
- return 0;
+ if (opt_use_threads && !lock_tables)
+ {
+ pthread_mutex_lock(&init_mutex);
+ if (!(mysql= mysql_init(NULL)))
+ {
+ pthread_mutex_unlock(&init_mutex);
+ return 0;
+ }
+ pthread_mutex_unlock(&init_mutex);
+ }
+ else
+ if (!(mysql= mysql_init(NULL)))
+ return 0;
if (opt_compress)
mysql_options(mysql,MYSQL_OPT_COMPRESS,NullS);
if (opt_local_file)
@@ -616,7 +628,7 @@ error:
pthread_cond_signal(&count_threshhold);
pthread_mutex_unlock(&counter_mutex);
mysql_thread_end();
-
+ pthread_exit(0);
return 0;
}
@@ -640,15 +652,31 @@ int main(int argc, char **argv)
if (opt_use_threads && !lock_tables)
{
- pthread_t mainthread; /* Thread descriptor */
- pthread_attr_t attr; /* Thread attributes */
+ char **save_argv;
+ uint worker_thread_count= 0, table_count= 0, i= 0;
+ pthread_t *worker_threads; /* Thread descriptor */
+ pthread_attr_t attr; /* Thread attributes */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr,
- PTHREAD_CREATE_DETACHED);
+ PTHREAD_CREATE_JOINABLE);
+ pthread_mutex_init(&init_mutex, NULL);
pthread_mutex_init(&counter_mutex, NULL);
pthread_cond_init(&count_threshhold, NULL);
+ /* Count the number of tables. This number denotes the total number
+ of threads spawn.
+ */
+ save_argv= argv;
+ for (table_count= 0; *argv != NULL; argv++)
+ table_count++;
+ argv= save_argv;
+
+ if (!(worker_threads= (pthread_t*) my_malloc(table_count *
+ sizeof(*worker_threads),
+ MYF(0))))
+ return -2;
+
for (counter= 0; *argv != NULL; argv++) /* Loop through tables */
{
pthread_mutex_lock(&counter_mutex);
@@ -663,15 +691,16 @@ int main(int argc, char **argv)
counter++;
pthread_mutex_unlock(&counter_mutex);
/* now create the thread */
- if (pthread_create(&mainthread, &attr, worker_thread,
- (void *)*argv) != 0)
+ if (pthread_create(&worker_threads[worker_thread_count], &attr,
+ worker_thread, (void *)*argv) != 0)
{
pthread_mutex_lock(&counter_mutex);
counter--;
pthread_mutex_unlock(&counter_mutex);
- fprintf(stderr,"%s: Could not create thread\n",
- my_progname);
+ fprintf(stderr,"%s: Could not create thread\n", my_progname);
+ continue;
}
+ worker_thread_count++;
}
/*
@@ -686,9 +715,18 @@ int main(int argc, char **argv)
pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
}
pthread_mutex_unlock(&counter_mutex);
+ pthread_mutex_destroy(&init_mutex);
pthread_mutex_destroy(&counter_mutex);
pthread_cond_destroy(&count_threshhold);
pthread_attr_destroy(&attr);
+
+ for(i= 0; i < worker_thread_count; i++)
+ {
+ if (pthread_join(worker_threads[i], NULL))
+ fprintf(stderr,"%s: Could not join worker thread.\n", my_progname);
+ }
+
+ my_free(worker_threads);
}
else
{