diff options
-rw-r--r-- | mysys/thr_lock.c | 257 | ||||
-rw-r--r-- | scripts/mysqld_safe.sh | 68 | ||||
-rw-r--r-- | sql/events.cc | 1 | ||||
-rw-r--r-- | sql/handler.cc | 2 | ||||
-rw-r--r-- | sql/lock.cc | 9 | ||||
-rw-r--r-- | sql/log_event.cc | 45 | ||||
-rw-r--r-- | sql/mysqld.cc | 1027 | ||||
-rw-r--r-- | sql/slave.cc | 17 | ||||
-rw-r--r-- | sql/sql_base.cc | 19 | ||||
-rw-r--r-- | sql/sql_insert.cc | 6 | ||||
-rw-r--r-- | sql/sql_parse.cc | 25 | ||||
-rw-r--r-- | sql/sql_repl.cc | 9 | ||||
-rw-r--r-- | sql/sql_show.cc | 6 |
13 files changed, 857 insertions, 634 deletions
diff --git a/mysys/thr_lock.c b/mysys/thr_lock.c index 308e46a2f76..4f7c727594b 100644 --- a/mysys/thr_lock.c +++ b/mysys/thr_lock.c @@ -664,6 +664,108 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, DBUG_RETURN(result); } +#ifdef WITH_WSREP +/* + * If brute force applier would need to wait for a thr lock, + * it needs to make sure that it will get the lock without (too much) + * delay. + * We identify here the owners of blocking locks and ask them to + * abort. We then put our lock request in the first place in the + * wait queue. When lock holders abort (one by one) the lock release + * algorithm should grant the lock to us. We rely on this and proceed + * to wait_for_locks(). + * wsrep_break_locks() should be called in all the cases, where lock + * wait would happen. + * + * TODO: current implementation might not cover all possible lock wait + * situations. This needs an review still. + * TODO: lock release, might favor some other lock (instead our bf). + * This needs an condition to check for bf locks first. + * TODO: we still have a debug fprintf, this should be removed + */ +static inline my_bool +wsrep_break_lock( + THR_LOCK_DATA *data, struct st_lock_list *lock_queue1, + struct st_lock_list *lock_queue2, struct st_lock_list *wait_queue) +{ + if (wsrep_on(data->owner->mysql_thd) && + wsrep_thd_is_brute_force && + wsrep_thd_is_brute_force(data->owner->mysql_thd)) + { + THR_LOCK_DATA *holder; + + /* if locking session conversion to transaction has been enabled, + we know that this conflicting lock must be read lock and furthermore, + lock holder is read-only. It is safe to wait for him. + */ +#ifdef TODO + if (wsrep_convert_LOCK_to_trx && + (THD*)(data->owner->mysql_thd)->in_lock_tables) + { + if (wsrep_debug) + fprintf(stderr,"WSREP wsrep_break_lock read lock untouched\n"); + return FALSE; + } +#endif + if (wsrep_debug) + fprintf(stderr,"WSREP wsrep_break_lock aborting locks\n"); + + /* aborting lock holder(s) here */ + for (holder=(lock_queue1) ? lock_queue1->data : NULL; + holder; + holder=holder->next) + { + if (!wsrep_thd_is_brute_force(holder->owner->mysql_thd)) + { + wsrep_abort_thd(data->owner->mysql_thd, + holder->owner->mysql_thd, FALSE); + } + else + { + if (wsrep_debug) + fprintf(stderr,"WSREP wsrep_break_lock skipping BF lock conflict\n"); + return FALSE; + } + } + for (holder=(lock_queue2) ? lock_queue2->data : NULL; + holder; + holder=holder->next) + { + if (!wsrep_thd_is_brute_force(holder->owner->mysql_thd)) + { + wsrep_abort_thd(data->owner->mysql_thd, + holder->owner->mysql_thd, FALSE); + } + else + { + if (wsrep_debug) + fprintf(stderr,"WSREP wsrep_break_lock skipping BF lock conflict\n"); + return FALSE; + } + } + + /* Add our lock to the head of the wait queue */ + if (*(wait_queue->last)==wait_queue->data) + { + wait_queue->last=&data->next; + assert(wait_queue->data==0); + } + else + { + assert(wait_queue->data!=0); + wait_queue->data->prev=&data->next; + } + data->next=wait_queue->data; + data->prev=&wait_queue->data; + wait_queue->data=data; + data->cond=get_cond(); + + statistic_increment(locks_immediate,&THR_LOCK_lock); + return TRUE; + } + return FALSE; +} +#endif static enum enum_thr_lock_result thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout) @@ -672,6 +774,9 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout) enum enum_thr_lock_result result= THR_LOCK_SUCCESS; struct st_lock_list *wait_queue; enum thr_lock_type lock_type= data->type; +#ifdef WITH_WSREP + my_bool wsrep_lock_inserted= FALSE; +#endif MYSQL_TABLE_WAIT_VARIABLES(locker, state) /* no ';' */ DBUG_ENTER("thr_lock"); @@ -741,6 +846,14 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout) } if (lock->write.data->type == TL_WRITE_ONLY) { +#ifdef WITH_WSREP + if (wsrep_break_lock(data, &lock->write, NULL, &lock->read_wait)) + { + wsrep_lock_inserted= TRUE; + goto wsrep_read_wait; + } +#endif + /* We are not allowed to get a READ lock in this case */ data->type=TL_UNLOCK; result= THR_LOCK_ABORTED; /* Can't wait for this one */ @@ -768,6 +881,14 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout) lock but a high priority write waiting in the write_wait queue. In the latter case we should yield the lock to the writer. */ +#ifdef WITH_WSREP + if (wsrep_break_lock(data, &lock->write, NULL, &lock->read_wait)) + { + wsrep_lock_inserted= TRUE; + } + wsrep_read_wait: +#endif + wait_queue= &lock->read_wait; } else /* Request for WRITE lock */ @@ -776,12 +897,25 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout) { if (lock->write.data && lock->write.data->type == TL_WRITE_ONLY) { +#ifdef WITH_WSREP + if (wsrep_break_lock(data, &lock->write, NULL, &lock->write_wait)) + { + wsrep_lock_inserted=TRUE; + goto wsrep_write_wait; + } +#endif data->type=TL_UNLOCK; result= THR_LOCK_ABORTED; /* Can't wait for this one */ goto end; } if (lock->write.data || lock->read.data) { +#ifdef WITH_WSREP + if (wsrep_break_lock(data, &lock->write, NULL, &lock->write_wait)) + { + goto end; + } +#endif /* Add delayed write lock to write_wait queue, and return at once */ (*lock->write_wait.last)=data; data->prev=lock->write_wait.last; @@ -806,6 +940,13 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout) /* Allow lock owner to bypass TL_WRITE_ONLY. */ if (!thr_lock_owner_equal(data->owner, lock->write.data->owner)) { +#ifdef WITH_WSREP + if (wsrep_break_lock(data, &lock->write, NULL, &lock->write_wait)) + { + wsrep_lock_inserted=TRUE; + goto wsrep_write_wait; + } +#endif /* We are not allowed to get a lock in this case */ data->type=TL_UNLOCK; result= THR_LOCK_ABORTED; /* Can't wait for this one */ @@ -909,9 +1050,22 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout) DBUG_PRINT("lock",("write locked 3 by thread: 0x%lx type: %d", lock->read.data->owner->thread_id, data->type)); } +#ifdef WITH_WSREP + if (wsrep_break_lock(data, &lock->write, NULL, &lock->write_wait)) + { + wsrep_lock_inserted= TRUE; + } + wsrep_write_wait: + +#endif + wait_queue= &lock->write_wait; } /* Can't get lock yet; Wait for it */ +#ifdef WITH_WSREP + if (wsrep_on(data->owner->mysql_thd) && wsrep_lock_inserted) + DBUG_RETURN(wait_for_lock(wait_queue, data, 1, lock_wait_timeout)); +#endif result= wait_for_lock(wait_queue, data, 0, lock_wait_timeout); MYSQL_END_TABLE_LOCK_WAIT(locker); DBUG_RETURN(result); @@ -1174,109 +1328,6 @@ static void sort_locks(THR_LOCK_DATA **data,uint count) } } -#ifdef WITH_WSREP -/* - * If brute force applier would need to wait for a thr lock, - * it needs to make sure that it will get the lock without (too much) - * delay. - * We identify here the owners of blocking locks and ask them to - * abort. We then put our lock request in the first place in the - * wait queue. When lock holders abort (one by one) the lock release - * algorithm should grant the lock to us. We rely on this and proceed - * to wait_for_locks(). - * wsrep_break_locks() should be called in all the cases, where lock - * wait would happen. - * - * TODO: current implementation might not cover all possible lock wait - * situations. This needs an review still. - * TODO: lock release, might favor some other lock (instead our bf). - * This needs an condition to check for bf locks first. - * TODO: we still have a debug fprintf, this should be removed - */ -static inline my_bool -wsrep_break_lock( - THR_LOCK_DATA *data, struct st_lock_list *lock_queue1, - struct st_lock_list *lock_queue2, struct st_lock_list *wait_queue) -{ - if (wsrep_on(data->owner->mysql_thd) && - wsrep_thd_is_brute_force && - wsrep_thd_is_brute_force(data->owner->mysql_thd)) - { - THR_LOCK_DATA *holder; - - /* if locking session conversion to transaction has been enabled, - we know that this conflicting lock must be read lock and furthermore, - lock holder is read-only. It is safe to wait for him. - */ -#ifdef TODO - if (wsrep_convert_LOCK_to_trx && - (THD*)(data->owner->mysql_thd)->in_lock_tables) - { - if (wsrep_debug) - fprintf(stderr,"WSREP wsrep_break_lock read lock untouched\n"); - return FALSE; - } -#endif - if (wsrep_debug) - fprintf(stderr,"WSREP wsrep_break_lock aborting locks\n"); - - /* aborting lock holder(s) here */ - for (holder=(lock_queue1) ? lock_queue1->data : NULL; - holder; - holder=holder->next) - { - if (!wsrep_thd_is_brute_force(holder->owner->mysql_thd)) - { - wsrep_abort_thd(data->owner->mysql_thd, - holder->owner->mysql_thd, FALSE); - } - else - { - if (wsrep_debug) - fprintf(stderr,"WSREP wsrep_break_lock skipping BF lock conflict\n"); - return FALSE; - } - } - for (holder=(lock_queue2) ? lock_queue2->data : NULL; - holder; - holder=holder->next) - { - if (!wsrep_thd_is_brute_force(holder->owner->mysql_thd)) - { - wsrep_abort_thd(data->owner->mysql_thd, - holder->owner->mysql_thd, FALSE); - } - else - { - if (wsrep_debug) - fprintf(stderr,"WSREP wsrep_break_lock skipping BF lock conflict\n"); - return FALSE; - } - } - - /* Add our lock to the head of the wait queue */ - if (*(wait_queue->last)==wait_queue->data) - { - wait_queue->last=&data->next; - assert(wait_queue->data==0); - } - else - { - assert(wait_queue->data!=0); - wait_queue->data->prev=&data->next; - } - data->next=wait_queue->data; - data->prev=&wait_queue->data; - wait_queue->data=data; - data->cond=get_cond(); - - statistic_increment(locks_immediate,&THR_LOCK_lock); - return TRUE; - } - return FALSE; -} -#endif - enum enum_thr_lock_result thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_INFO *owner, ulong lock_wait_timeout) diff --git a/scripts/mysqld_safe.sh b/scripts/mysqld_safe.sh index 7db2a5605e1..113c3236fab 100644 --- a/scripts/mysqld_safe.sh +++ b/scripts/mysqld_safe.sh @@ -18,6 +18,8 @@ niceness=0 nowatch=0 mysqld_ld_preload= mysqld_ld_library_path= +flush_caches=0 +numa_interleave=0 # Initial logging status: error log is not open, and not using syslog logging=init @@ -85,6 +87,10 @@ Usage: $0 [OPTIONS] --syslog Log messages to syslog with 'logger' --skip-syslog Log messages to error log (default) --syslog-tag=TAG Pass -t "mysqld-TAG" to 'logger' + --flush-caches Flush and purge buffers/caches before + starting the server + --numa-interleave Run mysqld with its memory interleaved + on all NUMA nodes All other options are passed to the mysqld program. @@ -312,6 +318,8 @@ parse_arguments() { --syslog-tag=*) syslog_tag="$val" ;; --timezone=*) TZ="$val"; export TZ; ;; --wsrep[-_]urls=*) wsrep_urls="$val"; ;; + --flush-caches) flush_caches=1 ;; + --numa-interleave) numa_interleave=1 ;; --wsrep[-_]provider=*) if test -n "$val" && test "$val" != "none" then @@ -834,6 +842,40 @@ mysqld daemon not started" fi fi +# Flush and purge buffers/caches. +# + +if @TARGET_LINUX@ && test $flush_caches -eq 1 +then + # Locate sync, ensure it exists. + if ! my_which sync > /dev/null 2>&1 + then + log_error "sync command not found, required for --flush-caches" + exit 1 + # Flush file system buffers. + elif ! sync + then + # Huh, the sync() function is always successful... + log_error "sync failed, check if sync is properly installed" + fi + + # Locate sysctl, ensure it exists. + if ! my_which sysctl > /dev/null 2>&1 + then + log_error "sysctl command not found, required for --flush-caches" + exit 1 + # Purge page cache, dentries and inodes. + elif ! sysctl -q -w vm.drop_caches=3 + then + log_error "sysctl failed, check the error message for details" + exit 1 + fi +elif test $flush_caches -eq 1 +then + log_error "--flush-caches is not supported on this platform" + exit 1 +fi + # # Uncomment the following lines if you want all tables to be automatically # checked and repaired during startup. You should add sensible key_buffer @@ -854,6 +896,32 @@ fi cmd="`mysqld_ld_preload_text`$NOHUP_NICENESS" +# +# Set mysqld's memory interleave policy. +# + +if @TARGET_LINUX@ && test $numa_interleave -eq 1 +then + # Locate numactl, ensure it exists. + if ! my_which numactl > /dev/null 2>&1 + then + log_error "numactl command not found, required for --numa-interleave" + exit 1 + # Attempt to run a command, ensure it works. + elif ! numactl --interleave=all true + then + log_error "numactl failed, check if numactl is properly installed" + fi + + # Launch mysqld with numactl. + cmd="$cmd numactl --interleave=all" +elif test $numa_interleave -eq 1 +then + log_error "--numa-interleave is not supported on this platform" + exit 1 +fi + + for i in "$ledir/$MYSQLD" "$defaults" "--basedir=$MY_BASEDIR_VERSION" \ "--datadir=$DATADIR" "--plugin-dir=$plugin_dir" "$USER_OPTION" do diff --git a/sql/events.cc b/sql/events.cc index 96c4effe3d9..7df8d19644d 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -1167,6 +1167,7 @@ end: close_mysql_tables(thd); DBUG_RETURN(ret); } + #ifdef WITH_WSREP int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len) { diff --git a/sql/handler.cc b/sql/handler.cc index e20afb5798f..dd297740c7d 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1387,7 +1387,6 @@ int ha_commit_trans(THD *thd, bool all) err= ht->prepare(ht, thd, all); status_var_increment(thd->status_var.ha_prepare_count); if (err) - { #ifdef WITH_WSREP if (WSREP(thd) && ht->db_type== DB_TYPE_WSREP) { @@ -1402,7 +1401,6 @@ int ha_commit_trans(THD *thd, bool all) /* not wsrep hton, bail to native mysql behavior */ #endif my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); - } if (err) goto err; diff --git a/sql/lock.cc b/sql/lock.cc index 99a53ebfe3b..62540bcb55a 100644 --- a/sql/lock.cc +++ b/sql/lock.cc @@ -320,7 +320,8 @@ bool mysql_lock_tables(THD *thd, MYSQL_LOCK *sql_lock, uint flags) sql_lock->lock_count * sizeof(*sql_lock->locks)); #ifdef WITH_WSREP thd->lock_info.in_lock_tables= thd->in_lock_tables; -#endif /* Lock on the copied half of the lock data array. */ +#endif + /* Lock on the copied half of the lock data array. */ rc= thr_lock_errno_to_mysql[(int) thr_multi_lock(sql_lock->locks + sql_lock->lock_count, @@ -330,6 +331,12 @@ bool mysql_lock_tables(THD *thd, MYSQL_LOCK *sql_lock, uint flags) (void) unlock_external(thd, sql_lock->table, sql_lock->table_count); end: +#ifdef WITH_WSREP + thd_proc_info(thd, "mysql_lock_tables(): unlocking tables II"); +#else /* WITH_WSREP */ + thd_proc_info(thd, 0); +#endif /* WITH_WSREP */ + if (thd->killed) { thd->send_kill_message(); diff --git a/sql/log_event.cc b/sql/log_event.cc index 3d2a499a2b7..4f4d88df739 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -4340,6 +4340,21 @@ Query_log_event::do_shall_skip(Relay_log_info *rli) DBUG_RETURN(Log_event::EVENT_SKIP_COUNT); } } +#ifdef WITH_WSREP + else if (wsrep_mysql_replication_bundle && WSREP_ON && thd->wsrep_mysql_replicated > 0 && + (!strncasecmp(query , "BEGIN", 5) || !strncasecmp(query , "COMMIT", 6))) + { + if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle) + { + WSREP_DEBUG("skipping wsrep commit %d", thd->wsrep_mysql_replicated); + DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE); + } + else + { + thd->wsrep_mysql_replicated = 0; + } + } +#endif DBUG_RETURN(Log_event::do_shall_skip(rli)); } @@ -7014,6 +7029,20 @@ Xid_log_event::do_shall_skip(Relay_log_info *rli) thd->variables.option_bits&= ~OPTION_BEGIN; DBUG_RETURN(Log_event::EVENT_SKIP_COUNT); } +#ifdef WITH_WSREP + else if (wsrep_mysql_replication_bundle && WSREP_ON) + { + if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle) + { + WSREP_DEBUG("skipping wsrep commit %d", thd->wsrep_mysql_replicated); + DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE); + } + else + { + thd->wsrep_mysql_replicated = 0; + } + } +#endif DBUG_RETURN(Log_event::do_shall_skip(rli)); } #endif /* !MYSQL_CLIENT */ @@ -8038,6 +8067,14 @@ err: end_io_cache(&file); if (fd >= 0) mysql_file_close(fd, MYF(0)); +#ifdef WITH_WSREP + if (WSREP(thd)) + thd_proc_info(thd, "exit Create_file_log_event::do_apply_event()"); + else + thd_proc_info(thd, 0); +#else /* WITH_WSREP */ + thd_proc_info(thd, 0); +#endif /* WITH_WSREP */ return error != 0; } #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ @@ -8208,6 +8245,14 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli) err: if (fd >= 0) mysql_file_close(fd, MYF(0)); +#ifdef WITH_WSREP + if (WSREP(thd)) + thd_proc_info(thd, "exit Append_block_log_event::do_apply_event()"); + else + thd_proc_info(thd, 0); +#else /* WITH_WSREP */ + thd_proc_info(thd, 0); +#endif /* WITH_WSREP */ DBUG_RETURN(error); } #endif diff --git a/sql/mysqld.cc b/sql/mysqld.cc index d2a3c028fd1..8a9e8db53fb 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -4464,6 +4464,26 @@ static int init_thread_environment() rpl_init_gtid_slave_state(); #endif +#ifdef WITH_WSREP + mysql_mutex_init(key_LOCK_wsrep_ready, + &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL); + mysql_mutex_init(key_LOCK_wsrep_sst, + &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL); + mysql_mutex_init(key_LOCK_wsrep_sst_init, + &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL); + mysql_mutex_init(key_LOCK_wsrep_rollback, + &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL); + mysql_mutex_init(key_LOCK_wsrep_replaying, + &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL); + mysql_mutex_init(key_LOCK_wsrep_slave_threads, + &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); +#endif + DBUG_RETURN(0); } @@ -5076,6 +5096,504 @@ static void create_shutdown_thread() #endif /* EMBEDDED_LIBRARY */ +#ifdef WITH_WSREP +typedef void (*wsrep_thd_processor_fun)(THD *); + +pthread_handler_t start_wsrep_THD(void *arg) +{ + THD *thd; + wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg; + + if (my_thread_init()) + { + WSREP_ERROR("Could not initialize thread"); + return(NULL); + } + + if (!(thd= new THD(true))) + { + return(NULL); + } + mysql_mutex_lock(&LOCK_thread_count); + thd->thread_id=thread_id++; + + thd->real_id=pthread_self(); // Keep purify happy + thread_count++; + thread_created++; + threads.append(thd); + + my_net_init(&thd->net,(st_vio*) 0, MYF(0)); + + DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id)); + thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer(); + (void) mysql_mutex_unlock(&LOCK_thread_count); + + /* from bootstrap()... */ + thd->bootstrap=1; + thd->max_client_packet_length= thd->net.max_packet; + thd->security_ctx->master_access= ~(ulong)0; + + /* from handle_one_connection... */ + pthread_detach_this_thread(); + + mysql_thread_set_psi_id(thd->thread_id); + thd->thr_create_utime= microsecond_interval_timer(); + if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0)) + { + close_connection(thd, ER_OUT_OF_RESOURCES, 1); + statistic_increment(aborted_connects,&LOCK_status); + MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0)); + + return(NULL); + } + +// </5.1.17> + /* + handle_one_connection() is normally the only way a thread would + start and would always be on the very high end of the stack , + therefore, the thread stack always starts at the address of the + first local variable of handle_one_connection, which is thd. We + need to know the start of the stack so that we could check for + stack overruns. + */ + DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld\n", + (long long)thd->thread_id)); + /* now that we've called my_thread_init(), it is safe to call DBUG_* */ + + thd->thread_stack= (char*) &thd; + if (thd->store_globals()) + { + close_connection(thd, ER_OUT_OF_RESOURCES, 1); + statistic_increment(aborted_connects,&LOCK_status); + MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0)); + delete thd; + + return(NULL); + } + + thd->system_thread= SYSTEM_THREAD_SLAVE_SQL; + thd->security_ctx->skip_grants(); + + /* handle_one_connection() again... */ + //thd->version= refresh_version; + thd->proc_info= 0; + thd->set_command(COM_SLEEP); + thd->set_time(); + thd->init_for_queries(); + + mysql_mutex_lock(&LOCK_connection_count); + ++connection_count; + mysql_mutex_unlock(&LOCK_connection_count); + + mysql_mutex_lock(&LOCK_thread_count); + wsrep_running_threads++; + mysql_cond_signal(&COND_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + + processor(thd); + + close_connection(thd, 0, 1); + + mysql_mutex_lock(&LOCK_thread_count); + wsrep_running_threads--; + mysql_cond_signal(&COND_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + + // Note: We can't call THD destructor without crashing + // if plugins have not been initialized. However, in most of the + // cases this means that pre SE initialization SST failed and + // we are going to exit anyway. + if (plugins_are_initialized) + { + net_end(&thd->net); + MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1)); + } + else + { + // TODO: lightweight cleanup to get rid of: + // 'Error in my_thread_global_end(): 2 threads didn't exit' + // at server shutdown + } + + if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION) + { + mysql_mutex_lock(&LOCK_thread_count); + delete thd; + thread_count--; + mysql_mutex_unlock(&LOCK_thread_count); + } + return(NULL); +} + +void wsrep_create_rollbacker() +{ + if (wsrep_provider && strcasecmp(wsrep_provider, "none")) + { + pthread_t hThread; + /* create rollbacker */ + if (pthread_create( &hThread, &connection_attrib, + start_wsrep_THD, (void*)wsrep_rollback_process)) + WSREP_WARN("Can't create thread to manage wsrep rollback"); + } +} + +void wsrep_create_appliers(long threads) +{ + if (!wsrep_connected) + { + /* see wsrep_replication_start() for the logic */ + if (wsrep_cluster_address && strlen(wsrep_cluster_address) && + wsrep_provider && strcasecmp(wsrep_provider, "none")) + { + WSREP_ERROR("Trying to launch slave threads before creating " + "connection at '%s'", wsrep_cluster_address); + assert(0); + } + return; + } + + long wsrep_threads=0; + pthread_t hThread; + while (wsrep_threads++ < threads) { + if (pthread_create( + &hThread, &connection_attrib, + start_wsrep_THD, (void*)wsrep_replication_process)) + WSREP_WARN("Can't create thread to manage wsrep replication"); + } +} +/**/ +static bool abort_replicated(THD *thd) +{ + bool ret_code= false; + if (thd->wsrep_query_state== QUERY_COMMITTING) + { + if (wsrep_debug) WSREP_INFO("aborting replicated trx: %lu", thd->real_id); + + (void)wsrep_abort_thd(thd, thd, TRUE); + ret_code= true; + } + return ret_code; +} +/**/ +static inline bool is_client_connection(THD *thd) +{ +#if REMOVE +// REMOVE THIS LATER (lp:777201). Below we had to add an explicit check for +// wsrep_applier since wsrep_exec_mode didn't seem to always work +if (thd->wsrep_applier && thd->wsrep_exec_mode != REPL_RECV) +WSREP_WARN("applier has wsrep_exec_mode = %d", thd->wsrep_exec_mode); + + if ( thd->slave_thread || /* declared as mysql slave */ + thd->system_thread || /* declared as system thread */ + !thd->vio_ok() || /* server internal thread */ + thd->wsrep_exec_mode==REPL_RECV || /* applier or replaying thread */ + thd->wsrep_applier || /* wsrep slave applier */ + !thd->variables.wsrep_on) /* client, but fenced outside wsrep */ + return false; + + return true; +#else + return (thd->wsrep_client_thread && thd->variables.wsrep_on); +#endif /* REMOVE */ +} + +static inline bool is_replaying_connection(THD *thd) +{ + bool ret; + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + ret= (thd->wsrep_conflict_state == REPLAYING) ? true : false; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + return ret; +} + +static inline bool is_committing_connection(THD *thd) +{ + bool ret; + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + ret= (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + return ret; +} + +static bool have_client_connections() +{ + THD *tmp; + + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + DBUG_PRINT("quit",("Informing thread %ld that it's time to die", + tmp->thread_id)); + if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION) + { + (void)abort_replicated(tmp); + return true; + } + } + return false; +} + +/* + returns the number of wsrep appliers running. + However, the caller (thd parameter) is not taken in account + */ +static int have_wsrep_appliers(THD *thd) +{ + int ret= 0; + THD *tmp; + + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + ret+= (tmp != thd && tmp->wsrep_applier); + } + return ret; +} + +static void wsrep_close_thread(THD *thd) +{ + thd->killed= KILL_CONNECTION; + MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd)); + if (thd->mysys_var) + { + thd->mysys_var->abort=1; + mysql_mutex_lock(&thd->mysys_var->mutex); + if (thd->mysys_var->current_cond) + { + mysql_mutex_lock(thd->mysys_var->current_mutex); + mysql_cond_broadcast(thd->mysys_var->current_cond); + mysql_mutex_unlock(thd->mysys_var->current_mutex); + } + mysql_mutex_unlock(&thd->mysys_var->mutex); + } +} + +static my_bool have_committing_connections() +{ + THD *tmp; + mysql_mutex_lock(&LOCK_thread_count); // For unlink from list + + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + if (!is_client_connection(tmp)) + continue; + + if (is_committing_connection(tmp)) + { + mysql_mutex_unlock(&LOCK_thread_count); + return TRUE; + } + } + mysql_mutex_unlock(&LOCK_thread_count); + return FALSE; +} + +int wsrep_wait_committing_connections_close(int wait_time) +{ + int sleep_time= 100; + + while (have_committing_connections() && wait_time > 0) + { + WSREP_DEBUG("wait for committing transaction to close: %d", wait_time); + my_sleep(sleep_time); + wait_time -= sleep_time; + } + if (have_committing_connections()) + { + return 1; + } + return 0; +} + +void wsrep_close_client_connections(my_bool wait_to_end) +{ + /* + First signal all threads that it's time to die + */ + + THD *tmp; + mysql_mutex_lock(&LOCK_thread_count); // For unlink from list + + bool kill_cached_threads_saved= kill_cached_threads; + kill_cached_threads= true; // prevent future threads caching + mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die + + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + DBUG_PRINT("quit",("Informing thread %ld that it's time to die", + tmp->thread_id)); + /* We skip slave threads & scheduler on this first loop through. */ + if (!is_client_connection(tmp)) + continue; + + if (is_replaying_connection(tmp)) + { + tmp->killed= KILL_CONNECTION; + continue; + } + + /* replicated transactions must be skipped */ + if (abort_replicated(tmp)) + continue; + + WSREP_DEBUG("closing connection %ld", tmp->thread_id); + wsrep_close_thread(tmp); + } + mysql_mutex_unlock(&LOCK_thread_count); + + if (thread_count) + sleep(2); // Give threads time to die + + mysql_mutex_lock(&LOCK_thread_count); + /* + Force remaining threads to die by closing the connection to the client + */ + + I_List_iterator<THD> it2(threads); + while ((tmp=it2++)) + { +#ifndef __bsdi__ // Bug in BSDI kernel + if (is_client_connection(tmp) && + !abort_replicated(tmp) && + !is_replaying_connection(tmp)) + { + WSREP_INFO("killing local connection: %ld",tmp->thread_id); + close_connection(tmp,0,0); + } +#endif + } + + DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count)); + if (wsrep_debug) + WSREP_INFO("waiting for client connections to close: %u", thread_count); + + while (wait_to_end && have_client_connections()) + { + mysql_cond_wait(&COND_thread_count, &LOCK_thread_count); + DBUG_PRINT("quit",("One thread died (count=%u)", thread_count)); + } + + kill_cached_threads= kill_cached_threads_saved; + + mysql_mutex_unlock(&LOCK_thread_count); + + /* All client connection threads have now been aborted */ +} + +void wsrep_close_applier(THD *thd) +{ + WSREP_DEBUG("closing applier %ld", thd->thread_id); + wsrep_close_thread(thd); +} + +static void wsrep_close_threads(THD *thd) +{ + THD *tmp; + mysql_mutex_lock(&LOCK_thread_count); // For unlink from list + + I_List_iterator<THD> it(threads); + while ((tmp=it++)) + { + DBUG_PRINT("quit",("Informing thread %ld that it's time to die", + tmp->thread_id)); + /* We skip slave threads & scheduler on this first loop through. */ + if (tmp->wsrep_applier && tmp != thd) + { + WSREP_DEBUG("closing wsrep thread %ld", tmp->thread_id); + wsrep_close_thread (tmp); + } + } + + mysql_mutex_unlock(&LOCK_thread_count); +} + +void wsrep_close_applier_threads(int count) +{ + THD *tmp; + mysql_mutex_lock(&LOCK_thread_count); // For unlink from list + + I_List_iterator<THD> it(threads); + while ((tmp=it++) && count) + { + DBUG_PRINT("quit",("Informing thread %ld that it's time to die", + tmp->thread_id)); + /* We skip slave threads & scheduler on this first loop through. */ + if (tmp->wsrep_applier) + { + WSREP_DEBUG("closing wsrep applier thread %ld", tmp->thread_id); + tmp->wsrep_applier_closing= TRUE; + count--; + } + } + + mysql_mutex_unlock(&LOCK_thread_count); +} + +void wsrep_wait_appliers_close(THD *thd) +{ + /* Wait for wsrep appliers to gracefully exit */ + mysql_mutex_lock(&LOCK_thread_count); + while (have_wsrep_appliers(thd) > 1) + // 1 is for rollbacker thread which needs to be killed explicitly. + // This gotta be fixed in a more elegant manner if we gonna have arbitrary + // number of non-applier wsrep threads. + { + if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION) + { + mysql_mutex_unlock(&LOCK_thread_count); + my_sleep(100); + mysql_mutex_lock(&LOCK_thread_count); + } + else + mysql_cond_wait(&COND_thread_count,&LOCK_thread_count); + DBUG_PRINT("quit",("One applier died (count=%u)",thread_count)); + } + mysql_mutex_unlock(&LOCK_thread_count); + /* Now kill remaining wsrep threads: rollbacker */ + wsrep_close_threads (thd); + /* and wait for them to die */ + mysql_mutex_lock(&LOCK_thread_count); + while (have_wsrep_appliers(thd) > 0) + { + if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION) + { + mysql_mutex_unlock(&LOCK_thread_count); + my_sleep(100); + mysql_mutex_lock(&LOCK_thread_count); + } + else + mysql_cond_wait(&COND_thread_count,&LOCK_thread_count); + DBUG_PRINT("quit",("One thread died (count=%u)",thread_count)); + } + mysql_mutex_unlock(&LOCK_thread_count); + + /* All wsrep applier threads have now been aborted. However, if this thread + is also applier, we are still running... + */ +} + +void wsrep_kill_mysql(THD *thd) +{ + if (mysqld_server_started) + { + if (!shutdown_in_progress) + { + WSREP_INFO("starting shutdown"); + kill_mysql(); + } + } + else + { + unireg_abort(1); + } +} +#endif /* WITH_WSREP */ #if (defined(_WIN32) || defined(HAVE_SMEM)) && !defined(EMBEDDED_LIBRARY) static void handle_connections_methods() @@ -5431,6 +5949,15 @@ int mysqld_main(int argc, char **argv) my_str_malloc= &my_str_malloc_mysqld; my_str_free= &my_str_free_mysqld; +#ifdef WITH_WSREP /* WSREP AFTER SE */ + if (wsrep_recovery) + { + select_thread_in_use= 0; + wsrep_recover(); + unireg_abort(0); + } +#endif /* WITH_WSREP */ + /* init signals & alarm After this we can't quit by a simple unireg_abort @@ -8671,25 +9198,6 @@ mysqld_get_one_option(int optid, #endif break; } -#ifdef WITH_WSREP - mysql_mutex_init(key_LOCK_wsrep_ready, - &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST); - mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL); - mysql_mutex_init(key_LOCK_wsrep_sst, - &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST); - mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL); - mysql_mutex_init(key_LOCK_wsrep_sst_init, - &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST); - mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL); - mysql_mutex_init(key_LOCK_wsrep_rollback, - &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST); - mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL); - mysql_mutex_init(key_LOCK_wsrep_replaying, - &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST); - mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL); - mysql_mutex_init(key_LOCK_wsrep_slave_threads, - &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); -#endif return 0; } @@ -9276,487 +9784,6 @@ static void create_pid_file() } #endif /* EMBEDDED_LIBRARY */ -#ifdef WITH_WSREP -typedef void (*wsrep_thd_processor_fun)(THD *); - -pthread_handler_t start_wsrep_THD(void *arg) -{ - THD *thd; - wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg; - - if (my_thread_init()) - { - WSREP_ERROR("Could not initialize thread"); - return(NULL); - } - - if (!(thd= new THD(true))) - { - return(NULL); - } - mysql_mutex_lock(&LOCK_thread_count); - thd->thread_id=thread_id++; - - thd->real_id=pthread_self(); // Keep purify happy - thread_count++; - thread_created++; - threads.append(thd); - - my_net_init(&thd->net,(st_vio*) 0, MYF(0)); - - DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id)); - thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer(); - (void) mysql_mutex_unlock(&LOCK_thread_count); - - /* from bootstrap()... */ - thd->bootstrap=1; - thd->max_client_packet_length= thd->net.max_packet; - thd->security_ctx->master_access= ~(ulong)0; - - /* from handle_one_connection... */ - pthread_detach_this_thread(); - - mysql_thread_set_psi_id(thd->thread_id); - thd->thr_create_utime= microsecond_interval_timer(); - if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0)) - { - close_connection(thd, ER_OUT_OF_RESOURCES, 1); - statistic_increment(aborted_connects,&LOCK_status); - MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0)); - - return(NULL); - } - -// </5.1.17> - /* - handle_one_connection() is normally the only way a thread would - start and would always be on the very high end of the stack , - therefore, the thread stack always starts at the address of the - first local variable of handle_one_connection, which is thd. We - need to know the start of the stack so that we could check for - stack overruns. - */ - DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld\n", - (long long)thd->thread_id)); - /* now that we've called my_thread_init(), it is safe to call DBUG_* */ - - thd->thread_stack= (char*) &thd; - if (thd->store_globals()) - { - close_connection(thd, ER_OUT_OF_RESOURCES, 1); - statistic_increment(aborted_connects,&LOCK_status); - MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0)); - delete thd; - - return(NULL); - } - - thd->system_thread= SYSTEM_THREAD_SLAVE_SQL; - thd->security_ctx->skip_grants(); - - /* handle_one_connection() again... */ - //thd->version= refresh_version; - thd->proc_info= 0; - thd->set_command(COM_SLEEP); - thd->set_time(); - thd->init_for_queries(); - - mysql_mutex_lock(&LOCK_connection_count); - ++connection_count; - mysql_mutex_unlock(&LOCK_connection_count); - - mysql_mutex_lock(&LOCK_thread_count); - wsrep_running_threads++; - mysql_cond_signal(&COND_thread_count); - mysql_mutex_unlock(&LOCK_thread_count); - - processor(thd); - - close_connection(thd, 0, 1); - - mysql_mutex_lock(&LOCK_thread_count); - wsrep_running_threads--; - mysql_cond_signal(&COND_thread_count); - mysql_mutex_unlock(&LOCK_thread_count); - - // Note: We can't call THD destructor without crashing - // if plugins have not been initialized. However, in most of the - // cases this means that pre SE initialization SST failed and - // we are going to exit anyway. - if (plugins_are_initialized) - { - net_end(&thd->net); - MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1)); - } - else - { - // TODO: lightweight cleanup to get rid of: - // 'Error in my_thread_global_end(): 2 threads didn't exit' - // at server shutdown - } - - if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION) - { - mysql_mutex_lock(&LOCK_thread_count); - delete thd; - thread_count--; - mysql_mutex_unlock(&LOCK_thread_count); - } - return(NULL); -} - -void wsrep_create_rollbacker() -{ - if (wsrep_provider && strcasecmp(wsrep_provider, "none")) - { - pthread_t hThread; - /* create rollbacker */ - if (pthread_create( &hThread, &connection_attrib, - start_wsrep_THD, (void*)wsrep_rollback_process)) - WSREP_WARN("Can't create thread to manage wsrep rollback"); - } -} - -void wsrep_create_appliers(long threads) -{ - if (!wsrep_connected) - { - /* see wsrep_replication_start() for the logic */ - if (wsrep_cluster_address && strlen(wsrep_cluster_address) && - wsrep_provider && strcasecmp(wsrep_provider, "none")) - { - WSREP_ERROR("Trying to launch slave threads before creating " - "connection at '%s'", wsrep_cluster_address); - assert(0); - } - return; - } - - long wsrep_threads=0; - pthread_t hThread; - while (wsrep_threads++ < threads) { - if (pthread_create( - &hThread, &connection_attrib, - start_wsrep_THD, (void*)wsrep_replication_process)) - WSREP_WARN("Can't create thread to manage wsrep replication"); - } -} -/**/ -static bool abort_replicated(THD *thd) -{ - bool ret_code= false; - if (thd->wsrep_query_state== QUERY_COMMITTING) - { - if (wsrep_debug) WSREP_INFO("aborting replicated trx: %lu", thd->real_id); - - (void)wsrep_abort_thd(thd, thd, TRUE); - ret_code= true; - } - return ret_code; -} -/**/ -static inline bool is_client_connection(THD *thd) -{ - return (thd->wsrep_client_thread && thd->variables.wsrep_on); -} - -static inline bool is_replaying_connection(THD *thd) -{ - bool ret; - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - ret= (thd->wsrep_conflict_state == REPLAYING) ? true : false; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - return ret; -} - -static inline bool is_committing_connection(THD *thd) -{ - bool ret; - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - ret= (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - return ret; -} - -static bool have_client_connections() -{ - THD *tmp; - - I_List_iterator<THD> it(threads); - while ((tmp=it++)) - { - DBUG_PRINT("quit",("Informing thread %ld that it's time to die", - tmp->thread_id)); - if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION) - { - (void)abort_replicated(tmp); - return true; - } - } - return false; -} - -/* - returns the number of wsrep appliers running. - However, the caller (thd parameter) is not taken in account - */ -static int have_wsrep_appliers(THD *thd) -{ - int ret= 0; - THD *tmp; - - I_List_iterator<THD> it(threads); - while ((tmp=it++)) - { - ret+= (tmp != thd && tmp->wsrep_applier); - } - return ret; -} - -static void wsrep_close_thread(THD *thd) -{ - thd->killed= KILL_CONNECTION; - MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd)); - if (thd->mysys_var) - { - thd->mysys_var->abort=1; - mysql_mutex_lock(&thd->mysys_var->mutex); - if (thd->mysys_var->current_cond) - { - mysql_mutex_lock(thd->mysys_var->current_mutex); - mysql_cond_broadcast(thd->mysys_var->current_cond); - mysql_mutex_unlock(thd->mysys_var->current_mutex); - } - mysql_mutex_unlock(&thd->mysys_var->mutex); - } -} - -static my_bool have_committing_connections() -{ - THD *tmp; - mysql_mutex_lock(&LOCK_thread_count); // For unlink from list - - I_List_iterator<THD> it(threads); - while ((tmp=it++)) - { - if (!is_client_connection(tmp)) - continue; - - if (is_committing_connection(tmp)) - { - mysql_mutex_unlock(&LOCK_thread_count); - return TRUE; - } - } - mysql_mutex_unlock(&LOCK_thread_count); - return FALSE; -} - -int wsrep_wait_committing_connections_close(int wait_time) -{ - int sleep_time= 100; - - while (have_committing_connections() && wait_time > 0) - { - WSREP_DEBUG("wait for committing transaction to close: %d", wait_time); - my_sleep(sleep_time); - wait_time -= sleep_time; - } - if (have_committing_connections()) - { - return 1; - } - return 0; -} - -void wsrep_close_client_connections(my_bool wait_to_end) -{ - /* - First signal all threads that it's time to die - */ - - THD *tmp; - mysql_mutex_lock(&LOCK_thread_count); // For unlink from list - - bool kill_cached_threads_saved= kill_cached_threads; - kill_cached_threads= true; // prevent future threads caching - mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die - - I_List_iterator<THD> it(threads); - while ((tmp=it++)) - { - DBUG_PRINT("quit",("Informing thread %ld that it's time to die", - tmp->thread_id)); - /* We skip slave threads & scheduler on this first loop through. */ - if (!is_client_connection(tmp)) - continue; - - if (is_replaying_connection(tmp)) - { - tmp->killed= KILL_CONNECTION; - continue; - } - - /* replicated transactions must be skipped */ - if (abort_replicated(tmp)) - continue; - - WSREP_DEBUG("closing connection %ld", tmp->thread_id); - wsrep_close_thread(tmp); - } - mysql_mutex_unlock(&LOCK_thread_count); - - if (thread_count) - sleep(2); // Give threads time to die - - mysql_mutex_lock(&LOCK_thread_count); - /* - Force remaining threads to die by closing the connection to the client - */ - - I_List_iterator<THD> it2(threads); - while ((tmp=it2++)) - { -#ifndef __bsdi__ // Bug in BSDI kernel - if (is_client_connection(tmp) && - !abort_replicated(tmp) && - !is_replaying_connection(tmp)) - { - WSREP_INFO("killing local connection: %ld",tmp->thread_id); - close_connection(tmp,0,0); - } -#endif - } - - DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count)); - if (wsrep_debug) - WSREP_INFO("waiting for client connections to close: %u", thread_count); - - while (wait_to_end && have_client_connections()) - { - mysql_cond_wait(&COND_thread_count, &LOCK_thread_count); - DBUG_PRINT("quit",("One thread died (count=%u)", thread_count)); - } - - kill_cached_threads= kill_cached_threads_saved; - - mysql_mutex_unlock(&LOCK_thread_count); - - /* All client connection threads have now been aborted */ -} - -void wsrep_close_applier(THD *thd) -{ - WSREP_DEBUG("closing applier %ld", thd->thread_id); - wsrep_close_thread(thd); -} - -static void wsrep_close_threads(THD *thd) -{ - THD *tmp; - mysql_mutex_lock(&LOCK_thread_count); // For unlink from list - - I_List_iterator<THD> it(threads); - while ((tmp=it++)) - { - DBUG_PRINT("quit",("Informing thread %ld that it's time to die", - tmp->thread_id)); - /* We skip slave threads & scheduler on this first loop through. */ - if (tmp->wsrep_applier && tmp != thd) - { - WSREP_DEBUG("closing wsrep thread %ld", tmp->thread_id); - wsrep_close_thread (tmp); - } - } - - mysql_mutex_unlock(&LOCK_thread_count); -} - -void wsrep_close_applier_threads(int count) -{ - THD *tmp; - mysql_mutex_lock(&LOCK_thread_count); // For unlink from list - - I_List_iterator<THD> it(threads); - while ((tmp=it++) && count) - { - DBUG_PRINT("quit",("Informing thread %ld that it's time to die", - tmp->thread_id)); - /* We skip slave threads & scheduler on this first loop through. */ - if (tmp->wsrep_applier) - { - WSREP_DEBUG("closing wsrep applier thread %ld", tmp->thread_id); - tmp->wsrep_applier_closing= TRUE; - count--; - } - } - - mysql_mutex_unlock(&LOCK_thread_count); -} - -void wsrep_wait_appliers_close(THD *thd) -{ - /* Wait for wsrep appliers to gracefully exit */ - mysql_mutex_lock(&LOCK_thread_count); - while (have_wsrep_appliers(thd) > 1) - // 1 is for rollbacker thread which needs to be killed explicitly. - // This gotta be fixed in a more elegant manner if we gonna have arbitrary - // number of non-applier wsrep threads. - { - if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION) - { - mysql_mutex_unlock(&LOCK_thread_count); - my_sleep(100); - mysql_mutex_lock(&LOCK_thread_count); - } - else - mysql_cond_wait(&COND_thread_count,&LOCK_thread_count); - DBUG_PRINT("quit",("One applier died (count=%u)",thread_count)); - } - mysql_mutex_unlock(&LOCK_thread_count); - /* Now kill remaining wsrep threads: rollbacker */ - wsrep_close_threads (thd); - /* and wait for them to die */ - mysql_mutex_lock(&LOCK_thread_count); - while (have_wsrep_appliers(thd) > 0) - { - if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION) - { - mysql_mutex_unlock(&LOCK_thread_count); - my_sleep(100); - mysql_mutex_lock(&LOCK_thread_count); - } - else - mysql_cond_wait(&COND_thread_count,&LOCK_thread_count); - DBUG_PRINT("quit",("One thread died (count=%u)",thread_count)); - } - mysql_mutex_unlock(&LOCK_thread_count); - - /* All wsrep applier threads have now been aborted. However, if this thread - is also applier, we are still running... - */ -} - -void wsrep_kill_mysql(THD *thd) -{ - if (mysqld_server_started) - { - if (!shutdown_in_progress) - { - WSREP_INFO("starting shutdown"); - kill_mysql(); - } - } - else - { - unireg_abort(1); - } -} -#endif /* WITH_WSREP */ /** Remove the process' pid file. diff --git a/sql/slave.cc b/sql/slave.cc index 1d87e16249a..8665cf646fc 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3118,23 +3118,6 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) ev->thd = thd; // because up to this point, ev->thd == 0 int reason= ev->shall_skip(rli); -#ifdef WITH_WSREP - if (WSREP_ON && (ev->get_type_code() == XID_EVENT || - (ev->get_type_code() == QUERY_EVENT && thd->wsrep_mysql_replicated > 0 && - (!strncasecmp(((Query_log_event*)ev)->query , "BEGIN", 5) || - !strncasecmp(((Query_log_event*)ev)->query , "COMMIT", 6) )))) - { - if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle) - { - WSREP_DEBUG("skipping wsrep commit %d", thd->wsrep_mysql_replicated); - reason = Log_event::EVENT_SKIP_IGNORE; - } - else - { - thd->wsrep_mysql_replicated = 0; - } - } -#endif if (reason == Log_event::EVENT_SKIP_COUNT) { DBUG_ASSERT(rli->slave_skip_counter > 0); diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 1ededf0dd61..325c79043a3 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -4429,6 +4429,15 @@ restart: #endif err: +#ifdef WITH_WSREP + if (WSREP(thd)) + thd_proc_info(thd, "exit open_tables()"); + else + thd_proc_info(thd, 0); +#else /* WITH_WSREP */ + thd_proc_info(thd, 0); +#endif /* WITH_WSREP */ + free_root(&new_frm_mem, MYF(0)); // Free pre-alloced block if (error && *table_to_open) @@ -4881,6 +4890,16 @@ end: trans_rollback_stmt(thd); close_thread_tables(thd); } + +#ifdef WITH_WSREP + if (WSREP(thd)) + thd_proc_info(thd, "End opening table"); + else + thd_proc_info(thd, 0); +#else /* WITH_WSREP */ + thd_proc_info(thd, 0); +#endif /* WITH_WSREP */ + DBUG_RETURN(table); } diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index d2bc3328643..ef139dfefeb 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -3187,6 +3187,12 @@ bool Delayed_insert::handle_inserts(void) mysql_cond_broadcast(&cond_client); // If waiting clients } } +#ifdef WITH_WSREP + if (WSREP((&thd))) + thd_proc_info(&thd, "insert done"); + else +#endif /* WITH_WSREP */ + thd_proc_info(&thd, 0); mysql_mutex_unlock(&mutex); /* diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 7239c99613c..675e82de145 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -970,9 +970,12 @@ bool do_command(THD *thd) thd->wsrep_query_state= QUERY_EXEC; mysql_mutex_unlock(&thd->LOCK_wsrep_thd); } -#endif /* WITH_WSREP */ + if ((WSREP(thd) && packet_length == packet_error) || + (!WSREP(thd) && (packet_length= my_net_read(net)) == packet_error)) +#else if (packet_length == packet_error) +#endif /* WITH_WSREP */ { DBUG_PRINT("info",("Got error %d reading command from socket %s", net->error, @@ -2708,9 +2711,6 @@ mysql_execute_command(THD *thd) my_error(ER_NOT_SUPPORTED_YET, MYF(0), "embedded server"); break; #endif -#ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error; -#endif /* WITH_WSREP */ case SQLCOM_SHOW_STATUS: { execute_show_status(thd, all_tables); @@ -2745,6 +2745,10 @@ mysql_execute_command(THD *thd) } case SQLCOM_SHOW_STATUS_PROC: case SQLCOM_SHOW_STATUS_FUNC: +#ifdef WITH_WSREP + if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error; +#endif /* WITH_WSREP */ + case SQLCOM_SHOW_DATABASES: case SQLCOM_SHOW_TABLES: case SQLCOM_SHOW_TRIGGERS: @@ -3237,12 +3241,6 @@ case SQLCOM_PREPARE: if (create_info.tmp_table()) thd->variables.option_bits|= OPTION_KEEP_LOG; /* regular create */ -#ifdef WITH_WSREP - if (!thd->is_current_stmt_binlog_format_row() || - !(create_info.options & HA_LEX_CREATE_TMP_TABLE)) - WSREP_TO_ISOLATION_BEGIN(create_table->db, create_table->table_name, - NULL) -#endif /* WITH_WSREP */ if (create_info.options & HA_LEX_CREATE_TABLE_LIKE) { /* CREATE TABLE ... LIKE ... */ @@ -3251,6 +3249,13 @@ case SQLCOM_PREPARE: } else { +#ifdef WITH_WSREP + if (!thd->is_current_stmt_binlog_format_row() || + !(create_info.options & HA_LEX_CREATE_TMP_TABLE)) + WSREP_TO_ISOLATION_BEGIN(create_table->db, create_table->table_name, + NULL) +#endif /* WITH_WSREP */ + /* Regular CREATE TABLE */ res= mysql_create_table(thd, create_table, &create_info, &alter_info); diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 5a93f3b819a..412b941e946 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -2815,6 +2815,15 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) err: unlock_slave_threads(mi); +#ifdef WITH_WSREP + if (WSREP(thd)) + thd_proc_info(thd, "exit stop_slave()"); + else + thd_proc_info(thd, 0); +#else /* WITH_WSREP */ + thd_proc_info(thd, 0); +#endif /* WITH_WSREP */ + if (slave_errno) { if (net_report) diff --git a/sql/sql_show.cc b/sql/sql_show.cc index 46f23838850..d63df895a13 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -59,6 +59,10 @@ #include "debug_sync.h" #include "keycaches.h" +#if !defined(MYSQL_MAX_VARIABLE_VALUE_LEN) +#define MYSQL_MAX_VARIABLE_VALUE_LEN 1024 +#endif // !defined(MYSQL_MAX_VARIABLE_VALUE_LEN) + #ifdef WITH_PARTITION_STORAGE_ENGINE #include "ha_partition.h" #endif @@ -8660,7 +8664,7 @@ ST_FIELD_INFO variables_fields_info[]= { {"VARIABLE_NAME", 64, MYSQL_TYPE_STRING, 0, 0, "Variable_name", SKIP_OPEN_TABLE}, - {"VARIABLE_VALUE", 1024, MYSQL_TYPE_STRING, 0, 1, + {"VARIABLE_VALUE", MYSQL_MAX_VARIABLE_VALUE_LEN, MYSQL_TYPE_STRING, 0, 1, "Value", SKIP_OPEN_TABLE}, {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, SKIP_OPEN_TABLE} }; |