diff options
Diffstat (limited to 'storage/tokudb/ha_tokudb_admin.cc')
-rw-r--r-- | storage/tokudb/ha_tokudb_admin.cc | 286 |
1 files changed, 167 insertions, 119 deletions
diff --git a/storage/tokudb/ha_tokudb_admin.cc b/storage/tokudb/ha_tokudb_admin.cc index db3d6c112d4..e1443101bb6 100644 --- a/storage/tokudb/ha_tokudb_admin.cc +++ b/storage/tokudb/ha_tokudb_admin.cc @@ -7,7 +7,7 @@ This file is part of TokuDB Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. - TokuDBis is free software: you can redistribute it and/or modify + TokuDB is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2, as published by the Free Software Foundation. @@ -43,13 +43,11 @@ public: virtual ~recount_rows_t(); virtual const char* key(); - - virtual void status( - char* database, - char* table, - char* type, - char* params, - char* status); + virtual const char* database(); + virtual const char* table(); + virtual const char* type(); + virtual const char* parameters(); + virtual const char* status(); protected: virtual void on_run(); @@ -64,6 +62,8 @@ private: ulonglong _throttle; // for recount rows status reporting + char _parameters[256]; + char _status[1024]; int _result; ulonglong _recount_start; // in microseconds ulonglong _total_elapsed_time; // in microseconds @@ -78,7 +78,6 @@ private: uint64_t deleted, void* extra); int analyze_recount_rows_progress(uint64_t count, uint64_t deleted); - void get_analyze_status(char*); }; void* recount_rows_t::operator new(size_t sz) { @@ -114,10 +113,19 @@ recount_rows_t::recount_rows_t( } _throttle = tokudb::sysvars::analyze_throttle(thd); + + snprintf(_parameters, + sizeof(_parameters), + "TOKUDB_ANALYZE_THROTTLE=%llu;", + _throttle); + _status[0] = '\0'; } recount_rows_t::~recount_rows_t() { } void recount_rows_t::on_run() { + const char* orig_proc_info = NULL; + if (_thd) + orig_proc_info = tokudb_thd_get_proc_info(_thd); _recount_start = tokudb::time::microsec(); _total_elapsed_time = 0; @@ -171,6 +179,8 @@ void recount_rows_t::on_run() { _result, _share->row_count()); error: + if(_thd) + tokudb_thd_set_proc_info(_thd, orig_proc_info); return; } void recount_rows_t::on_destroy() { @@ -179,18 +189,21 @@ void recount_rows_t::on_destroy() { const char* recount_rows_t::key() { return _share->full_table_name(); } -void recount_rows_t::status( - char* database, - char* table, - char* type, - char* params, - char* status) { - - strcpy(database, _share->database_name()); - strcpy(table, _share->table_name()); - strcpy(type, "TOKUDB_ANALYZE_MODE_RECOUNT_ROWS"); - sprintf(params, "TOKUDB_ANALYZE_THROTTLE=%llu;", _throttle); - get_analyze_status(status); +const char* recount_rows_t::database() { + return _share->database_name(); +} +const char* recount_rows_t::table() { + return _share->table_name(); +} +const char* recount_rows_t::type() { + static const char* type = "TOKUDB_ANALYZE_MODE_RECOUNT_ROWS"; + return type; +} +const char* recount_rows_t::parameters() { + return _parameters; +} +const char* recount_rows_t::status() { + return _status; } int recount_rows_t::analyze_recount_rows_progress( uint64_t count, @@ -212,17 +225,37 @@ int recount_rows_t::analyze_recount_rows_progress( _ticks = 0; uint64_t now = tokudb::time::microsec(); _total_elapsed_time = now - _recount_start; - if ((_thd && thd_killed(_thd)) || cancelled()) { + if ((_thd && thd_kill_level(_thd)) || cancelled()) { // client killed return ER_ABORTING_CONNECTION; } + // rebuild status + // There is a slight race condition here, + // _status is used here for tokudb_thd_set_proc_info and it is also used + // for the status column in i_s.background_job_status. + // If someone happens to be querying/building the i_s table + // at the exact same time that the status is being rebuilt here, + // the i_s table could get some garbage status. + // This solution is a little heavy handed but it works, it prevents us + // from changing the status while someone might be immediately observing + // us and it prevents someone from observing us while we change the + // status + tokudb::background::_job_manager->lock(); + snprintf(_status, + sizeof(_status), + "recount_rows %s.%s counted %llu rows and %llu deleted " + "in %llu seconds.", + _share->database_name(), + _share->table_name(), + _rows, + _deleted_rows, + _total_elapsed_time / tokudb::time::MICROSECONDS); + tokudb::background::_job_manager->unlock(); + // report - if (_thd) { - char status[256]; - get_analyze_status(status); - thd_proc_info(_thd, status); - } + if (_thd) + tokudb_thd_set_proc_info(_thd, _status); // throttle // given the throttle value, lets calculate the maximum number of rows @@ -238,18 +271,6 @@ int recount_rows_t::analyze_recount_rows_progress( } return 0; } -void recount_rows_t::get_analyze_status(char* msg) { - sprintf( - msg, - "recount_rows %s.%s counted %llu rows and %llu deleted in %llu " - "seconds.", - _share->database_name(), - _share->table_name(), - _rows, - _deleted_rows, - _total_elapsed_time / tokudb::time::MICROSECONDS); -} - class standard_t : public tokudb::background::job_manager_t::job_t { public: @@ -261,13 +282,11 @@ public: virtual ~standard_t(); virtual const char* key(void); - - virtual void status( - char* database, - char* table, - char* type, - char* params, - char* status); + virtual const char* database(); + virtual const char* table(); + virtual const char* type(); + virtual const char* parameters(); + virtual const char* status(); protected: virtual void on_run(); @@ -284,6 +303,8 @@ private: double _delete_fraction; // for analyze status reporting, may also use other state + char _parameters[256]; + char _status[1024]; int _result; ulonglong _analyze_start; // in microseconds ulonglong _total_elapsed_time; // in microseconds @@ -305,7 +326,6 @@ private: uint64_t deleted_rows); bool analyze_standard_cursor_callback(uint64_t deleted_rows); - void get_analyze_status(char*); int analyze_key_progress(); int analyze_key(uint64_t* rec_per_key_part); }; @@ -351,6 +371,16 @@ standard_t::standard_t( _time_limit = tokudb::sysvars::analyze_time(thd) * tokudb::time::MICROSECONDS; _delete_fraction = tokudb::sysvars::analyze_delete_fraction(thd); + + snprintf(_parameters, + sizeof(_parameters), + "TOKUDB_ANALYZE_DELETE_FRACTION=%f; " + "TOKUDB_ANALYZE_TIME=%llu; TOKUDB_ANALYZE_THROTTLE=%llu;", + _delete_fraction, + _time_limit / tokudb::time::MICROSECONDS, + _throttle); + + _status[0] = '\0'; } standard_t::~standard_t() { } @@ -358,6 +388,10 @@ void standard_t::on_run() { DB_BTREE_STAT64 stat64; uint64_t rec_per_key_part[_share->_max_key_parts]; uint64_t total_key_parts = 0; + const char* orig_proc_info = NULL; + if (_thd) + orig_proc_info = tokudb_thd_get_proc_info(_thd); + _analyze_start = tokudb::time::microsec(); _half_time = _time_limit > 0 ? _time_limit/2 : 0; @@ -395,7 +429,7 @@ void standard_t::on_run() { _result = HA_ADMIN_FAILED; } if (_thd && (_result == HA_ADMIN_FAILED || - (double)_deleted_rows > + static_cast<double>(_deleted_rows) > _delete_fraction * (_rows + _deleted_rows))) { char name[256]; int namelen; @@ -460,8 +494,9 @@ cleanup: } error: + if (_thd) + tokudb_thd_set_proc_info(_thd, orig_proc_info); return; - } void standard_t::on_destroy() { _share->lock(); @@ -472,24 +507,21 @@ void standard_t::on_destroy() { const char* standard_t::key() { return _share->full_table_name(); } -void standard_t::status( - char* database, - char* table, - char* type, - char* params, - char* status) { - - strcpy(database, _share->database_name()); - strcpy(table, _share->table_name()); - strcpy(type, "TOKUDB_ANALYZE_MODE_STANDARD"); - sprintf( - params, - "TOKUDB_ANALYZE_DELETE_FRACTION=%f; " - "TOKUDB_ANALYZE_TIME=%llu; TOKUDB_ANALYZE_THROTTLE=%llu;", - _delete_fraction, - _time_limit / tokudb::time::MICROSECONDS, - _throttle); - get_analyze_status(status); +const char* standard_t::database() { + return _share->database_name(); +} +const char* standard_t::table() { + return _share->table_name(); +} +const char* standard_t::type() { + static const char* type = "TOKUDB_ANALYZE_MODE_STANDARD"; + return type; +} +const char* standard_t::parameters() { + return _parameters; +} +const char* standard_t::status() { + return _status; } bool standard_t::analyze_standard_cursor_callback( void* extra, @@ -502,63 +534,81 @@ bool standard_t::analyze_standard_cursor_callback(uint64_t deleted_rows) { _ticks += deleted_rows; return analyze_key_progress() != 0; } -void standard_t::get_analyze_status(char* msg) { - static const char* scan_direction_str[] = { - "not scanning", - "scanning forward", - "scanning backward", - "scan unknown" - }; - - const char* scan_direction = NULL; - switch (_scan_direction) { - case 0: scan_direction = scan_direction_str[0]; break; - case DB_NEXT: scan_direction = scan_direction_str[1]; break; - case DB_PREV: scan_direction = scan_direction_str[2]; break; - default: scan_direction = scan_direction_str[3]; break; - } - - float progress_rows = 0.0; - if (_share->row_count() > 0) - progress_rows = (float) _rows / (float) _share->row_count(); - float progress_time = 0.0; - if (_time_limit > 0) - progress_time = (float) _key_elapsed_time / (float) _time_limit; - sprintf( - msg, - "analyze table standard %s.%s.%s %llu of %u %.lf%% rows %.lf%% time, " - "%s", - _share->database_name(), - _share->table_name(), - _share->_key_descriptors[_current_key]._name, - _current_key, - _share->_keys, - progress_rows * 100.0, - progress_time * 100.0, - scan_direction); -} int standard_t::analyze_key_progress(void) { if (_ticks > 1000) { _ticks = 0; uint64_t now = tokudb::time::microsec(); _total_elapsed_time = now - _analyze_start; _key_elapsed_time = now - _analyze_key_start; - if ((_thd && thd_killed(_thd)) || cancelled()) { + if ((_thd && thd_kill_level(_thd)) || cancelled()) { // client killed return ER_ABORTING_CONNECTION; - } else if(_time_limit > 0 && - (uint64_t)_key_elapsed_time > _time_limit) { + } else if (_time_limit > 0 && + static_cast<uint64_t>(_key_elapsed_time) > _time_limit) { // time limit reached return ETIME; } - // report - if (_thd) { - char status[256]; - get_analyze_status(status); - thd_proc_info(_thd, status); + // rebuild status + // There is a slight race condition here, + // _status is used here for tokudb_thd_set_proc_info and it is also used + // for the status column in i_s.background_job_status. + // If someone happens to be querying/building the i_s table + // at the exact same time that the status is being rebuilt here, + // the i_s table could get some garbage status. + // This solution is a little heavy handed but it works, it prevents us + // from changing the status while someone might be immediately observing + // us and it prevents someone from observing us while we change the + // status. + static const char* scan_direction_str[] = {"not scanning", + "scanning forward", + "scanning backward", + "scan unknown"}; + + const char* scan_direction = NULL; + switch (_scan_direction) { + case 0: + scan_direction = scan_direction_str[0]; + break; + case DB_NEXT: + scan_direction = scan_direction_str[1]; + break; + case DB_PREV: + scan_direction = scan_direction_str[2]; + break; + default: + scan_direction = scan_direction_str[3]; + break; } + float progress_rows = 0.0; + if (_share->row_count() > 0) + progress_rows = static_cast<float>(_rows) / + static_cast<float>(_share->row_count()); + float progress_time = 0.0; + if (_time_limit > 0) + progress_time = static_cast<float>(_key_elapsed_time) / + static_cast<float>(_time_limit); + tokudb::background::_job_manager->lock(); + snprintf( + _status, + sizeof(_status), + "analyze table standard %s.%s.%s %llu of %u %.lf%% rows %.lf%% " + "time, %s", + _share->database_name(), + _share->table_name(), + _share->_key_descriptors[_current_key]._name, + _current_key, + _share->_keys, + progress_rows * 100.0, + progress_time * 100.0, + scan_direction); + tokudb::background::_job_manager->unlock(); + + // report + if (_thd) + tokudb_thd_set_proc_info(_thd, _status); + // throttle // given the throttle value, lets calculate the maximum number of rows // we should have seen so far in a .1 sec resolution @@ -694,6 +744,11 @@ int standard_t::analyze_key(uint64_t* rec_per_key_part) { assert_always(close_error == 0); done: + // in case we timed out (bunch of deleted records) without hitting a + // single row + if (_rows == 0) + _rows = 1; + // return cardinality for (uint64_t i = 0; i < num_key_parts; i++) { rec_per_key_part[i] = _rows / unique_rows[i]; @@ -733,7 +788,6 @@ int TOKUDB_SHARE::analyze_recount_rows(THD* thd,DB_TXN* txn) { assert_always(thd != NULL); - const char *orig_proc_info = tokudb_thd_get_proc_info(thd); int result = HA_ADMIN_OK; tokudb::analyze::recount_rows_t* job @@ -753,8 +807,6 @@ int TOKUDB_SHARE::analyze_recount_rows(THD* thd,DB_TXN* txn) { result = HA_ADMIN_FAILED; } - thd_proc_info(thd, orig_proc_info); - TOKUDB_HANDLER_DBUG_RETURN(result); } @@ -778,8 +830,6 @@ int TOKUDB_SHARE::analyze_standard(THD* thd, DB_TXN* txn) { TOKUDB_HANDLER_DBUG_RETURN(result); } - const char *orig_proc_info = tokudb_thd_get_proc_info(thd); - tokudb::analyze::standard_t* job = new tokudb::analyze::standard_t(txn == NULL ? false : true, thd, this, txn); @@ -808,8 +858,6 @@ int TOKUDB_SHARE::analyze_standard(THD* thd, DB_TXN* txn) { lock(); - thd_proc_info(thd, orig_proc_info); - TOKUDB_HANDLER_DBUG_RETURN(result); } @@ -828,7 +876,7 @@ typedef struct hot_optimize_context { static int hot_optimize_progress_fun(void *extra, float progress) { HOT_OPTIMIZE_CONTEXT context = (HOT_OPTIMIZE_CONTEXT)extra; - if (thd_killed(context->thd)) { + if (thd_kill_level(context->thd)) { sprintf( context->write_status_msg, "The process has been killed, aborting hot optimize."); @@ -955,7 +1003,7 @@ struct check_context { static int ha_tokudb_check_progress(void* extra, float progress) { struct check_context* context = (struct check_context*)extra; int result = 0; - if (thd_killed(context->thd)) + if (thd_kill_level(context->thd)) result = ER_ABORTING_CONNECTION; return result; } |