Diffstat (limited to 'sql/ha_partition.cc')
-rw-r--r-- | sql/ha_partition.cc | 850
1 file changed, 850 insertions, 0 deletions
diff --git a/sql/ha_partition.cc b/sql/ha_partition.cc index 1e500cf4f37..fb214b90e6e 100644 --- a/sql/ha_partition.cc +++ b/sql/ha_partition.cc @@ -11487,6 +11487,856 @@ void ha_partition::cond_pop() /** + Perform bulk update preparation on each partition. + + SYNOPSIS + start_bulk_update() + + RETURN VALUE + TRUE Error + FALSE Success +*/ + +bool ha_partition::start_bulk_update() +{ + handler **file= m_file; + DBUG_ENTER("ha_partition::start_bulk_update"); + + if (bitmap_is_overlapping(&m_part_info->full_part_field_set, + table->write_set)) + DBUG_RETURN(TRUE); + + do + { + if ((*file)->start_bulk_update()) + DBUG_RETURN(TRUE); + } while (*(++file)); + DBUG_RETURN(FALSE); +} + + +/** + Perform bulk update execution on each partition. A bulk update allows + a handler to batch the updated rows instead of performing the updates + one row at a time. + + SYNOPSIS + exec_bulk_update() + + RETURN VALUE + TRUE Error + FALSE Success +*/ + +int ha_partition::exec_bulk_update(uint *dup_key_found) +{ + int error; + handler **file= m_file; + DBUG_ENTER("ha_partition::exec_bulk_update"); + + do + { + if ((error = (*file)->exec_bulk_update(dup_key_found))) + DBUG_RETURN(error); + } while (*(++file)); + DBUG_RETURN(0); +} + + +/** + Perform bulk update cleanup on each partition. + + SYNOPSIS + end_bulk_update() + + RETURN VALUE + NONE +*/ + +void ha_partition::end_bulk_update() +{ + handler **file= m_file; + DBUG_ENTER("ha_partition::end_bulk_update"); + + do + { + (*file)->end_bulk_update(); + } while (*(++file)); + DBUG_VOID_RETURN; +} + + +/** + Add the row to the bulk update on the partition on which the row is stored. + A bulk update allows a handler to batch the updated rows instead of + performing the updates one row at a time. + + SYNOPSIS + bulk_update_row() + old_data Old record + new_data New record + dup_key_found Number of duplicate keys found + + RETURN VALUE + >1 Error + 1 Bulk update not used, normal operation used + 0 Bulk update used by handler +*/ + +int ha_partition::bulk_update_row(const uchar *old_data, uchar *new_data, + uint *dup_key_found) +{ + int error = 0; + uint32 part_id; + longlong func_value; + my_bitmap_map *old_map; + DBUG_ENTER("ha_partition::bulk_update_row"); + + old_map= dbug_tmp_use_all_columns(table, table->read_set); + error= m_part_info->get_partition_id(m_part_info, &part_id, + &func_value); + dbug_tmp_restore_column_map(table->read_set, old_map); + if (unlikely(error)) + { + m_part_info->err_value= func_value; + goto end; + } + + error = m_file[part_id]->ha_bulk_update_row(old_data, new_data, + dup_key_found); + +end: + DBUG_RETURN(error); +} + + +/** + Perform bulk delete preparation on each partition. + + SYNOPSIS + start_bulk_delete() + + RETURN VALUE + TRUE Error + FALSE Success +*/ + +bool ha_partition::start_bulk_delete() +{ + handler **file= m_file; + DBUG_ENTER("ha_partition::start_bulk_delete"); + + do + { + if ((*file)->start_bulk_delete()) + DBUG_RETURN(TRUE); + } while (*(++file)); + DBUG_RETURN(FALSE); +} + + +/** + Perform bulk delete cleanup on each partition. + + SYNOPSIS + end_bulk_delete() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::end_bulk_delete() +{ + int error= 0; + handler **file= m_file; + DBUG_ENTER("ha_partition::end_bulk_delete"); + + do + { + int tmp; + if ((tmp= (*file)->end_bulk_delete())) + error= tmp; + } while (*(++file)); + DBUG_RETURN(error); +} + + +/** + Perform initialization for a direct update request. 
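Aside (not part of this patch): a hedged sketch of how a caller in the SQL layer might drive this hook, probing for pushdown support and falling back to the ordinary row-by-row path when the handler refuses. fallback_row_by_row_update() is a hypothetical stand-in; the ha_* wrappers are the ones this patch invokes on the child handlers.

  uint updated_rows= 0;
  int error= table->file->ha_direct_update_rows_init();
  if (!error)
    error= table->file->ha_direct_update_rows(&updated_rows); /* pushed down */
  else if (error == HA_ERR_WRONG_COMMAND)
    error= fallback_row_by_row_update();  /* hypothetical ordinary UPDATE path */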
+ + SYNOPSIS + direct_update_rows_init() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::direct_update_rows_init() +{ + int error; + uint i, j; + handler *file; + DBUG_ENTER("ha_partition::direct_update_rows_init"); + + if (bitmap_is_overlapping(&m_part_info->full_part_field_set, + table->write_set) +#if defined(HS_HAS_SQLCOM) + && + (thd_sql_command(ha_thd()) != SQLCOM_HS_UPDATE || + check_hs_update_overlapping(&ranges->start_key)) +#endif + ) + { + DBUG_PRINT("info", ("partition FALSE by updating part_key")); + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + } + + m_part_spec.start_part= 0; + m_part_spec.end_part= m_tot_parts - 1; + m_direct_update_part_spec = m_part_spec; + + j = 0; + for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++) + { + if (bitmap_is_set(&(m_part_info->read_partitions), i) && + bitmap_is_set(&(m_part_info->lock_partitions), i)) + { + file = m_file[i]; + if ((error= file->ha_direct_update_rows_init())) + { + DBUG_PRINT("info", ("partition FALSE by storage engine")); + DBUG_RETURN(error); + } + j++; + } + } + + TABLE_LIST *table_list= table->pos_in_table_list; + if (j != 1 && table_list) + { + while (table_list->parent_l) + table_list = table_list->parent_l; + st_select_lex *select_lex= table_list->select_lex; + DBUG_PRINT("info", ("partition select_lex=%p", select_lex)); + if (select_lex && select_lex->explicit_limit) + { + DBUG_PRINT("info", ("partition explicit_limit=TRUE")); + DBUG_PRINT("info", ("partition offset_limit=%p", + select_lex->offset_limit)); + DBUG_PRINT("info", ("partition select_limit=%p", + select_lex->select_limit)); + DBUG_PRINT("info", ("partition FALSE by select_lex")); + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + } + } + DBUG_PRINT("info", ("partition OK")); + DBUG_RETURN(0); +} + + +/** + Do initialization for performing parallel direct update + for a handlersocket update request. 
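Aside (illustrative only, not server code): the *_init() function above refuses pushdown when more than one partition qualifies and the statement carries an explicit LIMIT, because each partition would otherwise apply the LIMIT independently and too many rows could be touched. A self-contained model of that acceptance test:

  // Pushdown is acceptable when there is no explicit LIMIT, or when exactly
  // one partition is both read-enabled and lock-enabled, so the LIMIT cannot
  // be applied more than once.
  static bool direct_op_allowed(unsigned qualifying_partitions,
                                bool explicit_limit)
  {
    return !explicit_limit || qualifying_partitions == 1;
  }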
+ + SYNOPSIS + pre_direct_update_rows_init() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::pre_direct_update_rows_init() +{ + int error; + uint i, j; + handler *file; + TABLE_LIST *table_list= table->pos_in_table_list; + DBUG_ENTER("ha_partition::pre_direct_update_rows_init"); + + if (bitmap_is_overlapping(&m_part_info->full_part_field_set, + table->write_set) +#if defined(HS_HAS_SQLCOM) + && + (thd_sql_command(ha_thd()) != SQLCOM_HS_UPDATE || + check_hs_update_overlapping(&ranges->start_key)) +#endif + ) + { + DBUG_PRINT("info", ("partition FALSE by updating part_key")); + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + } + + m_part_spec.start_part= 0; + m_part_spec.end_part= m_tot_parts - 1; + m_direct_update_part_spec = m_part_spec; + + j = 0; + for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++) + { + if (bitmap_is_set(&(m_part_info->read_partitions), i) && + bitmap_is_set(&(m_part_info->lock_partitions), i)) + { + file = m_file[i]; + if ((error= file->ha_pre_direct_update_rows_init())) + { + DBUG_PRINT("info", ("partition FALSE by storage engine")); + DBUG_RETURN(error); + } + j++; + } + } + + if (j != 1 && table_list) + { + while (table_list->parent_l) + table_list = table_list->parent_l; + st_select_lex *select_lex= table_list->select_lex; + DBUG_PRINT("info", ("partition select_lex=%p", select_lex)); + if (select_lex && select_lex->explicit_limit) + { + DBUG_PRINT("info", ("partition explicit_limit=TRUE")); + DBUG_PRINT("info", ("partition offset_limit=%p", + select_lex->offset_limit)); + DBUG_PRINT("info", ("partition select_limit=%p", + select_lex->select_limit)); + DBUG_PRINT("info", ("partition FALSE by select_lex")); + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + } + } + if (bulk_access_started) + bulk_access_info_current->called = TRUE; + DBUG_PRINT("info", ("partition OK")); + DBUG_RETURN(0); +} + + +/** + Execute a direct update request. A direct update request updates all + qualified rows in a single operation, rather than one row at a time. + The direct update operation is pushed down to each individual + partition. + + SYNOPSIS + direct_update_rows() + update_rows Number of updated rows + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::direct_update_rows(uint *update_rows) +{ + int error; + bool rnd_seq= FALSE; + uint m_update_rows, i; + handler *file; + DBUG_ENTER("ha_partition::direct_update_rows"); + + if (inited == RND && m_scan_value == 1) + { + rnd_seq= TRUE; + m_scan_value= 2; + } + + *update_rows= 0; + for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++) + { + file = m_file[i]; + if (bitmap_is_set(&(m_part_info->read_partitions), i) && + bitmap_is_set(&(m_part_info->lock_partitions), i)) + { + if (rnd_seq && file->inited == NONE) + { + if ((error = file->ha_rnd_init(TRUE))) + DBUG_RETURN(error); + } + if ((error= (file)->ha_direct_update_rows(&m_update_rows))) + { + if (rnd_seq) + file->ha_rnd_end(); + DBUG_RETURN(error); + } + *update_rows += m_update_rows; + } + if (rnd_seq && (error = file->ha_index_or_rnd_end())) + DBUG_RETURN(error); + } + DBUG_RETURN(0); +} + + +/** + Start parallel execution of a direct update for a handlersocket update + request. A direct update request updates all qualified rows in a single + operation, rather than one row at a time. The direct update operation + is pushed down to each individual partition. 
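Aside (a minimal, self-contained model with made-up types, not server code): the execution functions above fan the operation out over every partition that is both readable and lockable, stop at the first child error, and sum the per-partition row counts into the caller's counter:

  #include <cstddef>

  struct Part
  {
    int (*update)(unsigned *rows);   // stand-in for ha_direct_update_rows()
    bool readable, lockable;         // stand-ins for the partition bitmaps
  };

  static int fan_out_update(Part *parts, std::size_t n, unsigned *total)
  {
    *total= 0;
    for (std::size_t i= 0; i < n; i++)
    {
      if (!parts[i].readable || !parts[i].lockable)
        continue;                          // partition pruned away
      unsigned rows= 0;
      if (int err= parts[i].update(&rows)) // first child error aborts the scan
        return err;
      *total+= rows;                       // aggregate across partitions
    }
    return 0;
  }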
+ + SYNOPSIS + pre_direct_update_rows() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::pre_direct_update_rows() +{ + int error; + bool rnd_seq= FALSE; + uint m_update_rows, i; + handler *file; + DBUG_ENTER("ha_partition::pre_direct_update_rows"); + + if (pre_inited == RND && m_scan_value == 1) + { + rnd_seq= TRUE; + m_scan_value= 2; + } + + m_part_spec = m_direct_update_part_spec; + for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++) + { + file = m_file[i]; + if (bitmap_is_set(&(m_part_info->read_partitions), i) && + bitmap_is_set(&(m_part_info->lock_partitions), i)) + { + if (rnd_seq && file->pre_inited == NONE) + { + if ((error = file->ha_pre_rnd_init(TRUE))) + DBUG_RETURN(error); + } + if ((error= (file)->ha_pre_direct_update_rows())) + { + if (rnd_seq) + file->ha_pre_rnd_end(); + DBUG_RETURN(error); + } + bitmap_set_bit(&bulk_access_exec_bitmap, i); + } + if (rnd_seq && (error = file->ha_pre_index_or_rnd_end())) + DBUG_RETURN(error); + } + DBUG_RETURN(0); +} + + +#if defined(HS_HAS_SQLCOM) +/** + Determine whether a key value being updated includes partition columns + when using handlersocket + + SYNOPSIS + check_hs_update_overlapping() + key Key value + + RETURN VALUE + TRUE The key value being updated does not include + partition columns + FALSE The key value being updated does include + partition columns +*/ + +bool ha_partition::check_hs_update_overlapping(key_range *key) +{ + Field *field; + uint store_length, length, var_len, field_index; + const uchar *ptr; + bool key_eq; + KEY *key_info; + KEY_PART_INFO *key_part; + key_part_map tgt_key_part_map = key->keypart_map; + char buf[MAX_FIELD_WIDTH], buf2[MAX_FIELD_WIDTH]; + String tmp_str(buf, MAX_FIELD_WIDTH, &my_charset_bin), *str, + tmp_str2(buf2, MAX_FIELD_WIDTH, &my_charset_bin), *str2; + DBUG_ENTER("ha_partition::check_hs_update_overlapping"); + + key_info = &table->key_info[active_index]; + for (key_part = key_info->key_part, + store_length = key_part->store_length, length = 0; + tgt_key_part_map; + length += store_length, tgt_key_part_map >>= 1, key_part++, + store_length = key_part->store_length) + { + field = key_part->field; + field_index = field->field_index; + if (bitmap_is_set(&m_part_info->full_part_field_set, field_index) && + bitmap_is_set(table->write_set, field_index)) + { + ptr = key->key + length; + key_eq = (tgt_key_part_map > 1); + if (key_part->null_bit && *ptr++) + { + if (key->flag != HA_READ_KEY_EXACT || !field->is_null()) + { + DBUG_PRINT("info", ("spider flag=%u is_null=%s", + key->flag, + field->is_null() ? "TRUE" : "FALSE")); + DBUG_RETURN(TRUE); + } + } + else + { + if (field->type() == MYSQL_TYPE_BLOB || + field->real_type() == MYSQL_TYPE_VARCHAR || + field->type() == MYSQL_TYPE_GEOMETRY) + { + var_len = uint2korr(ptr); + tmp_str.set_quick((char *)ptr + HA_KEY_BLOB_LENGTH, var_len, + &my_charset_bin); + str = &tmp_str; + } + else + str = field->val_str(&tmp_str, ptr); + str2 = field->val_str(&tmp_str2); + if (str->length() != str2->length() || + memcmp(str->ptr(), str2->ptr(), str->length())) + { + DBUG_PRINT("info", ("spider length=%u %u", + str->length(), str2->length())); + DBUG_PRINT("info", ("spider length=%s %s", + str->c_ptr_safe(), str2->c_ptr_safe())); + DBUG_RETURN(TRUE); + } + } + } + } + + +DBUG_PRINT("info", ("partition return FALSE")); + +DBUG_RETURN(FALSE); +} +#endif + + +/** + Perform initialization for a direct delete request. 
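Aside (an assumption-level restatement, not code from the patch): the heart of check_hs_update_overlapping() above is a byte-wise comparison between the key image supplied with the handlersocket request and the value currently stored in the field; any difference in length or content means a partitioning column would change, so the request cannot be pushed down. In isolation:

  #include <cstring>

  // True when the two images differ, i.e. the update would move the row.
  static bool values_differ(const char *key_image, std::size_t key_len,
                            const char *field_image, std::size_t field_len)
  {
    return key_len != field_len ||
           std::memcmp(key_image, field_image, key_len) != 0;
  }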
+ + SYNOPSIS + direct_delete_rows_init() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::direct_delete_rows_init() +{ + int error; + uint i, j; + handler *file; + DBUG_ENTER("ha_partition::direct_delete_rows_init"); + + m_part_spec.start_part= 0; + m_part_spec.end_part= m_tot_parts - 1; + m_direct_update_part_spec = m_part_spec; + + j = 0; + for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++) + { + if (bitmap_is_set(&(m_part_info->read_partitions), i) && + bitmap_is_set(&(m_part_info->lock_partitions), i)) + { + file = m_file[i]; + if ((error= file->ha_direct_delete_rows_init())) + { + DBUG_PRINT("info", ("partition FALSE by storage engine")); + DBUG_RETURN(error); + } + j++; + } + } + + TABLE_LIST *table_list= table->pos_in_table_list; + if (j != 1 && table_list) + { + while (table_list->parent_l) + table_list = table_list->parent_l; + st_select_lex *select_lex= table_list->select_lex; + DBUG_PRINT("info", ("partition select_lex=%p", select_lex)); + if (select_lex && select_lex->explicit_limit) + { + DBUG_PRINT("info", ("partition explicit_limit=TRUE")); + DBUG_PRINT("info", ("partition offset_limit=%p", + select_lex->offset_limit)); + DBUG_PRINT("info", ("partition select_limit=%p", + select_lex->select_limit)); + DBUG_PRINT("info", ("partition FALSE by select_lex")); + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + } + } + DBUG_PRINT("info", ("partition OK")); + DBUG_RETURN(0); +} + + +/** + Do initialization for performing parallel direct delete + for a handlersocket delete request. + + SYNOPSIS + pre_direct_delete_rows_init() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::pre_direct_delete_rows_init() +{ + int error; + uint i, j; + handler *file; + TABLE_LIST *table_list= table->pos_in_table_list; + DBUG_ENTER("ha_partition::pre_direct_delete_rows_init"); + + m_part_spec.start_part= 0; + m_part_spec.end_part= m_tot_parts - 1; + m_direct_update_part_spec = m_part_spec; + + j = 0; + for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++) + { + if (bitmap_is_set(&(m_part_info->read_partitions), i) && + bitmap_is_set(&(m_part_info->lock_partitions), i)) + { + file = m_file[i]; + if ((error= file->ha_pre_direct_delete_rows_init())) + { + DBUG_PRINT("info", ("partition FALSE by storage engine")); + DBUG_RETURN(error); + } + j++; + } + } + + if (j != 1 && table_list) + { + while (table_list->parent_l) + table_list = table_list->parent_l; + st_select_lex *select_lex= table_list->select_lex; + DBUG_PRINT("info", ("partition select_lex=%p", select_lex)); + if (select_lex && select_lex->explicit_limit) + { + DBUG_PRINT("info", ("partition explicit_limit=TRUE")); + DBUG_PRINT("info", ("partition offset_limit=%p", + select_lex->offset_limit)); + DBUG_PRINT("info", ("partition select_limit=%p", + select_lex->select_limit)); + DBUG_PRINT("info", ("partition FALSE by select_lex")); + DBUG_RETURN(HA_ERR_WRONG_COMMAND); + } + } + if (bulk_access_started) + bulk_access_info_current->called = TRUE; + DBUG_PRINT("info", ("partition OK")); + DBUG_RETURN(0); +} + + +/** + Execute a direct delete request. A direct delete request deletes all + qualified rows in a single operation, rather than one row at a time. + The direct delete operation is pushed down to each individual + partition. 
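Aside (illustrative model, not server code): the *_init() functions above climb the TABLE_LIST parent chain so that, when the partitioned table is used through a view or derived table, the LIMIT of the outermost statement decides whether pushdown is safe. The walk itself reduces to:

  // Follow parent_l-style links until the top-level entry is reached.
  struct Node { Node *parent_l; };

  static Node *outermost(Node *t)
  {
    while (t->parent_l)
      t= t->parent_l;
    return t;
  }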
+ + SYNOPSIS + direct_delete_rows() + delete_rows Number of deleted rows + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::direct_delete_rows(uint *delete_rows) +{ + int error; + bool rnd_seq= FALSE; + uint m_delete_rows, i; + handler *file; + DBUG_ENTER("ha_partition::direct_delete_rows"); + + if (inited == RND && m_scan_value == 1) + { + rnd_seq= TRUE; + m_scan_value= 2; + } + + *delete_rows= 0; + m_part_spec = m_direct_update_part_spec; + for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++) + { + file = m_file[i]; + if (bitmap_is_set(&(m_part_info->read_partitions), i) && + bitmap_is_set(&(m_part_info->lock_partitions), i)) + { + if (rnd_seq && file->inited == NONE) + { + if ((error = file->ha_rnd_init(TRUE))) + DBUG_RETURN(error); + } + if ((error= file->ha_direct_delete_rows(&m_delete_rows))) + { + if (rnd_seq) + file->ha_rnd_end(); + DBUG_RETURN(error); + } + *delete_rows += m_delete_rows; + } + if (rnd_seq && (error = file->ha_index_or_rnd_end())) + DBUG_RETURN(error); + } + DBUG_RETURN(0); +} + + +/** + Start parallel execution of a direct delete for a handlersocket delete + request. A direct delete request deletes all qualified rows in a single + operation, rather than one row at a time. The direct delete operation + is pushed down to each individual partition. + + SYNOPSIS + pre_direct_delete_rows() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::pre_direct_delete_rows() +{ + int error; + bool rnd_seq= FALSE; + uint m_delete_rows, i; + handler *file; + DBUG_ENTER("ha_partition::pre_direct_delete_rows"); + + if (pre_inited == RND && m_scan_value == 1) + { + rnd_seq= TRUE; + m_scan_value= 2; + } + + m_part_spec = m_direct_update_part_spec; + for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++) + { + file = m_file[i]; + if (bitmap_is_set(&(m_part_info->read_partitions), i) && + bitmap_is_set(&(m_part_info->lock_partitions), i)) + { + if (rnd_seq && file->pre_inited == NONE) + { + if ((error = file->ha_pre_rnd_init(TRUE))) + DBUG_RETURN(error); + } + if ((error= file->ha_pre_direct_delete_rows())) + { + if (rnd_seq) + file->ha_pre_rnd_end(); + DBUG_RETURN(error); + } + bitmap_set_bit(&bulk_access_exec_bitmap, i); + } + if (rnd_seq && (error = file->ha_pre_index_or_rnd_end())) + DBUG_RETURN(error); + } + DBUG_RETURN(0); +} + + +/** + Push metadata for the current operation down to each partition. 
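Aside (a simplified model with made-up types, not server code): the bulk access bookkeeping in info_push() below keeps a singly linked list of per-statement slots. A BEGIN request appends to (or reuses) the list and hands the new slot back to the caller, CURRENT later selects the slot to execute, and END closes the batch. The BEGIN bookkeeping is roughly:

  struct Slot { unsigned sequence_num; Slot *next; };

  static Slot *begin_slot(Slot *&first, Slot *&current, bool &started)
  {
    if (started)                                  // later statement of a batch
    {
      if (!current->next)                         // extend the list on demand
        current->next= new Slot{current->sequence_num + 1, nullptr};
      current= current->next;
    }
    else                                          // first statement of a batch
    {
      if (!first)
        first= new Slot{0, nullptr};
      current= first;
      started= true;
    }
    return current;                               // handed back through *info
  }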
+ + SYNOPSIS + info_push() + + RETURN VALUE + >0 Error + 0 Success +*/ + +int ha_partition::info_push(uint info_type, void *info) +{ + int error= 0; + uint i; + handler **file= m_file; + DBUG_ENTER("ha_partition::info_push"); + + switch (info_type) + { + case INFO_KIND_BULK_ACCESS_BEGIN: + DBUG_PRINT("info", ("partition INFO_KIND_BULK_ACCESS_BEGIN")); + if (bulk_access_started) + { + if (!bulk_access_info_current->next) + { + if (!(bulk_access_info_current->next = create_bulk_access_info())) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + bulk_access_info_current->next->sequence_num = + bulk_access_info_current->sequence_num + 1; + } + bulk_access_info_current = bulk_access_info_current->next; + } + else + { + if (!bulk_access_info_first) + { + if (!(bulk_access_info_first = create_bulk_access_info())) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + bulk_access_info_first->sequence_num = 0; + } + bulk_access_info_current = bulk_access_info_first; + bulk_access_started = TRUE; + bulk_access_executing = FALSE; + } + bulk_access_info_current->used = TRUE; + bulk_access_info_current->called = FALSE; + *((void **)info) = bulk_access_info_current; + i = 0; + do + { + int tmp; + if ((tmp = (*file)->info_push(info_type, + &bulk_access_info_current->info[i]))) + error= tmp; + ++i; + } while (*(++file)); + DBUG_RETURN(error); + case INFO_KIND_BULK_ACCESS_CURRENT: + DBUG_PRINT("info", ("partition INFO_KIND_BULK_ACCESS_CURRENT")); + bulk_access_executing = TRUE; + bulk_access_info_exec_tgt = (PARTITION_BULK_ACCESS_INFO *)info; + i = 0; + do + { + int tmp; + if ((tmp = (*file)->info_push(info_type, + bulk_access_info_exec_tgt->info[i]))) + error= tmp; + ++i; + } while (*(++file)); + DBUG_RETURN(error); + case INFO_KIND_BULK_ACCESS_END: + DBUG_PRINT("info", ("partition INFO_KIND_BULK_ACCESS_END")); + bulk_access_started = FALSE; + break; + } + + do + { + int tmp; + if ((tmp= (*file)->info_push(info_type, info))) + error= tmp; + } while (*(++file)); + DBUG_RETURN(error); +} + + +/** Check and set the partition bitmap for partitions involved in an update operation. |
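Aside (a hedged usage sketch, assuming the caller-side conventions of this patch series rather than documenting them): a bulk access batch is driven through info_push() with the three INFO_KIND_BULK_ACCESS_* requests handled above, where BEGIN returns an opaque per-statement handle through the info pointer and CURRENT passes that same handle back when the statement is executed:

  void *stmt_handle;
  file->info_push(INFO_KIND_BULK_ACCESS_BEGIN, &stmt_handle);  /* queue stmt  */
  /* ... queue further statements, each receiving its own handle ... */
  file->info_push(INFO_KIND_BULK_ACCESS_CURRENT, stmt_handle); /* pick stmt   */
  /* ... execute the picked statement ... */
  file->info_push(INFO_KIND_BULK_ACCESS_END, NULL);            /* close batch */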