summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysys/thr_lock.c257
-rw-r--r--scripts/mysqld_safe.sh68
-rw-r--r--sql/events.cc1
-rw-r--r--sql/handler.cc2
-rw-r--r--sql/lock.cc9
-rw-r--r--sql/log_event.cc45
-rw-r--r--sql/mysqld.cc1027
-rw-r--r--sql/slave.cc17
-rw-r--r--sql/sql_base.cc19
-rw-r--r--sql/sql_insert.cc6
-rw-r--r--sql/sql_parse.cc25
-rw-r--r--sql/sql_repl.cc9
-rw-r--r--sql/sql_show.cc6
13 files changed, 857 insertions, 634 deletions
diff --git a/mysys/thr_lock.c b/mysys/thr_lock.c
index 308e46a2f76..4f7c727594b 100644
--- a/mysys/thr_lock.c
+++ b/mysys/thr_lock.c
@@ -664,6 +664,108 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
DBUG_RETURN(result);
}
+#ifdef WITH_WSREP
+/*
+ * If brute force applier would need to wait for a thr lock,
+ * it needs to make sure that it will get the lock without (too much)
+ * delay.
+ * We identify here the owners of blocking locks and ask them to
+ * abort. We then put our lock request in the first place in the
+ * wait queue. When lock holders abort (one by one) the lock release
+ * algorithm should grant the lock to us. We rely on this and proceed
+ * to wait_for_locks().
+ * wsrep_break_locks() should be called in all the cases, where lock
+ * wait would happen.
+ *
+ * TODO: current implementation might not cover all possible lock wait
+ * situations. This needs an review still.
+ * TODO: lock release, might favor some other lock (instead our bf).
+ * This needs an condition to check for bf locks first.
+ * TODO: we still have a debug fprintf, this should be removed
+ */
+static inline my_bool
+wsrep_break_lock(
+ THR_LOCK_DATA *data, struct st_lock_list *lock_queue1,
+ struct st_lock_list *lock_queue2, struct st_lock_list *wait_queue)
+{
+ if (wsrep_on(data->owner->mysql_thd) &&
+ wsrep_thd_is_brute_force &&
+ wsrep_thd_is_brute_force(data->owner->mysql_thd))
+ {
+ THR_LOCK_DATA *holder;
+
+ /* if locking session conversion to transaction has been enabled,
+ we know that this conflicting lock must be read lock and furthermore,
+ lock holder is read-only. It is safe to wait for him.
+ */
+#ifdef TODO
+ if (wsrep_convert_LOCK_to_trx &&
+ (THD*)(data->owner->mysql_thd)->in_lock_tables)
+ {
+ if (wsrep_debug)
+ fprintf(stderr,"WSREP wsrep_break_lock read lock untouched\n");
+ return FALSE;
+ }
+#endif
+ if (wsrep_debug)
+ fprintf(stderr,"WSREP wsrep_break_lock aborting locks\n");
+
+ /* aborting lock holder(s) here */
+ for (holder=(lock_queue1) ? lock_queue1->data : NULL;
+ holder;
+ holder=holder->next)
+ {
+ if (!wsrep_thd_is_brute_force(holder->owner->mysql_thd))
+ {
+ wsrep_abort_thd(data->owner->mysql_thd,
+ holder->owner->mysql_thd, FALSE);
+ }
+ else
+ {
+ if (wsrep_debug)
+ fprintf(stderr,"WSREP wsrep_break_lock skipping BF lock conflict\n");
+ return FALSE;
+ }
+ }
+ for (holder=(lock_queue2) ? lock_queue2->data : NULL;
+ holder;
+ holder=holder->next)
+ {
+ if (!wsrep_thd_is_brute_force(holder->owner->mysql_thd))
+ {
+ wsrep_abort_thd(data->owner->mysql_thd,
+ holder->owner->mysql_thd, FALSE);
+ }
+ else
+ {
+ if (wsrep_debug)
+ fprintf(stderr,"WSREP wsrep_break_lock skipping BF lock conflict\n");
+ return FALSE;
+ }
+ }
+
+ /* Add our lock to the head of the wait queue */
+ if (*(wait_queue->last)==wait_queue->data)
+ {
+ wait_queue->last=&data->next;
+ assert(wait_queue->data==0);
+ }
+ else
+ {
+ assert(wait_queue->data!=0);
+ wait_queue->data->prev=&data->next;
+ }
+ data->next=wait_queue->data;
+ data->prev=&wait_queue->data;
+ wait_queue->data=data;
+ data->cond=get_cond();
+
+ statistic_increment(locks_immediate,&THR_LOCK_lock);
+ return TRUE;
+ }
+ return FALSE;
+}
+#endif
static enum enum_thr_lock_result
thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout)
@@ -672,6 +774,9 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout)
enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
struct st_lock_list *wait_queue;
enum thr_lock_type lock_type= data->type;
+#ifdef WITH_WSREP
+ my_bool wsrep_lock_inserted= FALSE;
+#endif
MYSQL_TABLE_WAIT_VARIABLES(locker, state) /* no ';' */
DBUG_ENTER("thr_lock");
@@ -741,6 +846,14 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout)
}
if (lock->write.data->type == TL_WRITE_ONLY)
{
+#ifdef WITH_WSREP
+ if (wsrep_break_lock(data, &lock->write, NULL, &lock->read_wait))
+ {
+ wsrep_lock_inserted= TRUE;
+ goto wsrep_read_wait;
+ }
+#endif
+
/* We are not allowed to get a READ lock in this case */
data->type=TL_UNLOCK;
result= THR_LOCK_ABORTED; /* Can't wait for this one */
@@ -768,6 +881,14 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout)
lock but a high priority write waiting in the write_wait queue.
In the latter case we should yield the lock to the writer.
*/
+#ifdef WITH_WSREP
+ if (wsrep_break_lock(data, &lock->write, NULL, &lock->read_wait))
+ {
+ wsrep_lock_inserted= TRUE;
+ }
+ wsrep_read_wait:
+#endif
+
wait_queue= &lock->read_wait;
}
else /* Request for WRITE lock */
@@ -776,12 +897,25 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout)
{
if (lock->write.data && lock->write.data->type == TL_WRITE_ONLY)
{
+#ifdef WITH_WSREP
+ if (wsrep_break_lock(data, &lock->write, NULL, &lock->write_wait))
+ {
+ wsrep_lock_inserted=TRUE;
+ goto wsrep_write_wait;
+ }
+#endif
data->type=TL_UNLOCK;
result= THR_LOCK_ABORTED; /* Can't wait for this one */
goto end;
}
if (lock->write.data || lock->read.data)
{
+#ifdef WITH_WSREP
+ if (wsrep_break_lock(data, &lock->write, NULL, &lock->write_wait))
+ {
+ goto end;
+ }
+#endif
/* Add delayed write lock to write_wait queue, and return at once */
(*lock->write_wait.last)=data;
data->prev=lock->write_wait.last;
@@ -806,6 +940,13 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout)
/* Allow lock owner to bypass TL_WRITE_ONLY. */
if (!thr_lock_owner_equal(data->owner, lock->write.data->owner))
{
+#ifdef WITH_WSREP
+ if (wsrep_break_lock(data, &lock->write, NULL, &lock->write_wait))
+ {
+ wsrep_lock_inserted=TRUE;
+ goto wsrep_write_wait;
+ }
+#endif
/* We are not allowed to get a lock in this case */
data->type=TL_UNLOCK;
result= THR_LOCK_ABORTED; /* Can't wait for this one */
@@ -909,9 +1050,22 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout)
DBUG_PRINT("lock",("write locked 3 by thread: 0x%lx type: %d",
lock->read.data->owner->thread_id, data->type));
}
+#ifdef WITH_WSREP
+ if (wsrep_break_lock(data, &lock->write, NULL, &lock->write_wait))
+ {
+ wsrep_lock_inserted= TRUE;
+ }
+ wsrep_write_wait:
+
+#endif
+
wait_queue= &lock->write_wait;
}
/* Can't get lock yet; Wait for it */
+#ifdef WITH_WSREP
+ if (wsrep_on(data->owner->mysql_thd) && wsrep_lock_inserted)
+ DBUG_RETURN(wait_for_lock(wait_queue, data, 1, lock_wait_timeout));
+#endif
result= wait_for_lock(wait_queue, data, 0, lock_wait_timeout);
MYSQL_END_TABLE_LOCK_WAIT(locker);
DBUG_RETURN(result);
@@ -1174,109 +1328,6 @@ static void sort_locks(THR_LOCK_DATA **data,uint count)
}
}
-#ifdef WITH_WSREP
-/*
- * If brute force applier would need to wait for a thr lock,
- * it needs to make sure that it will get the lock without (too much)
- * delay.
- * We identify here the owners of blocking locks and ask them to
- * abort. We then put our lock request in the first place in the
- * wait queue. When lock holders abort (one by one) the lock release
- * algorithm should grant the lock to us. We rely on this and proceed
- * to wait_for_locks().
- * wsrep_break_locks() should be called in all the cases, where lock
- * wait would happen.
- *
- * TODO: current implementation might not cover all possible lock wait
- * situations. This needs an review still.
- * TODO: lock release, might favor some other lock (instead our bf).
- * This needs an condition to check for bf locks first.
- * TODO: we still have a debug fprintf, this should be removed
- */
-static inline my_bool
-wsrep_break_lock(
- THR_LOCK_DATA *data, struct st_lock_list *lock_queue1,
- struct st_lock_list *lock_queue2, struct st_lock_list *wait_queue)
-{
- if (wsrep_on(data->owner->mysql_thd) &&
- wsrep_thd_is_brute_force &&
- wsrep_thd_is_brute_force(data->owner->mysql_thd))
- {
- THR_LOCK_DATA *holder;
-
- /* if locking session conversion to transaction has been enabled,
- we know that this conflicting lock must be read lock and furthermore,
- lock holder is read-only. It is safe to wait for him.
- */
-#ifdef TODO
- if (wsrep_convert_LOCK_to_trx &&
- (THD*)(data->owner->mysql_thd)->in_lock_tables)
- {
- if (wsrep_debug)
- fprintf(stderr,"WSREP wsrep_break_lock read lock untouched\n");
- return FALSE;
- }
-#endif
- if (wsrep_debug)
- fprintf(stderr,"WSREP wsrep_break_lock aborting locks\n");
-
- /* aborting lock holder(s) here */
- for (holder=(lock_queue1) ? lock_queue1->data : NULL;
- holder;
- holder=holder->next)
- {
- if (!wsrep_thd_is_brute_force(holder->owner->mysql_thd))
- {
- wsrep_abort_thd(data->owner->mysql_thd,
- holder->owner->mysql_thd, FALSE);
- }
- else
- {
- if (wsrep_debug)
- fprintf(stderr,"WSREP wsrep_break_lock skipping BF lock conflict\n");
- return FALSE;
- }
- }
- for (holder=(lock_queue2) ? lock_queue2->data : NULL;
- holder;
- holder=holder->next)
- {
- if (!wsrep_thd_is_brute_force(holder->owner->mysql_thd))
- {
- wsrep_abort_thd(data->owner->mysql_thd,
- holder->owner->mysql_thd, FALSE);
- }
- else
- {
- if (wsrep_debug)
- fprintf(stderr,"WSREP wsrep_break_lock skipping BF lock conflict\n");
- return FALSE;
- }
- }
-
- /* Add our lock to the head of the wait queue */
- if (*(wait_queue->last)==wait_queue->data)
- {
- wait_queue->last=&data->next;
- assert(wait_queue->data==0);
- }
- else
- {
- assert(wait_queue->data!=0);
- wait_queue->data->prev=&data->next;
- }
- data->next=wait_queue->data;
- data->prev=&wait_queue->data;
- wait_queue->data=data;
- data->cond=get_cond();
-
- statistic_increment(locks_immediate,&THR_LOCK_lock);
- return TRUE;
- }
- return FALSE;
-}
-#endif
-
enum enum_thr_lock_result
thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_INFO *owner,
ulong lock_wait_timeout)
diff --git a/scripts/mysqld_safe.sh b/scripts/mysqld_safe.sh
index 7db2a5605e1..113c3236fab 100644
--- a/scripts/mysqld_safe.sh
+++ b/scripts/mysqld_safe.sh
@@ -18,6 +18,8 @@ niceness=0
nowatch=0
mysqld_ld_preload=
mysqld_ld_library_path=
+flush_caches=0
+numa_interleave=0
# Initial logging status: error log is not open, and not using syslog
logging=init
@@ -85,6 +87,10 @@ Usage: $0 [OPTIONS]
--syslog Log messages to syslog with 'logger'
--skip-syslog Log messages to error log (default)
--syslog-tag=TAG Pass -t "mysqld-TAG" to 'logger'
+ --flush-caches Flush and purge buffers/caches before
+ starting the server
+ --numa-interleave Run mysqld with its memory interleaved
+ on all NUMA nodes
All other options are passed to the mysqld program.
@@ -312,6 +318,8 @@ parse_arguments() {
--syslog-tag=*) syslog_tag="$val" ;;
--timezone=*) TZ="$val"; export TZ; ;;
--wsrep[-_]urls=*) wsrep_urls="$val"; ;;
+ --flush-caches) flush_caches=1 ;;
+ --numa-interleave) numa_interleave=1 ;;
--wsrep[-_]provider=*)
if test -n "$val" && test "$val" != "none"
then
@@ -834,6 +842,40 @@ mysqld daemon not started"
fi
fi
+# Flush and purge buffers/caches.
+#
+
+if @TARGET_LINUX@ && test $flush_caches -eq 1
+then
+ # Locate sync, ensure it exists.
+ if ! my_which sync > /dev/null 2>&1
+ then
+ log_error "sync command not found, required for --flush-caches"
+ exit 1
+ # Flush file system buffers.
+ elif ! sync
+ then
+ # Huh, the sync() function is always successful...
+ log_error "sync failed, check if sync is properly installed"
+ fi
+
+ # Locate sysctl, ensure it exists.
+ if ! my_which sysctl > /dev/null 2>&1
+ then
+ log_error "sysctl command not found, required for --flush-caches"
+ exit 1
+ # Purge page cache, dentries and inodes.
+ elif ! sysctl -q -w vm.drop_caches=3
+ then
+ log_error "sysctl failed, check the error message for details"
+ exit 1
+ fi
+elif test $flush_caches -eq 1
+then
+ log_error "--flush-caches is not supported on this platform"
+ exit 1
+fi
+
#
# Uncomment the following lines if you want all tables to be automatically
# checked and repaired during startup. You should add sensible key_buffer
@@ -854,6 +896,32 @@ fi
cmd="`mysqld_ld_preload_text`$NOHUP_NICENESS"
+#
+# Set mysqld's memory interleave policy.
+#
+
+if @TARGET_LINUX@ && test $numa_interleave -eq 1
+then
+ # Locate numactl, ensure it exists.
+ if ! my_which numactl > /dev/null 2>&1
+ then
+ log_error "numactl command not found, required for --numa-interleave"
+ exit 1
+ # Attempt to run a command, ensure it works.
+ elif ! numactl --interleave=all true
+ then
+ log_error "numactl failed, check if numactl is properly installed"
+ fi
+
+ # Launch mysqld with numactl.
+ cmd="$cmd numactl --interleave=all"
+elif test $numa_interleave -eq 1
+then
+ log_error "--numa-interleave is not supported on this platform"
+ exit 1
+fi
+
+
for i in "$ledir/$MYSQLD" "$defaults" "--basedir=$MY_BASEDIR_VERSION" \
"--datadir=$DATADIR" "--plugin-dir=$plugin_dir" "$USER_OPTION"
do
diff --git a/sql/events.cc b/sql/events.cc
index 96c4effe3d9..7df8d19644d 100644
--- a/sql/events.cc
+++ b/sql/events.cc
@@ -1167,6 +1167,7 @@ end:
close_mysql_tables(thd);
DBUG_RETURN(ret);
}
+
#ifdef WITH_WSREP
int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len)
{
diff --git a/sql/handler.cc b/sql/handler.cc
index e20afb5798f..dd297740c7d 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1387,7 +1387,6 @@ int ha_commit_trans(THD *thd, bool all)
err= ht->prepare(ht, thd, all);
status_var_increment(thd->status_var.ha_prepare_count);
if (err)
- {
#ifdef WITH_WSREP
if (WSREP(thd) && ht->db_type== DB_TYPE_WSREP)
{
@@ -1402,7 +1401,6 @@ int ha_commit_trans(THD *thd, bool all)
/* not wsrep hton, bail to native mysql behavior */
#endif
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
- }
if (err)
goto err;
diff --git a/sql/lock.cc b/sql/lock.cc
index 99a53ebfe3b..62540bcb55a 100644
--- a/sql/lock.cc
+++ b/sql/lock.cc
@@ -320,7 +320,8 @@ bool mysql_lock_tables(THD *thd, MYSQL_LOCK *sql_lock, uint flags)
sql_lock->lock_count * sizeof(*sql_lock->locks));
#ifdef WITH_WSREP
thd->lock_info.in_lock_tables= thd->in_lock_tables;
-#endif /* Lock on the copied half of the lock data array. */
+#endif
+
/* Lock on the copied half of the lock data array. */
rc= thr_lock_errno_to_mysql[(int) thr_multi_lock(sql_lock->locks +
sql_lock->lock_count,
@@ -330,6 +331,12 @@ bool mysql_lock_tables(THD *thd, MYSQL_LOCK *sql_lock, uint flags)
(void) unlock_external(thd, sql_lock->table, sql_lock->table_count);
end:
+#ifdef WITH_WSREP
+ thd_proc_info(thd, "mysql_lock_tables(): unlocking tables II");
+#else /* WITH_WSREP */
+ thd_proc_info(thd, 0);
+#endif /* WITH_WSREP */
+
if (thd->killed)
{
thd->send_kill_message();
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 3d2a499a2b7..4f4d88df739 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -4340,6 +4340,21 @@ Query_log_event::do_shall_skip(Relay_log_info *rli)
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
}
+#ifdef WITH_WSREP
+ else if (wsrep_mysql_replication_bundle && WSREP_ON && thd->wsrep_mysql_replicated > 0 &&
+ (!strncasecmp(query , "BEGIN", 5) || !strncasecmp(query , "COMMIT", 6)))
+ {
+ if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle)
+ {
+ WSREP_DEBUG("skipping wsrep commit %d", thd->wsrep_mysql_replicated);
+ DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE);
+ }
+ else
+ {
+ thd->wsrep_mysql_replicated = 0;
+ }
+ }
+#endif
DBUG_RETURN(Log_event::do_shall_skip(rli));
}
@@ -7014,6 +7029,20 @@ Xid_log_event::do_shall_skip(Relay_log_info *rli)
thd->variables.option_bits&= ~OPTION_BEGIN;
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
+#ifdef WITH_WSREP
+ else if (wsrep_mysql_replication_bundle && WSREP_ON)
+ {
+ if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle)
+ {
+ WSREP_DEBUG("skipping wsrep commit %d", thd->wsrep_mysql_replicated);
+ DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE);
+ }
+ else
+ {
+ thd->wsrep_mysql_replicated = 0;
+ }
+ }
+#endif
DBUG_RETURN(Log_event::do_shall_skip(rli));
}
#endif /* !MYSQL_CLIENT */
@@ -8038,6 +8067,14 @@ err:
end_io_cache(&file);
if (fd >= 0)
mysql_file_close(fd, MYF(0));
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ thd_proc_info(thd, "exit Create_file_log_event::do_apply_event()");
+ else
+ thd_proc_info(thd, 0);
+#else /* WITH_WSREP */
+ thd_proc_info(thd, 0);
+#endif /* WITH_WSREP */
return error != 0;
}
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
@@ -8208,6 +8245,14 @@ int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
err:
if (fd >= 0)
mysql_file_close(fd, MYF(0));
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ thd_proc_info(thd, "exit Append_block_log_event::do_apply_event()");
+ else
+ thd_proc_info(thd, 0);
+#else /* WITH_WSREP */
+ thd_proc_info(thd, 0);
+#endif /* WITH_WSREP */
DBUG_RETURN(error);
}
#endif
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index d2a3c028fd1..8a9e8db53fb 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -4464,6 +4464,26 @@ static int init_thread_environment()
rpl_init_gtid_slave_state();
#endif
+#ifdef WITH_WSREP
+ mysql_mutex_init(key_LOCK_wsrep_ready,
+ &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_sst,
+ &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_sst_init,
+ &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_rollback,
+ &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_replaying,
+ &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_slave_threads,
+ &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
+#endif
+
DBUG_RETURN(0);
}
@@ -5076,6 +5096,504 @@ static void create_shutdown_thread()
#endif /* EMBEDDED_LIBRARY */
+#ifdef WITH_WSREP
+typedef void (*wsrep_thd_processor_fun)(THD *);
+
+pthread_handler_t start_wsrep_THD(void *arg)
+{
+ THD *thd;
+ wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg;
+
+ if (my_thread_init())
+ {
+ WSREP_ERROR("Could not initialize thread");
+ return(NULL);
+ }
+
+ if (!(thd= new THD(true)))
+ {
+ return(NULL);
+ }
+ mysql_mutex_lock(&LOCK_thread_count);
+ thd->thread_id=thread_id++;
+
+ thd->real_id=pthread_self(); // Keep purify happy
+ thread_count++;
+ thread_created++;
+ threads.append(thd);
+
+ my_net_init(&thd->net,(st_vio*) 0, MYF(0));
+
+ DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
+ thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
+ (void) mysql_mutex_unlock(&LOCK_thread_count);
+
+ /* from bootstrap()... */
+ thd->bootstrap=1;
+ thd->max_client_packet_length= thd->net.max_packet;
+ thd->security_ctx->master_access= ~(ulong)0;
+
+ /* from handle_one_connection... */
+ pthread_detach_this_thread();
+
+ mysql_thread_set_psi_id(thd->thread_id);
+ thd->thr_create_utime= microsecond_interval_timer();
+ if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
+ {
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+ statistic_increment(aborted_connects,&LOCK_status);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
+
+ return(NULL);
+ }
+
+// </5.1.17>
+ /*
+ handle_one_connection() is normally the only way a thread would
+ start and would always be on the very high end of the stack ,
+ therefore, the thread stack always starts at the address of the
+ first local variable of handle_one_connection, which is thd. We
+ need to know the start of the stack so that we could check for
+ stack overruns.
+ */
+ DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld\n",
+ (long long)thd->thread_id));
+ /* now that we've called my_thread_init(), it is safe to call DBUG_* */
+
+ thd->thread_stack= (char*) &thd;
+ if (thd->store_globals())
+ {
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+ statistic_increment(aborted_connects,&LOCK_status);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
+ delete thd;
+
+ return(NULL);
+ }
+
+ thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
+ thd->security_ctx->skip_grants();
+
+ /* handle_one_connection() again... */
+ //thd->version= refresh_version;
+ thd->proc_info= 0;
+ thd->set_command(COM_SLEEP);
+ thd->set_time();
+ thd->init_for_queries();
+
+ mysql_mutex_lock(&LOCK_connection_count);
+ ++connection_count;
+ mysql_mutex_unlock(&LOCK_connection_count);
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_running_threads++;
+ mysql_cond_signal(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ processor(thd);
+
+ close_connection(thd, 0, 1);
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_running_threads--;
+ mysql_cond_signal(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ // Note: We can't call THD destructor without crashing
+ // if plugins have not been initialized. However, in most of the
+ // cases this means that pre SE initialization SST failed and
+ // we are going to exit anyway.
+ if (plugins_are_initialized)
+ {
+ net_end(&thd->net);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1));
+ }
+ else
+ {
+ // TODO: lightweight cleanup to get rid of:
+ // 'Error in my_thread_global_end(): 2 threads didn't exit'
+ // at server shutdown
+ }
+
+ if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
+ {
+ mysql_mutex_lock(&LOCK_thread_count);
+ delete thd;
+ thread_count--;
+ mysql_mutex_unlock(&LOCK_thread_count);
+ }
+ return(NULL);
+}
+
+void wsrep_create_rollbacker()
+{
+ if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
+ {
+ pthread_t hThread;
+ /* create rollbacker */
+ if (pthread_create( &hThread, &connection_attrib,
+ start_wsrep_THD, (void*)wsrep_rollback_process))
+ WSREP_WARN("Can't create thread to manage wsrep rollback");
+ }
+}
+
+void wsrep_create_appliers(long threads)
+{
+ if (!wsrep_connected)
+ {
+ /* see wsrep_replication_start() for the logic */
+ if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
+ wsrep_provider && strcasecmp(wsrep_provider, "none"))
+ {
+ WSREP_ERROR("Trying to launch slave threads before creating "
+ "connection at '%s'", wsrep_cluster_address);
+ assert(0);
+ }
+ return;
+ }
+
+ long wsrep_threads=0;
+ pthread_t hThread;
+ while (wsrep_threads++ < threads) {
+ if (pthread_create(
+ &hThread, &connection_attrib,
+ start_wsrep_THD, (void*)wsrep_replication_process))
+ WSREP_WARN("Can't create thread to manage wsrep replication");
+ }
+}
+/**/
+static bool abort_replicated(THD *thd)
+{
+ bool ret_code= false;
+ if (thd->wsrep_query_state== QUERY_COMMITTING)
+ {
+ if (wsrep_debug) WSREP_INFO("aborting replicated trx: %lu", thd->real_id);
+
+ (void)wsrep_abort_thd(thd, thd, TRUE);
+ ret_code= true;
+ }
+ return ret_code;
+}
+/**/
+static inline bool is_client_connection(THD *thd)
+{
+#if REMOVE
+// REMOVE THIS LATER (lp:777201). Below we had to add an explicit check for
+// wsrep_applier since wsrep_exec_mode didn't seem to always work
+if (thd->wsrep_applier && thd->wsrep_exec_mode != REPL_RECV)
+WSREP_WARN("applier has wsrep_exec_mode = %d", thd->wsrep_exec_mode);
+
+ if ( thd->slave_thread || /* declared as mysql slave */
+ thd->system_thread || /* declared as system thread */
+ !thd->vio_ok() || /* server internal thread */
+ thd->wsrep_exec_mode==REPL_RECV || /* applier or replaying thread */
+ thd->wsrep_applier || /* wsrep slave applier */
+ !thd->variables.wsrep_on) /* client, but fenced outside wsrep */
+ return false;
+
+ return true;
+#else
+ return (thd->wsrep_client_thread && thd->variables.wsrep_on);
+#endif /* REMOVE */
+}
+
+static inline bool is_replaying_connection(THD *thd)
+{
+ bool ret;
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ ret= (thd->wsrep_conflict_state == REPLAYING) ? true : false;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ return ret;
+}
+
+static inline bool is_committing_connection(THD *thd)
+{
+ bool ret;
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ ret= (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ return ret;
+}
+
+static bool have_client_connections()
+{
+ THD *tmp;
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION)
+ {
+ (void)abort_replicated(tmp);
+ return true;
+ }
+ }
+ return false;
+}
+
+/*
+ returns the number of wsrep appliers running.
+ However, the caller (thd parameter) is not taken in account
+ */
+static int have_wsrep_appliers(THD *thd)
+{
+ int ret= 0;
+ THD *tmp;
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ ret+= (tmp != thd && tmp->wsrep_applier);
+ }
+ return ret;
+}
+
+static void wsrep_close_thread(THD *thd)
+{
+ thd->killed= KILL_CONNECTION;
+ MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
+ if (thd->mysys_var)
+ {
+ thd->mysys_var->abort=1;
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ if (thd->mysys_var->current_cond)
+ {
+ mysql_mutex_lock(thd->mysys_var->current_mutex);
+ mysql_cond_broadcast(thd->mysys_var->current_cond);
+ mysql_mutex_unlock(thd->mysys_var->current_mutex);
+ }
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+ }
+}
+
+static my_bool have_committing_connections()
+{
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ if (!is_client_connection(tmp))
+ continue;
+
+ if (is_committing_connection(tmp))
+ {
+ mysql_mutex_unlock(&LOCK_thread_count);
+ return TRUE;
+ }
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+ return FALSE;
+}
+
+int wsrep_wait_committing_connections_close(int wait_time)
+{
+ int sleep_time= 100;
+
+ while (have_committing_connections() && wait_time > 0)
+ {
+ WSREP_DEBUG("wait for committing transaction to close: %d", wait_time);
+ my_sleep(sleep_time);
+ wait_time -= sleep_time;
+ }
+ if (have_committing_connections())
+ {
+ return 1;
+ }
+ return 0;
+}
+
+void wsrep_close_client_connections(my_bool wait_to_end)
+{
+ /*
+ First signal all threads that it's time to die
+ */
+
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ bool kill_cached_threads_saved= kill_cached_threads;
+ kill_cached_threads= true; // prevent future threads caching
+ mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ /* We skip slave threads & scheduler on this first loop through. */
+ if (!is_client_connection(tmp))
+ continue;
+
+ if (is_replaying_connection(tmp))
+ {
+ tmp->killed= KILL_CONNECTION;
+ continue;
+ }
+
+ /* replicated transactions must be skipped */
+ if (abort_replicated(tmp))
+ continue;
+
+ WSREP_DEBUG("closing connection %ld", tmp->thread_id);
+ wsrep_close_thread(tmp);
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ if (thread_count)
+ sleep(2); // Give threads time to die
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ /*
+ Force remaining threads to die by closing the connection to the client
+ */
+
+ I_List_iterator<THD> it2(threads);
+ while ((tmp=it2++))
+ {
+#ifndef __bsdi__ // Bug in BSDI kernel
+ if (is_client_connection(tmp) &&
+ !abort_replicated(tmp) &&
+ !is_replaying_connection(tmp))
+ {
+ WSREP_INFO("killing local connection: %ld",tmp->thread_id);
+ close_connection(tmp,0,0);
+ }
+#endif
+ }
+
+ DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
+ if (wsrep_debug)
+ WSREP_INFO("waiting for client connections to close: %u", thread_count);
+
+ while (wait_to_end && have_client_connections())
+ {
+ mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
+ DBUG_PRINT("quit",("One thread died (count=%u)", thread_count));
+ }
+
+ kill_cached_threads= kill_cached_threads_saved;
+
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ /* All client connection threads have now been aborted */
+}
+
+void wsrep_close_applier(THD *thd)
+{
+ WSREP_DEBUG("closing applier %ld", thd->thread_id);
+ wsrep_close_thread(thd);
+}
+
+static void wsrep_close_threads(THD *thd)
+{
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ /* We skip slave threads & scheduler on this first loop through. */
+ if (tmp->wsrep_applier && tmp != thd)
+ {
+ WSREP_DEBUG("closing wsrep thread %ld", tmp->thread_id);
+ wsrep_close_thread (tmp);
+ }
+ }
+
+ mysql_mutex_unlock(&LOCK_thread_count);
+}
+
+void wsrep_close_applier_threads(int count)
+{
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++) && count)
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ /* We skip slave threads & scheduler on this first loop through. */
+ if (tmp->wsrep_applier)
+ {
+ WSREP_DEBUG("closing wsrep applier thread %ld", tmp->thread_id);
+ tmp->wsrep_applier_closing= TRUE;
+ count--;
+ }
+ }
+
+ mysql_mutex_unlock(&LOCK_thread_count);
+}
+
+void wsrep_wait_appliers_close(THD *thd)
+{
+ /* Wait for wsrep appliers to gracefully exit */
+ mysql_mutex_lock(&LOCK_thread_count);
+ while (have_wsrep_appliers(thd) > 1)
+ // 1 is for rollbacker thread which needs to be killed explicitly.
+ // This gotta be fixed in a more elegant manner if we gonna have arbitrary
+ // number of non-applier wsrep threads.
+ {
+ if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
+ {
+ mysql_mutex_unlock(&LOCK_thread_count);
+ my_sleep(100);
+ mysql_mutex_lock(&LOCK_thread_count);
+ }
+ else
+ mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
+ DBUG_PRINT("quit",("One applier died (count=%u)",thread_count));
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+ /* Now kill remaining wsrep threads: rollbacker */
+ wsrep_close_threads (thd);
+ /* and wait for them to die */
+ mysql_mutex_lock(&LOCK_thread_count);
+ while (have_wsrep_appliers(thd) > 0)
+ {
+ if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
+ {
+ mysql_mutex_unlock(&LOCK_thread_count);
+ my_sleep(100);
+ mysql_mutex_lock(&LOCK_thread_count);
+ }
+ else
+ mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
+ DBUG_PRINT("quit",("One thread died (count=%u)",thread_count));
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ /* All wsrep applier threads have now been aborted. However, if this thread
+ is also applier, we are still running...
+ */
+}
+
+void wsrep_kill_mysql(THD *thd)
+{
+ if (mysqld_server_started)
+ {
+ if (!shutdown_in_progress)
+ {
+ WSREP_INFO("starting shutdown");
+ kill_mysql();
+ }
+ }
+ else
+ {
+ unireg_abort(1);
+ }
+}
+#endif /* WITH_WSREP */
#if (defined(_WIN32) || defined(HAVE_SMEM)) && !defined(EMBEDDED_LIBRARY)
static void handle_connections_methods()
@@ -5431,6 +5949,15 @@ int mysqld_main(int argc, char **argv)
my_str_malloc= &my_str_malloc_mysqld;
my_str_free= &my_str_free_mysqld;
+#ifdef WITH_WSREP /* WSREP AFTER SE */
+ if (wsrep_recovery)
+ {
+ select_thread_in_use= 0;
+ wsrep_recover();
+ unireg_abort(0);
+ }
+#endif /* WITH_WSREP */
+
/*
init signals & alarm
After this we can't quit by a simple unireg_abort
@@ -8671,25 +9198,6 @@ mysqld_get_one_option(int optid,
#endif
break;
}
-#ifdef WITH_WSREP
- mysql_mutex_init(key_LOCK_wsrep_ready,
- &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST);
- mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL);
- mysql_mutex_init(key_LOCK_wsrep_sst,
- &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST);
- mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL);
- mysql_mutex_init(key_LOCK_wsrep_sst_init,
- &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
- mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL);
- mysql_mutex_init(key_LOCK_wsrep_rollback,
- &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST);
- mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL);
- mysql_mutex_init(key_LOCK_wsrep_replaying,
- &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
- mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
- mysql_mutex_init(key_LOCK_wsrep_slave_threads,
- &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
-#endif
return 0;
}
@@ -9276,487 +9784,6 @@ static void create_pid_file()
}
#endif /* EMBEDDED_LIBRARY */
-#ifdef WITH_WSREP
-typedef void (*wsrep_thd_processor_fun)(THD *);
-
-pthread_handler_t start_wsrep_THD(void *arg)
-{
- THD *thd;
- wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg;
-
- if (my_thread_init())
- {
- WSREP_ERROR("Could not initialize thread");
- return(NULL);
- }
-
- if (!(thd= new THD(true)))
- {
- return(NULL);
- }
- mysql_mutex_lock(&LOCK_thread_count);
- thd->thread_id=thread_id++;
-
- thd->real_id=pthread_self(); // Keep purify happy
- thread_count++;
- thread_created++;
- threads.append(thd);
-
- my_net_init(&thd->net,(st_vio*) 0, MYF(0));
-
- DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
- thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
- (void) mysql_mutex_unlock(&LOCK_thread_count);
-
- /* from bootstrap()... */
- thd->bootstrap=1;
- thd->max_client_packet_length= thd->net.max_packet;
- thd->security_ctx->master_access= ~(ulong)0;
-
- /* from handle_one_connection... */
- pthread_detach_this_thread();
-
- mysql_thread_set_psi_id(thd->thread_id);
- thd->thr_create_utime= microsecond_interval_timer();
- if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
- {
- close_connection(thd, ER_OUT_OF_RESOURCES, 1);
- statistic_increment(aborted_connects,&LOCK_status);
- MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
-
- return(NULL);
- }
-
-// </5.1.17>
- /*
- handle_one_connection() is normally the only way a thread would
- start and would always be on the very high end of the stack ,
- therefore, the thread stack always starts at the address of the
- first local variable of handle_one_connection, which is thd. We
- need to know the start of the stack so that we could check for
- stack overruns.
- */
- DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld\n",
- (long long)thd->thread_id));
- /* now that we've called my_thread_init(), it is safe to call DBUG_* */
-
- thd->thread_stack= (char*) &thd;
- if (thd->store_globals())
- {
- close_connection(thd, ER_OUT_OF_RESOURCES, 1);
- statistic_increment(aborted_connects,&LOCK_status);
- MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
- delete thd;
-
- return(NULL);
- }
-
- thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
- thd->security_ctx->skip_grants();
-
- /* handle_one_connection() again... */
- //thd->version= refresh_version;
- thd->proc_info= 0;
- thd->set_command(COM_SLEEP);
- thd->set_time();
- thd->init_for_queries();
-
- mysql_mutex_lock(&LOCK_connection_count);
- ++connection_count;
- mysql_mutex_unlock(&LOCK_connection_count);
-
- mysql_mutex_lock(&LOCK_thread_count);
- wsrep_running_threads++;
- mysql_cond_signal(&COND_thread_count);
- mysql_mutex_unlock(&LOCK_thread_count);
-
- processor(thd);
-
- close_connection(thd, 0, 1);
-
- mysql_mutex_lock(&LOCK_thread_count);
- wsrep_running_threads--;
- mysql_cond_signal(&COND_thread_count);
- mysql_mutex_unlock(&LOCK_thread_count);
-
- // Note: We can't call THD destructor without crashing
- // if plugins have not been initialized. However, in most of the
- // cases this means that pre SE initialization SST failed and
- // we are going to exit anyway.
- if (plugins_are_initialized)
- {
- net_end(&thd->net);
- MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1));
- }
- else
- {
- // TODO: lightweight cleanup to get rid of:
- // 'Error in my_thread_global_end(): 2 threads didn't exit'
- // at server shutdown
- }
-
- if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
- {
- mysql_mutex_lock(&LOCK_thread_count);
- delete thd;
- thread_count--;
- mysql_mutex_unlock(&LOCK_thread_count);
- }
- return(NULL);
-}
-
-void wsrep_create_rollbacker()
-{
- if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
- {
- pthread_t hThread;
- /* create rollbacker */
- if (pthread_create( &hThread, &connection_attrib,
- start_wsrep_THD, (void*)wsrep_rollback_process))
- WSREP_WARN("Can't create thread to manage wsrep rollback");
- }
-}
-
-void wsrep_create_appliers(long threads)
-{
- if (!wsrep_connected)
- {
- /* see wsrep_replication_start() for the logic */
- if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
- wsrep_provider && strcasecmp(wsrep_provider, "none"))
- {
- WSREP_ERROR("Trying to launch slave threads before creating "
- "connection at '%s'", wsrep_cluster_address);
- assert(0);
- }
- return;
- }
-
- long wsrep_threads=0;
- pthread_t hThread;
- while (wsrep_threads++ < threads) {
- if (pthread_create(
- &hThread, &connection_attrib,
- start_wsrep_THD, (void*)wsrep_replication_process))
- WSREP_WARN("Can't create thread to manage wsrep replication");
- }
-}
-/**/
-static bool abort_replicated(THD *thd)
-{
- bool ret_code= false;
- if (thd->wsrep_query_state== QUERY_COMMITTING)
- {
- if (wsrep_debug) WSREP_INFO("aborting replicated trx: %lu", thd->real_id);
-
- (void)wsrep_abort_thd(thd, thd, TRUE);
- ret_code= true;
- }
- return ret_code;
-}
-/**/
-static inline bool is_client_connection(THD *thd)
-{
- return (thd->wsrep_client_thread && thd->variables.wsrep_on);
-}
-
-static inline bool is_replaying_connection(THD *thd)
-{
- bool ret;
-
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
- ret= (thd->wsrep_conflict_state == REPLAYING) ? true : false;
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
- return ret;
-}
-
-static inline bool is_committing_connection(THD *thd)
-{
- bool ret;
-
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
- ret= (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false;
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
- return ret;
-}
-
-static bool have_client_connections()
-{
- THD *tmp;
-
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
- {
- DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
- tmp->thread_id));
- if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION)
- {
- (void)abort_replicated(tmp);
- return true;
- }
- }
- return false;
-}
-
-/*
- returns the number of wsrep appliers running.
- However, the caller (thd parameter) is not taken in account
- */
-static int have_wsrep_appliers(THD *thd)
-{
- int ret= 0;
- THD *tmp;
-
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
- {
- ret+= (tmp != thd && tmp->wsrep_applier);
- }
- return ret;
-}
-
-static void wsrep_close_thread(THD *thd)
-{
- thd->killed= KILL_CONNECTION;
- MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
- if (thd->mysys_var)
- {
- thd->mysys_var->abort=1;
- mysql_mutex_lock(&thd->mysys_var->mutex);
- if (thd->mysys_var->current_cond)
- {
- mysql_mutex_lock(thd->mysys_var->current_mutex);
- mysql_cond_broadcast(thd->mysys_var->current_cond);
- mysql_mutex_unlock(thd->mysys_var->current_mutex);
- }
- mysql_mutex_unlock(&thd->mysys_var->mutex);
- }
-}
-
-static my_bool have_committing_connections()
-{
- THD *tmp;
- mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
-
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
- {
- if (!is_client_connection(tmp))
- continue;
-
- if (is_committing_connection(tmp))
- {
- mysql_mutex_unlock(&LOCK_thread_count);
- return TRUE;
- }
- }
- mysql_mutex_unlock(&LOCK_thread_count);
- return FALSE;
-}
-
-int wsrep_wait_committing_connections_close(int wait_time)
-{
- int sleep_time= 100;
-
- while (have_committing_connections() && wait_time > 0)
- {
- WSREP_DEBUG("wait for committing transaction to close: %d", wait_time);
- my_sleep(sleep_time);
- wait_time -= sleep_time;
- }
- if (have_committing_connections())
- {
- return 1;
- }
- return 0;
-}
-
-void wsrep_close_client_connections(my_bool wait_to_end)
-{
- /*
- First signal all threads that it's time to die
- */
-
- THD *tmp;
- mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
-
- bool kill_cached_threads_saved= kill_cached_threads;
- kill_cached_threads= true; // prevent future threads caching
- mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die
-
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
- {
- DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
- tmp->thread_id));
- /* We skip slave threads & scheduler on this first loop through. */
- if (!is_client_connection(tmp))
- continue;
-
- if (is_replaying_connection(tmp))
- {
- tmp->killed= KILL_CONNECTION;
- continue;
- }
-
- /* replicated transactions must be skipped */
- if (abort_replicated(tmp))
- continue;
-
- WSREP_DEBUG("closing connection %ld", tmp->thread_id);
- wsrep_close_thread(tmp);
- }
- mysql_mutex_unlock(&LOCK_thread_count);
-
- if (thread_count)
- sleep(2); // Give threads time to die
-
- mysql_mutex_lock(&LOCK_thread_count);
- /*
- Force remaining threads to die by closing the connection to the client
- */
-
- I_List_iterator<THD> it2(threads);
- while ((tmp=it2++))
- {
-#ifndef __bsdi__ // Bug in BSDI kernel
- if (is_client_connection(tmp) &&
- !abort_replicated(tmp) &&
- !is_replaying_connection(tmp))
- {
- WSREP_INFO("killing local connection: %ld",tmp->thread_id);
- close_connection(tmp,0,0);
- }
-#endif
- }
-
- DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
- if (wsrep_debug)
- WSREP_INFO("waiting for client connections to close: %u", thread_count);
-
- while (wait_to_end && have_client_connections())
- {
- mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
- DBUG_PRINT("quit",("One thread died (count=%u)", thread_count));
- }
-
- kill_cached_threads= kill_cached_threads_saved;
-
- mysql_mutex_unlock(&LOCK_thread_count);
-
- /* All client connection threads have now been aborted */
-}
-
-void wsrep_close_applier(THD *thd)
-{
- WSREP_DEBUG("closing applier %ld", thd->thread_id);
- wsrep_close_thread(thd);
-}
-
-static void wsrep_close_threads(THD *thd)
-{
- THD *tmp;
- mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
-
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
- {
- DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
- tmp->thread_id));
- /* We skip slave threads & scheduler on this first loop through. */
- if (tmp->wsrep_applier && tmp != thd)
- {
- WSREP_DEBUG("closing wsrep thread %ld", tmp->thread_id);
- wsrep_close_thread (tmp);
- }
- }
-
- mysql_mutex_unlock(&LOCK_thread_count);
-}
-
-void wsrep_close_applier_threads(int count)
-{
- THD *tmp;
- mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
-
- I_List_iterator<THD> it(threads);
- while ((tmp=it++) && count)
- {
- DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
- tmp->thread_id));
- /* We skip slave threads & scheduler on this first loop through. */
- if (tmp->wsrep_applier)
- {
- WSREP_DEBUG("closing wsrep applier thread %ld", tmp->thread_id);
- tmp->wsrep_applier_closing= TRUE;
- count--;
- }
- }
-
- mysql_mutex_unlock(&LOCK_thread_count);
-}
-
-void wsrep_wait_appliers_close(THD *thd)
-{
- /* Wait for wsrep appliers to gracefully exit */
- mysql_mutex_lock(&LOCK_thread_count);
- while (have_wsrep_appliers(thd) > 1)
- // 1 is for rollbacker thread which needs to be killed explicitly.
- // This gotta be fixed in a more elegant manner if we gonna have arbitrary
- // number of non-applier wsrep threads.
- {
- if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
- {
- mysql_mutex_unlock(&LOCK_thread_count);
- my_sleep(100);
- mysql_mutex_lock(&LOCK_thread_count);
- }
- else
- mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
- DBUG_PRINT("quit",("One applier died (count=%u)",thread_count));
- }
- mysql_mutex_unlock(&LOCK_thread_count);
- /* Now kill remaining wsrep threads: rollbacker */
- wsrep_close_threads (thd);
- /* and wait for them to die */
- mysql_mutex_lock(&LOCK_thread_count);
- while (have_wsrep_appliers(thd) > 0)
- {
- if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
- {
- mysql_mutex_unlock(&LOCK_thread_count);
- my_sleep(100);
- mysql_mutex_lock(&LOCK_thread_count);
- }
- else
- mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
- DBUG_PRINT("quit",("One thread died (count=%u)",thread_count));
- }
- mysql_mutex_unlock(&LOCK_thread_count);
-
- /* All wsrep applier threads have now been aborted. However, if this thread
- is also applier, we are still running...
- */
-}
-
-void wsrep_kill_mysql(THD *thd)
-{
- if (mysqld_server_started)
- {
- if (!shutdown_in_progress)
- {
- WSREP_INFO("starting shutdown");
- kill_mysql();
- }
- }
- else
- {
- unireg_abort(1);
- }
-}
-#endif /* WITH_WSREP */
/**
Remove the process' pid file.
diff --git a/sql/slave.cc b/sql/slave.cc
index 1d87e16249a..8665cf646fc 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3118,23 +3118,6 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
ev->thd = thd; // because up to this point, ev->thd == 0
int reason= ev->shall_skip(rli);
-#ifdef WITH_WSREP
- if (WSREP_ON && (ev->get_type_code() == XID_EVENT ||
- (ev->get_type_code() == QUERY_EVENT && thd->wsrep_mysql_replicated > 0 &&
- (!strncasecmp(((Query_log_event*)ev)->query , "BEGIN", 5) ||
- !strncasecmp(((Query_log_event*)ev)->query , "COMMIT", 6) ))))
- {
- if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle)
- {
- WSREP_DEBUG("skipping wsrep commit %d", thd->wsrep_mysql_replicated);
- reason = Log_event::EVENT_SKIP_IGNORE;
- }
- else
- {
- thd->wsrep_mysql_replicated = 0;
- }
- }
-#endif
if (reason == Log_event::EVENT_SKIP_COUNT)
{
DBUG_ASSERT(rli->slave_skip_counter > 0);
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index 1ededf0dd61..325c79043a3 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -4429,6 +4429,15 @@ restart:
#endif
err:
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ thd_proc_info(thd, "exit open_tables()");
+ else
+ thd_proc_info(thd, 0);
+#else /* WITH_WSREP */
+ thd_proc_info(thd, 0);
+#endif /* WITH_WSREP */
+
free_root(&new_frm_mem, MYF(0)); // Free pre-alloced block
if (error && *table_to_open)
@@ -4881,6 +4890,16 @@ end:
trans_rollback_stmt(thd);
close_thread_tables(thd);
}
+
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ thd_proc_info(thd, "End opening table");
+ else
+ thd_proc_info(thd, 0);
+#else /* WITH_WSREP */
+ thd_proc_info(thd, 0);
+#endif /* WITH_WSREP */
+
DBUG_RETURN(table);
}
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index d2bc3328643..ef139dfefeb 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -3187,6 +3187,12 @@ bool Delayed_insert::handle_inserts(void)
mysql_cond_broadcast(&cond_client); // If waiting clients
}
}
+#ifdef WITH_WSREP
+ if (WSREP((&thd)))
+ thd_proc_info(&thd, "insert done");
+ else
+#endif /* WITH_WSREP */
+ thd_proc_info(&thd, 0);
mysql_mutex_unlock(&mutex);
/*
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 7239c99613c..675e82de145 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -970,9 +970,12 @@ bool do_command(THD *thd)
thd->wsrep_query_state= QUERY_EXEC;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
}
-#endif /* WITH_WSREP */
+ if ((WSREP(thd) && packet_length == packet_error) ||
+ (!WSREP(thd) && (packet_length= my_net_read(net)) == packet_error))
+#else
if (packet_length == packet_error)
+#endif /* WITH_WSREP */
{
DBUG_PRINT("info",("Got error %d reading command from socket %s",
net->error,
@@ -2708,9 +2711,6 @@ mysql_execute_command(THD *thd)
my_error(ER_NOT_SUPPORTED_YET, MYF(0), "embedded server");
break;
#endif
-#ifdef WITH_WSREP
- if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error;
-#endif /* WITH_WSREP */
case SQLCOM_SHOW_STATUS:
{
execute_show_status(thd, all_tables);
@@ -2745,6 +2745,10 @@ mysql_execute_command(THD *thd)
}
case SQLCOM_SHOW_STATUS_PROC:
case SQLCOM_SHOW_STATUS_FUNC:
+#ifdef WITH_WSREP
+ if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error;
+#endif /* WITH_WSREP */
+
case SQLCOM_SHOW_DATABASES:
case SQLCOM_SHOW_TABLES:
case SQLCOM_SHOW_TRIGGERS:
@@ -3237,12 +3241,6 @@ case SQLCOM_PREPARE:
if (create_info.tmp_table())
thd->variables.option_bits|= OPTION_KEEP_LOG;
/* regular create */
-#ifdef WITH_WSREP
- if (!thd->is_current_stmt_binlog_format_row() ||
- !(create_info.options & HA_LEX_CREATE_TMP_TABLE))
- WSREP_TO_ISOLATION_BEGIN(create_table->db, create_table->table_name,
- NULL)
-#endif /* WITH_WSREP */
if (create_info.options & HA_LEX_CREATE_TABLE_LIKE)
{
/* CREATE TABLE ... LIKE ... */
@@ -3251,6 +3249,13 @@ case SQLCOM_PREPARE:
}
else
{
+#ifdef WITH_WSREP
+ if (!thd->is_current_stmt_binlog_format_row() ||
+ !(create_info.options & HA_LEX_CREATE_TMP_TABLE))
+ WSREP_TO_ISOLATION_BEGIN(create_table->db, create_table->table_name,
+ NULL)
+#endif /* WITH_WSREP */
+
/* Regular CREATE TABLE */
res= mysql_create_table(thd, create_table,
&create_info, &alter_info);
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 5a93f3b819a..412b941e946 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -2815,6 +2815,15 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
err:
unlock_slave_threads(mi);
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ thd_proc_info(thd, "exit stop_slave()");
+ else
+ thd_proc_info(thd, 0);
+#else /* WITH_WSREP */
+ thd_proc_info(thd, 0);
+#endif /* WITH_WSREP */
+
if (slave_errno)
{
if (net_report)
diff --git a/sql/sql_show.cc b/sql/sql_show.cc
index 46f23838850..d63df895a13 100644
--- a/sql/sql_show.cc
+++ b/sql/sql_show.cc
@@ -59,6 +59,10 @@
#include "debug_sync.h"
#include "keycaches.h"
+#if !defined(MYSQL_MAX_VARIABLE_VALUE_LEN)
+#define MYSQL_MAX_VARIABLE_VALUE_LEN 1024
+#endif // !defined(MYSQL_MAX_VARIABLE_VALUE_LEN)
+
#ifdef WITH_PARTITION_STORAGE_ENGINE
#include "ha_partition.h"
#endif
@@ -8660,7 +8664,7 @@ ST_FIELD_INFO variables_fields_info[]=
{
{"VARIABLE_NAME", 64, MYSQL_TYPE_STRING, 0, 0, "Variable_name",
SKIP_OPEN_TABLE},
- {"VARIABLE_VALUE", 1024, MYSQL_TYPE_STRING, 0, 1,
+ {"VARIABLE_VALUE", MYSQL_MAX_VARIABLE_VALUE_LEN, MYSQL_TYPE_STRING, 0, 1,
"Value", SKIP_OPEN_TABLE},
{0, 0, MYSQL_TYPE_STRING, 0, 0, 0, SKIP_OPEN_TABLE}
};