diff options
author | Kentoku SHIBA <kentokushiba@gmail.com> | 2017-11-16 11:11:52 +0200 |
---|---|---|
committer | Monty <monty@mariadb.org> | 2017-12-03 13:58:36 +0200 |
commit | e53ef202bd7706b88760472472af5ae878065f4f (patch) | |
tree | b4835ec250e0e00d06097da05a28206ca75df75d /sql | |
parent | d1e4ecec07b0611d3d0cbbf53342246ca2c4f600 (diff) | |
download | mariadb-git-e53ef202bd7706b88760472472af5ae878065f4f.tar.gz |
Adding direct update/delete to the server and to the partition engine.
Add support for direct update and direct delete requests for spider.
A direct update/delete request handles all qualified rows in a single
operation rather than one row at a time.
Contains Spiral patches:
006_mariadb-10.2.0.direct_update_rows.diff MDEV-7704
008_mariadb-10.2.0.partition_direct_update.diff MDEV-7706
010_mariadb-10.2.0.direct_update_rows2.diff MDEV-7708
011_mariadb-10.2.0.aggregate.diff MDEV-7709
027_mariadb-10.2.0.force_bulk_update.diff MDEV-7724
061_mariadb-10.2.0.mariadb-10.1.8.diff MDEV-12870
- The differences compared to the original patches:
- Most of the parameters of the new functions are unnecessary. The
unnecessary parameters have been removed.
- Changed bit positions for new handler flags upon consideration of
handler flags not needed by other Spiral patches and handler flags
merged from MySQL.
- Added info_push() (Was originally part of bulk access patch)
- Didn't include code related to handler socket
- Added HA_CAN_DIRECT_UPDATE_AND_DELETE
Original author: Kentoku SHIBA
First reviewer: Jacob Mathew
Second reviewer: Michael Widenius
Diffstat (limited to 'sql')
-rw-r--r-- | sql/ha_partition.cc | 585 | ||||
-rw-r--r-- | sql/ha_partition.h | 17 | ||||
-rw-r--r-- | sql/handler.cc | 55 | ||||
-rw-r--r-- | sql/handler.h | 121 | ||||
-rw-r--r-- | sql/opt_sum.cc | 3 | ||||
-rw-r--r-- | sql/sql_delete.cc | 75 | ||||
-rw-r--r-- | sql/sql_update.cc | 96 | ||||
-rw-r--r-- | sql/table.cc | 53 | ||||
-rw-r--r-- | sql/table.h | 2 |
9 files changed, 976 insertions, 31 deletions
diff --git a/sql/ha_partition.cc b/sql/ha_partition.cc index 972e8820246..0d5793c180b 100644 --- a/sql/ha_partition.cc +++ b/sql/ha_partition.cc @@ -6794,6 +6794,7 @@ FT_INFO *ha_partition::ft_init_ext(uint flags, uint inx, String *key) DBUG_RETURN((FT_INFO*)ft_target); } + /** Return the next record from the FT result set during an ordered index pre-scan @@ -10765,6 +10766,590 @@ void ha_partition::cond_pop() DBUG_VOID_RETURN; } + +/** + Perform bulk update preparation on each partition. + + SYNOPSIS + start_bulk_update() + + RETURN VALUE + TRUE Error + FALSE Success +*/ + +bool ha_partition::start_bulk_update() +{ + handler **file= m_file; + DBUG_ENTER("ha_partition::start_bulk_update"); + + if (bitmap_is_overlapping(&m_part_info->full_part_field_set, + table->write_set)) + DBUG_RETURN(TRUE); + + do + { + if ((*file)->start_bulk_update()) + DBUG_RETURN(TRUE); + } while (*(++file)); + DBUG_RETURN(FALSE); +} + + +/** + Perform bulk update execution on each partition. A bulk update allows + a handler to batch the updated rows instead of performing the updates + one row at a time. + + SYNOPSIS + exec_bulk_update() + + RETURN VALUE + TRUE Error + FALSE Success +*/ + +int ha_partition::exec_bulk_update(ha_rows *dup_key_found) +{ + int error; + handler **file= m_file; + DBUG_ENTER("ha_partition::exec_bulk_update"); + + do + { + if ((error= (*file)->exec_bulk_update(dup_key_found))) + DBUG_RETURN(error); + } while (*(++file)); + DBUG_RETURN(0); +} + + +/** + Perform bulk update cleanup on each partition. + + SYNOPSIS + end_bulk_update() + + RETURN VALUE + NONE +*/ + +int ha_partition::end_bulk_update() +{ + int error= 0; + handler **file= m_file; + DBUG_ENTER("ha_partition::end_bulk_update"); + + do + { + int tmp; + if ((tmp= (*file)->end_bulk_update())) + error= tmp; + } while (*(++file)); + DBUG_RETURN(error); +} + + +/** + Add the row to the bulk update on the partition on which the row is stored. + A bulk update allows a handler to batch the updated rows instead of + performing the updates one row at a time. + + SYNOPSIS + bulk_update_row() + old_data Old record + new_data New record + dup_key_found Number of duplicate keys found + + RETURN VALUE + >1 Error + 1 Bulk update not used, normal operation used + 0 Bulk update used by handler +*/ + +int ha_partition::bulk_update_row(const uchar *old_data, const uchar *new_data, + ha_rows *dup_key_found) +{ + int error= 0; + uint32 part_id; + longlong func_value; + my_bitmap_map *old_map; + DBUG_ENTER("ha_partition::bulk_update_row"); + + old_map= dbug_tmp_use_all_columns(table, table->read_set); + error= m_part_info->get_partition_id(m_part_info, &part_id, + &func_value); + dbug_tmp_restore_column_map(table->read_set, old_map); + if (unlikely(error)) + { + m_part_info->err_value= func_value; + goto end; + } + + error= m_file[part_id]->ha_bulk_update_row(old_data, new_data, + dup_key_found); + +end: + DBUG_RETURN(error); +} + + +/** + Perform bulk delete preparation on each partition. + + SYNOPSIS + start_bulk_delete() + + RETURN VALUE + TRUE Error + FALSE Success +*/ + +bool ha_partition::start_bulk_delete() +{ + handler **file= m_file; + DBUG_ENTER("ha_partition::start_bulk_delete"); + + do + { + if ((*file)->start_bulk_delete()) + DBUG_RETURN(TRUE); + } while (*(++file)); + DBUG_RETURN(FALSE); +} + + +/** + Perform bulk delete cleanup on each partition. + + SYNOPSIS + end_bulk_delete() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::end_bulk_delete() +{ + int error= 0; + handler **file= m_file; + DBUG_ENTER("ha_partition::end_bulk_delete"); + + do + { + int tmp; + if ((tmp= (*file)->end_bulk_delete())) + error= tmp; + } while (*(++file)); + DBUG_RETURN(error); +} + + +/** + Perform initialization for a direct update request. + + SYNOPSIS + direct_update_rows_init() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::direct_update_rows_init() +{ + int error; + uint i, found; + handler *file; + DBUG_ENTER("ha_partition::direct_update_rows_init"); + + if (bitmap_is_overlapping(&m_part_info->full_part_field_set, + table->write_set)) + { + DBUG_PRINT("info", ("partition FALSE by updating part_key")); + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + } + + m_part_spec.start_part= 0; + m_part_spec.end_part= m_tot_parts - 1; + m_direct_update_part_spec= m_part_spec; + + found= 0; + for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++) + { + if (bitmap_is_set(&(m_part_info->read_partitions), i) && + bitmap_is_set(&(m_part_info->lock_partitions), i)) + { + file= m_file[i]; + if ((error= (m_pre_calling ? + file->pre_direct_update_rows_init() : + file->direct_update_rows_init()))) + { + DBUG_PRINT("info", ("partition FALSE by storage engine")); + DBUG_RETURN(error); + } + found++; + } + } + + TABLE_LIST *table_list= table->pos_in_table_list; + if (found != 1 && table_list) + { + while (table_list->parent_l) + table_list= table_list->parent_l; + st_select_lex *select_lex= table_list->select_lex; + DBUG_PRINT("info", ("partition select_lex: %p", select_lex)); + if (select_lex && select_lex->explicit_limit) + { + DBUG_PRINT("info", ("partition explicit_limit=TRUE")); + DBUG_PRINT("info", ("partition offset_limit: %p", + select_lex->offset_limit)); + DBUG_PRINT("info", ("partition select_limit: %p", + select_lex->select_limit)); + DBUG_PRINT("info", ("partition FALSE by select_lex")); + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + } + } + DBUG_PRINT("info", ("partition OK")); + DBUG_RETURN(0); +} + + +/** + Do initialization for performing parallel direct update + for a handlersocket update request. + + SYNOPSIS + pre_direct_update_rows_init() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::pre_direct_update_rows_init() +{ + bool save_m_pre_calling; + int error; + DBUG_ENTER("ha_partition::pre_direct_update_rows_init"); + save_m_pre_calling= m_pre_calling; + m_pre_calling= TRUE; + error= direct_update_rows_init(); + m_pre_calling= save_m_pre_calling; + DBUG_RETURN(error); +} + + +/** + Execute a direct update request. A direct update request updates all + qualified rows in a single operation, rather than one row at a time. + The direct update operation is pushed down to each individual + partition. + + SYNOPSIS + direct_update_rows() + update_rows Number of updated rows + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::direct_update_rows(ha_rows *update_rows_result) +{ + int error; + bool rnd_seq= FALSE; + ha_rows update_rows= 0; + uint32 i; + DBUG_ENTER("ha_partition::direct_update_rows"); + + /* If first call to direct_update_rows with RND scan */ + if ((m_pre_calling ? pre_inited : inited) == RND && m_scan_value == 1) + { + rnd_seq= TRUE; + m_scan_value= 2; + } + + *update_rows_result= 0; + for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++) + { + handler *file= m_file[i]; + if (bitmap_is_set(&(m_part_info->read_partitions), i) && + bitmap_is_set(&(m_part_info->lock_partitions), i)) + { + if (rnd_seq && (m_pre_calling ? file->pre_inited : file->inited) == NONE) + { + if ((error= (m_pre_calling ? + file->ha_pre_rnd_init(TRUE) : + file->ha_rnd_init(TRUE)))) + DBUG_RETURN(error); + } + if ((error= (m_pre_calling ? + (file)->pre_direct_update_rows() : + (file)->ha_direct_update_rows(&update_rows)))) + { + if (rnd_seq) + { + if (m_pre_calling) + file->ha_pre_rnd_end(); + else + file->ha_rnd_end(); + } + DBUG_RETURN(error); + } + *update_rows_result+= update_rows; + } + if (rnd_seq) + { + if ((error= (m_pre_calling ? + file->ha_pre_index_or_rnd_end() : + file->ha_index_or_rnd_end()))) + DBUG_RETURN(error); + } + } + DBUG_RETURN(0); +} + + +/** + Start parallel execution of a direct update for a handlersocket update + request. A direct update request updates all qualified rows in a single + operation, rather than one row at a time. The direct update operation + is pushed down to each individual partition. + + SYNOPSIS + pre_direct_update_rows() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::pre_direct_update_rows() +{ + bool save_m_pre_calling; + int error; + ha_rows not_used= 0; + DBUG_ENTER("ha_partition::pre_direct_update_rows"); + save_m_pre_calling= m_pre_calling; + m_pre_calling= TRUE; + error= direct_update_rows(¬_used); + m_pre_calling= save_m_pre_calling; + DBUG_RETURN(error); +} + + +/** + Perform initialization for a direct delete request. + + SYNOPSIS + direct_delete_rows_init() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::direct_delete_rows_init() +{ + int error; + uint i, found; + DBUG_ENTER("ha_partition::direct_delete_rows_init"); + + m_part_spec.start_part= 0; + m_part_spec.end_part= m_tot_parts - 1; + m_direct_update_part_spec= m_part_spec; + + found= 0; + for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++) + { + if (bitmap_is_set(&(m_part_info->read_partitions), i) && + bitmap_is_set(&(m_part_info->lock_partitions), i)) + { + handler *file= m_file[i]; + if ((error= (m_pre_calling ? + file->pre_direct_delete_rows_init() : + file->direct_delete_rows_init()))) + { + DBUG_PRINT("exit", ("error in direct_delete_rows_init")); + DBUG_RETURN(error); + } + found++; + } + } + + TABLE_LIST *table_list= table->pos_in_table_list; + if (found != 1 && table_list) + { + while (table_list->parent_l) + table_list= table_list->parent_l; + st_select_lex *select_lex= table_list->select_lex; + DBUG_PRINT("info", ("partition select_lex: %p", select_lex)); + if (select_lex && select_lex->explicit_limit) + { + DBUG_PRINT("info", ("partition explicit_limit: TRUE")); + DBUG_PRINT("info", ("partition offset_limit: %p", + select_lex->offset_limit)); + DBUG_PRINT("info", ("partition select_limit: %p", + select_lex->select_limit)); + DBUG_PRINT("info", ("partition FALSE by select_lex")); + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + } + } + DBUG_PRINT("exit", ("OK")); + DBUG_RETURN(0); +} + + +/** + Do initialization for performing parallel direct delete + for a handlersocket delete request. + + SYNOPSIS + pre_direct_delete_rows_init() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::pre_direct_delete_rows_init() +{ + bool save_m_pre_calling; + int error; + DBUG_ENTER("ha_partition::pre_direct_delete_rows_init"); + save_m_pre_calling= m_pre_calling; + m_pre_calling= TRUE; + error= direct_delete_rows_init(); + m_pre_calling= save_m_pre_calling; + DBUG_RETURN(error); +} + + +/** + Execute a direct delete request. A direct delete request deletes all + qualified rows in a single operation, rather than one row at a time. + The direct delete operation is pushed down to each individual + partition. + + SYNOPSIS + direct_delete_rows() + delete_rows Number of deleted rows + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::direct_delete_rows(ha_rows *delete_rows_result) +{ + int error; + bool rnd_seq= FALSE; + ha_rows delete_rows= 0; + uint32 i; + handler *file; + DBUG_ENTER("ha_partition::direct_delete_rows"); + + if ((m_pre_calling ? pre_inited : inited) == RND && m_scan_value == 1) + { + rnd_seq= TRUE; + m_scan_value= 2; + } + + *delete_rows_result= 0; + m_part_spec= m_direct_update_part_spec; + for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++) + { + file= m_file[i]; + if (bitmap_is_set(&(m_part_info->read_partitions), i) && + bitmap_is_set(&(m_part_info->lock_partitions), i)) + { + if (rnd_seq && (m_pre_calling ? file->pre_inited : file->inited) == NONE) + { + if ((error= (m_pre_calling ? + file->ha_pre_rnd_init(TRUE) : + file->ha_rnd_init(TRUE)))) + DBUG_RETURN(error); + } + if ((error= (m_pre_calling ? + file->pre_direct_delete_rows() : + file->ha_direct_delete_rows(&delete_rows)))) + { + if (m_pre_calling) + file->ha_pre_rnd_end(); + else + file->ha_rnd_end(); + DBUG_RETURN(error); + } + delete_rows_result+= delete_rows; + } + if (rnd_seq) + { + if ((error= (m_pre_calling ? + file->ha_pre_index_or_rnd_end() : + file->ha_index_or_rnd_end()))) + DBUG_RETURN(error); + } + } + DBUG_RETURN(0); +} + + +/** + Start parallel execution of a direct delete for a handlersocket delete + request. A direct delete request deletes all qualified rows in a single + operation, rather than one row at a time. The direct delete operation + is pushed down to each individual partition. + + SYNOPSIS + pre_direct_delete_rows() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::pre_direct_delete_rows() +{ + bool save_m_pre_calling; + int error; + ha_rows not_used; + DBUG_ENTER("ha_partition::pre_direct_delete_rows"); + save_m_pre_calling= m_pre_calling; + m_pre_calling= TRUE; + error= direct_delete_rows(¬_used); + m_pre_calling= save_m_pre_calling; + DBUG_RETURN(error); +} + +/** + Push metadata for the current operation down to each partition. + + SYNOPSIS + info_push() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::info_push(uint info_type, void *info) +{ + int error= 0; + handler **file= m_file; + DBUG_ENTER("ha_partition::info_push"); + + do + { + int tmp; + if ((tmp= (*file)->info_push(info_type, info))) + error= tmp; + } while (*(++file)); + DBUG_RETURN(error); +} + + void ha_partition::clear_top_table_fields() { handler **file; diff --git a/sql/ha_partition.h b/sql/ha_partition.h index c636340a5dc..f79a272abad 100644 --- a/sql/ha_partition.h +++ b/sql/ha_partition.h @@ -296,6 +296,7 @@ private: ha_rows m_bulk_inserted_rows; /** used for prediction of start_bulk_insert rows */ enum_monotonicity_info m_part_func_monotonicity_info; + part_id_range m_direct_update_part_spec; bool m_pre_calling; bool m_pre_call_use_parallel; /* Keep track of bulk access requests */ @@ -535,8 +536,23 @@ public: number of calls to write_row. */ virtual int write_row(uchar * buf); + virtual bool start_bulk_update(); + virtual int exec_bulk_update(ha_rows *dup_key_found); + virtual int end_bulk_update(); + virtual int bulk_update_row(const uchar *old_data, const uchar *new_data, + ha_rows *dup_key_found); virtual int update_row(const uchar * old_data, const uchar * new_data); + virtual int direct_update_rows_init(); + virtual int pre_direct_update_rows_init(); + virtual int direct_update_rows(ha_rows *update_rows); + virtual int pre_direct_update_rows(); + virtual bool start_bulk_delete(); + virtual int end_bulk_delete(); virtual int delete_row(const uchar * buf); + virtual int direct_delete_rows_init(); + virtual int pre_direct_delete_rows_init(); + virtual int direct_delete_rows(ha_rows *delete_rows); + virtual int pre_direct_delete_rows(); virtual int delete_all_rows(void); virtual int truncate(); virtual void start_bulk_insert(ha_rows rows, uint flags); @@ -1306,6 +1322,7 @@ public: virtual const COND *cond_push(const COND *cond); virtual void cond_pop(); virtual void clear_top_table_fields(); + virtual int info_push(uint info_type, void *info); private: int handle_opt_partitions(THD *thd, HA_CHECK_OPT *check_opt, uint flags); diff --git a/sql/handler.cc b/sql/handler.cc index a215c13e718..4e059e0e56c 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -4058,7 +4058,7 @@ int handler::ha_repair(THD* thd, HA_CHECK_OPT* check_opt) int handler::ha_bulk_update_row(const uchar *old_data, const uchar *new_data, - uint *dup_key_found) + ha_rows *dup_key_found) { DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type == F_WRLCK); @@ -6163,6 +6163,59 @@ int handler::ha_delete_row(const uchar *buf) } +/** + Execute a direct update request. A direct update request updates all + qualified rows in a single operation, rather than one row at a time. + In a Spider cluster the direct update operation is pushed down to the + child levels of the cluster. + + Note that this can't be used in case of statment logging + + @param update_rows Number of updated rows. + + @retval 0 Success. + @retval != 0 Failure. +*/ + +int handler::ha_direct_update_rows(ha_rows *update_rows) +{ + int error; + + MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str); + mark_trx_read_write(); + + error = direct_update_rows(update_rows); + MYSQL_UPDATE_ROW_DONE(error); + return error; +} + + +/** + Execute a direct delete request. A direct delete request deletes all + qualified rows in a single operation, rather than one row at a time. + In a Spider cluster the direct delete operation is pushed down to the + child levels of the cluster. + + @param delete_rows Number of deleted rows. + + @retval 0 Success. + @retval != 0 Failure. +*/ + +int handler::ha_direct_delete_rows(ha_rows *delete_rows) +{ + int error; + /* Ensure we are not using binlog row */ + DBUG_ASSERT(!table->in_use->is_current_stmt_binlog_format_row()); + + MYSQL_DELETE_ROW_START(table_share->db.str, table_share->table_name.str); + mark_trx_read_write(); + + error = direct_delete_rows(delete_rows); + MYSQL_DELETE_ROW_DONE(error); + return error; +} + /** @brief use_hidden_primary_key() is called in case of an update/delete when diff --git a/sql/handler.h b/sql/handler.h index cbc30872abf..a176b45d8db 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -287,6 +287,11 @@ enum enum_alter_inplace_result { */ #define HA_BINLOG_FLAGS (HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE) +/* The following are used by Spider */ +#define HA_CAN_FORCE_BULK_UPDATE (1ULL << 50) +#define HA_CAN_FORCE_BULK_DELETE (1ULL << 51) +#define HA_CAN_DIRECT_UPDATE_AND_DELETE (1ULL << 52) + /* The following is for partition handler */ #define HA_CAN_MULTISTEP_MERGE (1LL << 53) @@ -442,6 +447,12 @@ static const uint MYSQL_START_TRANS_OPT_READ_WRITE = 4; #define HA_CHECK_DUP (HA_CHECK_DUP_KEY + HA_CHECK_DUP_UNIQUE) #define HA_CHECK_ALL (~0U) +/* Options for info_push() */ +#define INFO_KIND_UPDATE_FIELDS 101 +#define INFO_KIND_UPDATE_VALUES 102 +#define INFO_KIND_FORCE_LIMIT_BEGIN 103 +#define INFO_KIND_FORCE_LIMIT_END 104 + enum legacy_db_type { /* note these numerical values are fixed and can *not* be changed */ @@ -1420,9 +1431,6 @@ handlerton *ha_default_tmp_handlerton(THD *thd); #define HTON_TEMPORARY_NOT_SUPPORTED (1 << 6) //Having temporary tables not supported #define HTON_SUPPORT_LOG_TABLES (1 << 7) //Engine supports log tables #define HTON_NO_PARTITION (1 << 8) //Not partition of these tables -#define HTON_CAN_MULTISTEP_MERGE (1 << 9) //You can merge mearged tables -// Engine needs to access the main connect string in partitions -#define HTON_CAN_READ_CONNECT_STRING_IN_PARTITION (1 << 10) /* This flag should be set when deciding that the engine does not allow @@ -1443,6 +1451,10 @@ handlerton *ha_default_tmp_handlerton(THD *thd); // MySQL compatibility. Unused. #define HTON_SUPPORTS_FOREIGN_KEYS (1 << 0) //Foreign key constraint supported. +#define HTON_CAN_MERGE (1 <<11) //Merge type table +// Engine needs to access the main connect string in partitions +#define HTON_CAN_READ_CONNECT_STRING_IN_PARTITION (1 <<12) + class Ha_trx_info; struct THD_TRANS @@ -2718,7 +2730,8 @@ public: /** Length of ref (1-8 or the clustered key length) */ uint ref_length; FT_INFO *ft_handler; - enum {NONE=0, INDEX, RND} inited; + enum init_stat { NONE=0, INDEX, RND }; + init_stat inited, pre_inited; const COND *pushed_cond; /** @@ -2817,7 +2830,7 @@ public: key_used_on_scan(MAX_KEY), active_index(MAX_KEY), keyread(MAX_KEY), ref_length(sizeof(my_off_t)), - ft_handler(0), inited(NONE), + ft_handler(0), inited(NONE), pre_inited(NONE), pushed_cond(0), next_insert_id(0), insert_id_for_cur_row(0), tracker(NULL), pushed_idx_cond(NULL), @@ -2958,7 +2971,7 @@ public: DBUG_RETURN(ret); } int ha_bulk_update_row(const uchar *old_data, const uchar *new_data, - uint *dup_key_found); + ha_rows *dup_key_found); int ha_delete_all_rows(); int ha_truncate(); int ha_reset_auto_increment(ulonglong value); @@ -3156,7 +3169,7 @@ public: @retval 0 Success @retval >0 Error code */ - virtual int exec_bulk_update(uint *dup_key_found) + virtual int exec_bulk_update(ha_rows *dup_key_found) { DBUG_ASSERT(FALSE); return HA_ERR_WRONG_COMMAND; @@ -3165,7 +3178,7 @@ public: Perform any needed clean-up, no outstanding updates are there at the moment. */ - virtual void end_bulk_update() { return; } + virtual int end_bulk_update() { return 0; } /** Execute all outstanding deletes and close down the bulk delete. @@ -3208,6 +3221,48 @@ public: { return 0; } virtual int pre_rnd_next(bool use_parallel) { return 0; } + int ha_pre_rnd_init(bool scan) + { + int result; + DBUG_ENTER("ha_pre_rnd_init"); + DBUG_ASSERT(pre_inited==NONE || (pre_inited==RND && scan)); + pre_inited= (result= pre_rnd_init(scan)) ? NONE: RND; + DBUG_RETURN(result); + } + int ha_pre_rnd_end() + { + DBUG_ENTER("ha_pre_rnd_end"); + DBUG_ASSERT(pre_inited==RND); + pre_inited=NONE; + DBUG_RETURN(pre_rnd_end()); + } + virtual int pre_rnd_init(bool scan) { return 0; } + virtual int pre_rnd_end() { return 0; } + virtual int pre_index_init(uint idx, bool sorted) { return 0; } + virtual int pre_index_end() { return 0; } + int ha_pre_index_init(uint idx, bool sorted) + { + int result; + DBUG_ENTER("ha_pre_index_init"); + DBUG_ASSERT(pre_inited==NONE); + if (!(result= pre_index_init(idx, sorted))) + pre_inited=INDEX; + DBUG_RETURN(result); + } + int ha_pre_index_end() + { + DBUG_ENTER("ha_pre_index_end"); + DBUG_ASSERT(pre_inited==INDEX); + pre_inited=NONE; + DBUG_RETURN(pre_index_end()); + } + int ha_pre_index_or_rnd_end() + { + return (pre_inited == INDEX ? + ha_pre_index_end() : + pre_inited == RND ? ha_pre_rnd_end() : 0 ); + } + /** @brief Positions an index cursor to the index specified in the @@ -3713,6 +3768,11 @@ public: virtual void cond_pop() { return; }; /** + Push metadata for the current operation down to the table handler. + */ + virtual int info_push(uint info_type, void *info) { return 0; }; + + /** This function is used to get correlating of a parent (table/column) and children (table/column). When conditions are pushed down to child table (like child of myisam_merge), child table needs to know about @@ -4162,6 +4222,49 @@ private: { return HA_ERR_WRONG_COMMAND; } + + /* Perform initialization for a direct update request */ +public: + int ha_direct_update_rows(ha_rows *update_rows); + virtual int direct_update_rows_init() + { + return HA_ERR_WRONG_COMMAND; + } +private: + virtual int pre_direct_update_rows_init() + { + return HA_ERR_WRONG_COMMAND; + } + virtual int direct_update_rows(ha_rows *update_rows __attribute__((unused))) + { + return HA_ERR_WRONG_COMMAND; + } + virtual int pre_direct_update_rows() + { + return HA_ERR_WRONG_COMMAND; + } + + /* Perform initialization for a direct delete request */ +public: + int ha_direct_delete_rows(ha_rows *delete_rows); + virtual int direct_delete_rows_init() + { + return HA_ERR_WRONG_COMMAND; + } +private: + virtual int pre_direct_delete_rows_init() + { + return HA_ERR_WRONG_COMMAND; + } + virtual int direct_delete_rows(ha_rows *delete_rows __attribute__((unused))) + { + return HA_ERR_WRONG_COMMAND; + } + virtual int pre_direct_delete_rows() + { + return HA_ERR_WRONG_COMMAND; + } + /** Reset state of file to after 'open'. This function is called after every statement for all tables used @@ -4236,7 +4339,7 @@ public: @retval 1 Bulk delete not used, normal operation used */ virtual int bulk_update_row(const uchar *old_data, const uchar *new_data, - uint *dup_key_found) + ha_rows *dup_key_found) { DBUG_ASSERT(FALSE); return HA_ERR_WRONG_COMMAND; diff --git a/sql/opt_sum.cc b/sql/opt_sum.cc index 72b28473e15..43d1c2de7ad 100644 --- a/sql/opt_sum.cc +++ b/sql/opt_sum.cc @@ -398,6 +398,8 @@ int opt_sum_query(THD *thd, const_result= 0; break; } + longlong info_limit= 1; + table->file->info_push(INFO_KIND_FORCE_LIMIT_BEGIN, &info_limit); if (!(error= table->file->ha_index_init((uint) ref.key, 1))) error= (is_max ? get_index_max_value(table, &ref, range_fl) : @@ -410,6 +412,7 @@ int opt_sum_query(THD *thd, error= HA_ERR_KEY_NOT_FOUND; table->file->ha_end_keyread(); table->file->ha_index_end(); + table->file->info_push(INFO_KIND_FORCE_LIMIT_END, NULL); if (error) { if (error == HA_ERR_KEY_NOT_FOUND || error == HA_ERR_END_OF_FILE) diff --git a/sql/sql_delete.cc b/sql/sql_delete.cc index 0d4bed836b8..984a0799b54 100644 --- a/sql/sql_delete.cc +++ b/sql/sql_delete.cc @@ -250,7 +250,7 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, SQL_I_List<ORDER> *order_list, ha_rows limit, ulonglong options, select_result *result) { - bool will_batch; + bool will_batch= FALSE; int error, loc_error; TABLE *table; SQL_SELECT *select=0; @@ -262,11 +262,13 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, bool return_error= 0; ha_rows deleted= 0; bool reverse= FALSE; + bool has_triggers; ORDER *order= (ORDER *) ((order_list && order_list->elements) ? order_list->first : NULL); SELECT_LEX *select_lex= &thd->lex->select_lex; killed_state killed_status= NOT_KILLED; THD::enum_binlog_query_type query_type= THD::ROW_QUERY_TYPE; + bool binlog_is_row; bool with_select= !select_lex->item_list.is_empty(); Explain_delete *explain; Delete_plan query_plan(thd->mem_root); @@ -371,9 +373,12 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, - We should not be binlogging this statement in row-based, and - there should be no delete triggers associated with the table. */ + + has_triggers= (table->triggers && + table->triggers->has_delete_triggers()); if (!with_select && !using_limit && const_cond_result && (!thd->is_current_stmt_binlog_format_row() && - !(table->triggers && table->triggers->has_delete_triggers()))) + !has_triggers)) { /* Update the table->file->stats.records number */ table->file->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK); @@ -522,14 +527,59 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, if (!(select && select->quick)) status_var_increment(thd->status_var.delete_scan_count); - if (query_plan.using_filesort) + binlog_is_row= thd->is_current_stmt_binlog_format_row(); + DBUG_PRINT("info", ("binlog_is_row: %s", binlog_is_row ? "TRUE" : "FALSE")); + + /* + We can use direct delete (delete that is done silently in the handler) + if none of the following conditions are true: + - There are triggers + - There is binary logging + - There is a virtual not stored column in the WHERE clause + - ORDER BY or LIMIT + - As this requires the rows to be deleted in a specific order + - Note that Spider can handle ORDER BY and LIMIT in a cluster with + one data node. These conditions are therefore checked in + direct_delete_rows_init(). + + Direct delete does not require a WHERE clause + + Later we also ensure that we are only using one table (no sub queries) + */ + + if ((table->file->ha_table_flags() & HA_CAN_DIRECT_UPDATE_AND_DELETE) && + !has_triggers && !binlog_is_row && !with_select) { + table->mark_columns_needed_for_delete(); + if (!table->check_virtual_columns_marked_for_read()) + { + DBUG_PRINT("info", ("Trying direct delete")); + if (select && select->cond && + (select->cond->used_tables() == table->map)) + { + DBUG_ASSERT(!table->file->pushed_cond); + if (!table->file->cond_push(select->cond)) + table->file->pushed_cond= select->cond; + } + if (!table->file->direct_delete_rows_init()) + { + /* Direct deleting is supported */ + DBUG_PRINT("info", ("Using direct delete")); + THD_STAGE_INFO(thd, stage_updating); + if (!(error= table->file->ha_direct_delete_rows(&deleted))) + error= -1; + goto terminate_delete; + } + } + } + if (query_plan.using_filesort) + { { Filesort fsort(order, HA_POS_ERROR, true, select); DBUG_ASSERT(query_plan.index == MAX_KEY); - Filesort_tracker *fs_tracker= + Filesort_tracker *fs_tracker= thd->lex->explain->get_upd_del_plan()->filesort_tracker; if (!(file_sort= filesort(thd, table, &fsort, fs_tracker))) @@ -568,15 +618,12 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, if (init_ftfuncs(thd, select_lex, 1)) goto got_error; - if (table->prepare_triggers_for_delete_stmt_or_event()) - { - will_batch= FALSE; - } - else - will_batch= !table->file->start_bulk_delete(); - table->mark_columns_needed_for_delete(); + if ((table->file->ha_table_flags() & HA_CAN_FORCE_BULK_DELETE) && + !table->prepare_triggers_for_delete_stmt_or_event()) + will_batch= !table->file->start_bulk_delete(); + if (with_select) { if (result->send_result_set_metadata(select_lex->item_list, @@ -683,6 +730,7 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, else break; } + terminate_delete: killed_status= thd->killed; if (killed_status != NOT_KILLED || thd->is_error()) @@ -769,6 +817,8 @@ cleanup: } delete file_sort; free_underlaid_joins(thd, select_lex); + if (table->file->pushed_cond) + table->file->cond_pop(); DBUG_RETURN(error >= 0 || thd->is_error()); /* Special exits */ @@ -790,7 +840,8 @@ send_nothing_and_leave: delete select; delete file_sort; free_underlaid_joins(thd, select_lex); - //table->set_keyread(false); + if (table->file->pushed_cond) + table->file->cond_pop(); DBUG_ASSERT(!return_error || thd->is_error() || thd->killed); DBUG_RETURN((return_error || thd->is_error() || thd->killed) ? 1 : 0); diff --git a/sql/sql_update.cc b/sql/sql_update.cc index bd577426483..0b6564d8e49 100644 --- a/sql/sql_update.cc +++ b/sql/sql_update.cc @@ -255,11 +255,12 @@ int mysql_update(THD *thd, { bool using_limit= limit != HA_POS_ERROR; bool safe_update= thd->variables.option_bits & OPTION_SAFE_UPDATES; - bool used_key_is_modified= FALSE, transactional_table, will_batch; + bool used_key_is_modified= FALSE, transactional_table; + bool will_batch= FALSE; bool can_compare_record; int res; int error, loc_error; - uint dup_key_found; + ha_rows dup_key_found; bool need_sort= TRUE; bool reverse= FALSE; #ifndef NO_EMBEDDED_ACCESS_CHECKS @@ -276,6 +277,7 @@ int mysql_update(THD *thd, ulonglong id; List<Item> all_fields; killed_state killed_status= NOT_KILLED; + bool has_triggers, binlog_is_row, do_direct_update= FALSE; Update_plan query_plan(thd->mem_root); Explain_update *explain; TABLE_LIST *update_source_table; @@ -287,7 +289,7 @@ int mysql_update(THD *thd, if (open_tables(thd, &table_list, &table_count, 0)) DBUG_RETURN(1); - //Prepare views so they are handled correctly. + /* Prepare views so they are handled correctly */ if (mysql_handle_derived(thd->lex, DT_INIT)) DBUG_RETURN(1); @@ -515,7 +517,6 @@ int mysql_update(THD *thd, query_plan.using_io_buffer= true; } - /* Ok, we have generated a query plan for the UPDATE. - if we're running EXPLAIN UPDATE, goto produce explain output @@ -530,10 +531,68 @@ int mysql_update(THD *thd, DBUG_EXECUTE_IF("show_explain_probe_update_exec_start", dbug_serve_apcs(thd, 1);); - + + has_triggers= (table->triggers && + (table->triggers->has_triggers(TRG_EVENT_UPDATE, + TRG_ACTION_BEFORE) || + table->triggers->has_triggers(TRG_EVENT_UPDATE, + TRG_ACTION_AFTER))); + DBUG_PRINT("info", ("has_triggers: %s", has_triggers ? "TRUE" : "FALSE")); + binlog_is_row= thd->is_current_stmt_binlog_format_row(); + DBUG_PRINT("info", ("binlog_is_row: %s", binlog_is_row ? "TRUE" : "FALSE")); + if (!(select && select->quick)) status_var_increment(thd->status_var.update_scan_count); + /* + We can use direct update (update that is done silently in the handler) + if none of the following conditions are true: + - There are triggers + - There is binary logging + - using_io_buffer + - This means that the partition changed or the key we want + to use for scanning the table is changed + - ignore is set + - Direct updates don't return the number of ignored rows + - There is a virtual not stored column in the WHERE clause + - Changing a field used by a stored virtual column, which + would require the column to be recalculated. + - ORDER BY or LIMIT + - As this requires the rows to be updated in a specific order + - Note that Spider can handle ORDER BY and LIMIT in a cluster with + one data node. These conditions are therefore checked in + direct_update_rows_init(). + + Direct update does not require a WHERE clause + + Later we also ensure that we are only using one table (no sub queries) + */ + if ((table->file->ha_table_flags() & HA_CAN_DIRECT_UPDATE_AND_DELETE) && + !has_triggers && !binlog_is_row && + !query_plan.using_io_buffer && !ignore && + !table->check_virtual_columns_marked_for_read() && + !table->check_virtual_columns_marked_for_write()) + { + DBUG_PRINT("info", ("Trying direct update")); + if (select && select->cond && + (select->cond->used_tables() == table->map)) + { + DBUG_ASSERT(!table->file->pushed_cond); + if (!table->file->cond_push(select->cond)) + table->file->pushed_cond= select->cond; + } + + if (!table->file->info_push(INFO_KIND_UPDATE_FIELDS, &fields) && + !table->file->info_push(INFO_KIND_UPDATE_VALUES, &values) && + !table->file->direct_update_rows_init()) + { + do_direct_update= TRUE; + + /* Direct update is not using_filesort and is not using_io_buffer */ + goto update_begin; + } + } + if (query_plan.using_filesort || query_plan.using_io_buffer) { /* @@ -693,6 +752,7 @@ int mysql_update(THD *thd, } } +update_begin: if (ignore) table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); @@ -712,11 +772,20 @@ int mysql_update(THD *thd, transactional_table= table->file->has_transactions(); thd->abort_on_warning= !ignore && thd->is_strict_mode(); - if (table->prepare_triggers_for_update_stmt_or_event()) + + if (do_direct_update) { - will_batch= FALSE; + /* Direct updating is supported */ + DBUG_PRINT("info", ("Using direct update")); + table->reset_default_fields(); + if (!(error= table->file->ha_direct_update_rows(&updated))) + error= -1; + found= updated; + goto update_end; } - else + + if ((table->file->ha_table_flags() & HA_CAN_FORCE_BULK_UPDATE) && + !table->prepare_triggers_for_update_stmt_or_event()) will_batch= !table->file->start_bulk_update(); /* @@ -801,6 +870,7 @@ int mysql_update(THD *thd, call then it should be included in the count of dup_key_found and error should be set to 0 (only if these errors are ignored). */ + DBUG_PRINT("info", ("Batched update")); error= table->file->ha_bulk_update_row(table->record[1], table->record[0], &dup_key_found); @@ -949,6 +1019,8 @@ int mysql_update(THD *thd, updated-= dup_key_found; if (will_batch) table->file->end_bulk_update(); + +update_end: table->file->try_semi_consistent_read(0); if (!transactional_table && updated > 0) @@ -1004,6 +1076,11 @@ int mysql_update(THD *thd, DBUG_ASSERT(transactional_table || !updated || thd->transaction.stmt.modified_non_trans_table); free_underlaid_joins(thd, select_lex); delete file_sort; + if (table->file->pushed_cond) + { + table->file->pushed_cond= 0; + table->file->cond_pop(); + } /* If LAST_INSERT_ID(X) was used, report X */ id= thd->arg_of_last_insert_id_function ? @@ -1029,7 +1106,6 @@ int mysql_update(THD *thd, *found_return= found; *updated_return= updated; - if (thd->lex->analyze_stmt) goto emit_explain_and_leave; @@ -1040,6 +1116,8 @@ err: delete file_sort; free_underlaid_joins(thd, select_lex); table->file->ha_end_keyread(); + if (table->file->pushed_cond) + table->file->cond_pop(); thd->abort_on_warning= 0; DBUG_RETURN(1); diff --git a/sql/table.cc b/sql/table.cc index 220112f93ed..e8343903d96 100644 --- a/sql/table.cc +++ b/sql/table.cc @@ -6631,6 +6631,58 @@ bool TABLE::mark_virtual_columns_for_write(bool insert_fl) DBUG_RETURN(bitmap_updated); } + +/** + Check if a virtual not stored column field is in read set + + @retval FALSE No virtual not stored column is used + @retval TRUE At least one virtual not stored column is used +*/ + +bool TABLE::check_virtual_columns_marked_for_read() +{ + if (vfield) + { + Field **vfield_ptr; + for (vfield_ptr= vfield; *vfield_ptr; vfield_ptr++) + { + Field *tmp_vfield= *vfield_ptr; + if (bitmap_is_set(read_set, tmp_vfield->field_index) && + !tmp_vfield->vcol_info->stored_in_db) + return TRUE; + } + } + return FALSE; +} + + +/** + Check if a stored virtual column field is marked for write + + This can be used to check if any column that is part of a virtual + stored column is changed + + @retval FALSE No stored virtual column is used + @retval TRUE At least one stored virtual column is used +*/ + +bool TABLE::check_virtual_columns_marked_for_write() +{ + if (vfield) + { + Field **vfield_ptr; + for (vfield_ptr= vfield; *vfield_ptr; vfield_ptr++) + { + Field *tmp_vfield= *vfield_ptr; + if (bitmap_is_set(write_set, tmp_vfield->field_index) && + tmp_vfield->vcol_info->stored_in_db) + return TRUE; + } + } + return FALSE; +} + + /* Mark fields used by check constraints. This is done once for the TABLE_SHARE the first time the table is opened. @@ -6688,6 +6740,7 @@ void TABLE::mark_default_fields_for_write(bool is_insert) DBUG_VOID_RETURN; } + void TABLE::move_fields(Field **ptr, const uchar *to, const uchar *from) { my_ptrdiff_t diff= to - from; diff --git a/sql/table.h b/sql/table.h index 19845efe40d..aded1930a6b 100644 --- a/sql/table.h +++ b/sql/table.h @@ -1319,6 +1319,8 @@ public: void mark_columns_per_binlog_row_image(void); bool mark_virtual_col(Field *field); bool mark_virtual_columns_for_write(bool insert_fl); + bool check_virtual_columns_marked_for_read(); + bool check_virtual_columns_marked_for_write(); void mark_default_fields_for_write(bool insert_fl); void mark_columns_used_by_check_constraints(void); void mark_check_constraint_columns_for_read(void); |