author     Kentoku SHIBA <kentokushiba@gmail.com>    2017-11-16 11:11:52 +0200
committer  Monty <monty@mariadb.org>                 2017-12-03 13:58:36 +0200
commit     e53ef202bd7706b88760472472af5ae878065f4f (patch)
tree       b4835ec250e0e00d06097da05a28206ca75df75d /sql
parent     d1e4ecec07b0611d3d0cbbf53342246ca2c4f600 (diff)
Adding direct update/delete to the server and to the partition engine.

Add support for direct update and direct delete requests for Spider.
A direct update/delete request handles all qualified rows in a single
operation rather than one row at a time.

Contains Spiral patches:
  006_mariadb-10.2.0.direct_update_rows.diff       MDEV-7704
  008_mariadb-10.2.0.partition_direct_update.diff  MDEV-7706
  010_mariadb-10.2.0.direct_update_rows2.diff      MDEV-7708
  011_mariadb-10.2.0.aggregate.diff                MDEV-7709
  027_mariadb-10.2.0.force_bulk_update.diff        MDEV-7724
  061_mariadb-10.2.0.mariadb-10.1.8.diff           MDEV-12870

Differences compared to the original patches:
- Most of the parameters of the new functions were unnecessary and have
  been removed.
- Changed the bit positions of the new handler flags, after considering
  which handler flags are not needed by other Spiral patches and which
  were merged from MySQL.
- Added info_push() (originally part of the bulk access patch).
- Did not include the code related to handler socket.
- Added HA_CAN_DIRECT_UPDATE_AND_DELETE.

Original author: Kentoku SHIBA
First reviewer:  Jacob Mathew
Second reviewer: Michael Widenius
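To make the new contract concrete before the diff: an engine advertises
HA_CAN_DIRECT_UPDATE_AND_DELETE, the SQL layer probes
direct_update_rows_init()/direct_delete_rows_init(), and the default
HA_ERR_WRONG_COMMAND answer routes execution back to the classic
row-by-row path. Below is a minimal, self-contained sketch of that
handshake; the toy_* names are invented for illustration and this is not
MariaDB source.

#include <cstdint>
#include <cstdio>

static const int HA_ERR_WRONG_COMMAND= 131;   // value from include/my_base.h
static const uint64_t HA_CAN_DIRECT_UPDATE_AND_DELETE= 1ULL << 52;
typedef uint64_t ha_rows;

class toy_handler
{
public:
  virtual ~toy_handler() {}
  virtual uint64_t ha_table_flags() const { return 0; }
  /* Engines that cannot push the operation down keep these defaults. */
  virtual int direct_delete_rows_init() { return HA_ERR_WRONG_COMMAND; }
  virtual int direct_delete_rows(ha_rows *deleted)
  { (void) deleted; return HA_ERR_WRONG_COMMAND; }
  virtual int delete_row() { return 0; }      // row-by-row fallback
};

class toy_pushdown_engine : public toy_handler
{
public:
  uint64_t ha_table_flags() const override
  { return HA_CAN_DIRECT_UPDATE_AND_DELETE; }
  int direct_delete_rows_init() override { return 0; }  // accept the request
  int direct_delete_rows(ha_rows *deleted) override
  { *deleted= 42; return 0; }                 // all qualified rows at once
};

/* Mirrors the gating order mysql_delete() uses in this patch. */
static ha_rows run_delete(toy_handler *file, bool has_triggers, bool binlog_row)
{
  ha_rows deleted= 0;
  if ((file->ha_table_flags() & HA_CAN_DIRECT_UPDATE_AND_DELETE) &&
      !has_triggers && !binlog_row &&
      !file->direct_delete_rows_init())
  {
    file->direct_delete_rows(&deleted);       // one pushed-down operation
    return deleted;
  }
  while (deleted < 42 && !file->delete_row()) // toy scan: delete 42 rows
    deleted++;
  return deleted;
}

int main()
{
  toy_pushdown_engine engine;
  std::printf("deleted %llu rows\n",
              (unsigned long long) run_delete(&engine, false, false));
  return 0;
}

Compiled standalone this prints "deleted 42 rows": the engine accepted the
probe, so the whole delete ran as a single pushed-down operation.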
Diffstat (limited to 'sql')
-rw-r--r--  sql/ha_partition.cc | 585
-rw-r--r--  sql/ha_partition.h  |  17
-rw-r--r--  sql/handler.cc      |  55
-rw-r--r--  sql/handler.h       | 121
-rw-r--r--  sql/opt_sum.cc      |   3
-rw-r--r--  sql/sql_delete.cc   |  75
-rw-r--r--  sql/sql_update.cc   |  96
-rw-r--r--  sql/table.cc        |  53
-rw-r--r--  sql/table.h         |   2
9 files changed, 976 insertions(+), 31 deletions(-)
diff --git a/sql/ha_partition.cc b/sql/ha_partition.cc
index 972e8820246..0d5793c180b 100644
--- a/sql/ha_partition.cc
+++ b/sql/ha_partition.cc
@@ -6794,6 +6794,7 @@ FT_INFO *ha_partition::ft_init_ext(uint flags, uint inx, String *key)
DBUG_RETURN((FT_INFO*)ft_target);
}
+
/**
Return the next record from the FT result set during an ordered index
pre-scan
@@ -10765,6 +10766,590 @@ void ha_partition::cond_pop()
DBUG_VOID_RETURN;
}
+
+/**
+ Perform bulk update preparation on each partition.
+
+ SYNOPSIS
+ start_bulk_update()
+
+ RETURN VALUE
+ TRUE Error
+ FALSE Success
+*/
+
+bool ha_partition::start_bulk_update()
+{
+ handler **file= m_file;
+ DBUG_ENTER("ha_partition::start_bulk_update");
+
+ if (bitmap_is_overlapping(&m_part_info->full_part_field_set,
+ table->write_set))
+ DBUG_RETURN(TRUE);
+
+ do
+ {
+ if ((*file)->start_bulk_update())
+ DBUG_RETURN(TRUE);
+ } while (*(++file));
+ DBUG_RETURN(FALSE);
+}
+
+
+/**
+ Perform bulk update execution on each partition. A bulk update allows
+ a handler to batch the updated rows instead of performing the updates
+ one row at a time.
+
+ SYNOPSIS
+ exec_bulk_update()
+
+ RETURN VALUE
+ >0 Error
+ 0 Success
+*/
+
+int ha_partition::exec_bulk_update(ha_rows *dup_key_found)
+{
+ int error;
+ handler **file= m_file;
+ DBUG_ENTER("ha_partition::exec_bulk_update");
+
+ do
+ {
+ if ((error= (*file)->exec_bulk_update(dup_key_found)))
+ DBUG_RETURN(error);
+ } while (*(++file));
+ DBUG_RETURN(0);
+}
+
+
+/**
+ Perform bulk update cleanup on each partition.
+
+ SYNOPSIS
+ end_bulk_update()
+
+ RETURN VALUE
+ >0 Error
+ 0 Success
+*/
+
+int ha_partition::end_bulk_update()
+{
+ int error= 0;
+ handler **file= m_file;
+ DBUG_ENTER("ha_partition::end_bulk_update");
+
+ do
+ {
+ int tmp;
+ if ((tmp= (*file)->end_bulk_update()))
+ error= tmp;
+ } while (*(++file));
+ DBUG_RETURN(error);
+}
+
+
+/**
+ Add the row to the bulk update on the partition on which the row is stored.
+ A bulk update allows a handler to batch the updated rows instead of
+ performing the updates one row at a time.
+
+ SYNOPSIS
+ bulk_update_row()
+ old_data Old record
+ new_data New record
+ dup_key_found Number of duplicate keys found
+
+ RETURN VALUE
+ >1 Error
+ 1 Bulk update not used, normal operation used
+ 0 Bulk update used by handler
+*/
+
+int ha_partition::bulk_update_row(const uchar *old_data, const uchar *new_data,
+ ha_rows *dup_key_found)
+{
+ int error= 0;
+ uint32 part_id;
+ longlong func_value;
+ my_bitmap_map *old_map;
+ DBUG_ENTER("ha_partition::bulk_update_row");
+
+ old_map= dbug_tmp_use_all_columns(table, table->read_set);
+ error= m_part_info->get_partition_id(m_part_info, &part_id,
+ &func_value);
+ dbug_tmp_restore_column_map(table->read_set, old_map);
+ if (unlikely(error))
+ {
+ m_part_info->err_value= func_value;
+ goto end;
+ }
+
+ error= m_file[part_id]->ha_bulk_update_row(old_data, new_data,
+ dup_key_found);
+
+end:
+ DBUG_RETURN(error);
+}
+
+
+/**
+ Perform bulk delete preparation on each partition.
+
+ SYNOPSIS
+ start_bulk_delete()
+
+ RETURN VALUE
+ TRUE Error
+ FALSE Success
+*/
+
+bool ha_partition::start_bulk_delete()
+{
+ handler **file= m_file;
+ DBUG_ENTER("ha_partition::start_bulk_delete");
+
+ do
+ {
+ if ((*file)->start_bulk_delete())
+ DBUG_RETURN(TRUE);
+ } while (*(++file));
+ DBUG_RETURN(FALSE);
+}
+
+
+/**
+ Perform bulk delete cleanup on each partition.
+
+ SYNOPSIS
+ end_bulk_delete()
+
+ RETURN VALUE
+ >0 Error
+ 0 Success
+*/
+
+int ha_partition::end_bulk_delete()
+{
+ int error= 0;
+ handler **file= m_file;
+ DBUG_ENTER("ha_partition::end_bulk_delete");
+
+ do
+ {
+ int tmp;
+ if ((tmp= (*file)->end_bulk_delete()))
+ error= tmp;
+ } while (*(++file));
+ DBUG_RETURN(error);
+}
+
+
+/**
+ Perform initialization for a direct update request.
+
+ SYNOPSIS
+ direct_update_rows_init()
+
+ RETURN VALUE
+ >0 Error
+ 0 Success
+*/
+
+int ha_partition::direct_update_rows_init()
+{
+ int error;
+ uint i, found;
+ handler *file;
+ DBUG_ENTER("ha_partition::direct_update_rows_init");
+
+ if (bitmap_is_overlapping(&m_part_info->full_part_field_set,
+ table->write_set))
+ {
+ DBUG_PRINT("info", ("partition FALSE by updating part_key"));
+ DBUG_RETURN(HA_ERR_WRONG_COMMAND);
+ }
+
+ m_part_spec.start_part= 0;
+ m_part_spec.end_part= m_tot_parts - 1;
+ m_direct_update_part_spec= m_part_spec;
+
+ found= 0;
+ for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++)
+ {
+ if (bitmap_is_set(&(m_part_info->read_partitions), i) &&
+ bitmap_is_set(&(m_part_info->lock_partitions), i))
+ {
+ file= m_file[i];
+ if ((error= (m_pre_calling ?
+ file->pre_direct_update_rows_init() :
+ file->direct_update_rows_init())))
+ {
+ DBUG_PRINT("info", ("partition FALSE by storage engine"));
+ DBUG_RETURN(error);
+ }
+ found++;
+ }
+ }
+
+ TABLE_LIST *table_list= table->pos_in_table_list;
+ if (found != 1 && table_list)
+ {
+ while (table_list->parent_l)
+ table_list= table_list->parent_l;
+ st_select_lex *select_lex= table_list->select_lex;
+ DBUG_PRINT("info", ("partition select_lex: %p", select_lex));
+ if (select_lex && select_lex->explicit_limit)
+ {
+ DBUG_PRINT("info", ("partition explicit_limit=TRUE"));
+ DBUG_PRINT("info", ("partition offset_limit: %p",
+ select_lex->offset_limit));
+ DBUG_PRINT("info", ("partition select_limit: %p",
+ select_lex->select_limit));
+ DBUG_PRINT("info", ("partition FALSE by select_lex"));
+ DBUG_RETURN(HA_ERR_WRONG_COMMAND);
+ }
+ }
+ DBUG_PRINT("info", ("partition OK"));
+ DBUG_RETURN(0);
+}
+
+
+/**
+ Do initialization for performing parallel direct update
+ for a handlersocket update request.
+
+ SYNOPSIS
+ pre_direct_update_rows_init()
+
+ RETURN VALUE
+ >0 Error
+ 0 Success
+*/
+
+int ha_partition::pre_direct_update_rows_init()
+{
+ bool save_m_pre_calling;
+ int error;
+ DBUG_ENTER("ha_partition::pre_direct_update_rows_init");
+ save_m_pre_calling= m_pre_calling;
+ m_pre_calling= TRUE;
+ error= direct_update_rows_init();
+ m_pre_calling= save_m_pre_calling;
+ DBUG_RETURN(error);
+}
+
+
+/**
+ Execute a direct update request. A direct update request updates all
+ qualified rows in a single operation, rather than one row at a time.
+ The direct update operation is pushed down to each individual
+ partition.
+
+ SYNOPSIS
+ direct_update_rows()
+ update_rows Number of updated rows
+
+ RETURN VALUE
+ >0 Error
+ 0 Success
+*/
+
+int ha_partition::direct_update_rows(ha_rows *update_rows_result)
+{
+ int error;
+ bool rnd_seq= FALSE;
+ ha_rows update_rows= 0;
+ uint32 i;
+ DBUG_ENTER("ha_partition::direct_update_rows");
+
+ /* If first call to direct_update_rows with RND scan */
+ if ((m_pre_calling ? pre_inited : inited) == RND && m_scan_value == 1)
+ {
+ rnd_seq= TRUE;
+ m_scan_value= 2;
+ }
+
+ *update_rows_result= 0;
+ for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++)
+ {
+ handler *file= m_file[i];
+ if (bitmap_is_set(&(m_part_info->read_partitions), i) &&
+ bitmap_is_set(&(m_part_info->lock_partitions), i))
+ {
+ if (rnd_seq && (m_pre_calling ? file->pre_inited : file->inited) == NONE)
+ {
+ if ((error= (m_pre_calling ?
+ file->ha_pre_rnd_init(TRUE) :
+ file->ha_rnd_init(TRUE))))
+ DBUG_RETURN(error);
+ }
+ if ((error= (m_pre_calling ?
+ (file)->pre_direct_update_rows() :
+ (file)->ha_direct_update_rows(&update_rows))))
+ {
+ if (rnd_seq)
+ {
+ if (m_pre_calling)
+ file->ha_pre_rnd_end();
+ else
+ file->ha_rnd_end();
+ }
+ DBUG_RETURN(error);
+ }
+ *update_rows_result+= update_rows;
+ }
+ if (rnd_seq)
+ {
+ if ((error= (m_pre_calling ?
+ file->ha_pre_index_or_rnd_end() :
+ file->ha_index_or_rnd_end())))
+ DBUG_RETURN(error);
+ }
+ }
+ DBUG_RETURN(0);
+}
+
+
+/**
+ Start parallel execution of a direct update for a handlersocket update
+ request. A direct update request updates all qualified rows in a single
+ operation, rather than one row at a time. The direct update operation
+ is pushed down to each individual partition.
+
+ SYNOPSIS
+ pre_direct_update_rows()
+
+ RETURN VALUE
+ >0 Error
+ 0 Success
+*/
+
+int ha_partition::pre_direct_update_rows()
+{
+ bool save_m_pre_calling;
+ int error;
+ ha_rows not_used= 0;
+ DBUG_ENTER("ha_partition::pre_direct_update_rows");
+ save_m_pre_calling= m_pre_calling;
+ m_pre_calling= TRUE;
+ error= direct_update_rows(&not_used);
+ m_pre_calling= save_m_pre_calling;
+ DBUG_RETURN(error);
+}
+
+
+/**
+ Perform initialization for a direct delete request.
+
+ SYNOPSIS
+ direct_delete_rows_init()
+
+ RETURN VALUE
+ >0 Error
+ 0 Success
+*/
+
+int ha_partition::direct_delete_rows_init()
+{
+ int error;
+ uint i, found;
+ DBUG_ENTER("ha_partition::direct_delete_rows_init");
+
+ m_part_spec.start_part= 0;
+ m_part_spec.end_part= m_tot_parts - 1;
+ m_direct_update_part_spec= m_part_spec;
+
+ found= 0;
+ for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++)
+ {
+ if (bitmap_is_set(&(m_part_info->read_partitions), i) &&
+ bitmap_is_set(&(m_part_info->lock_partitions), i))
+ {
+ handler *file= m_file[i];
+ if ((error= (m_pre_calling ?
+ file->pre_direct_delete_rows_init() :
+ file->direct_delete_rows_init())))
+ {
+ DBUG_PRINT("exit", ("error in direct_delete_rows_init"));
+ DBUG_RETURN(error);
+ }
+ found++;
+ }
+ }
+
+ TABLE_LIST *table_list= table->pos_in_table_list;
+ if (found != 1 && table_list)
+ {
+ while (table_list->parent_l)
+ table_list= table_list->parent_l;
+ st_select_lex *select_lex= table_list->select_lex;
+ DBUG_PRINT("info", ("partition select_lex: %p", select_lex));
+ if (select_lex && select_lex->explicit_limit)
+ {
+ DBUG_PRINT("info", ("partition explicit_limit: TRUE"));
+ DBUG_PRINT("info", ("partition offset_limit: %p",
+ select_lex->offset_limit));
+ DBUG_PRINT("info", ("partition select_limit: %p",
+ select_lex->select_limit));
+ DBUG_PRINT("info", ("partition FALSE by select_lex"));
+ DBUG_RETURN(HA_ERR_WRONG_COMMAND);
+ }
+ }
+ DBUG_PRINT("exit", ("OK"));
+ DBUG_RETURN(0);
+}
+
+
+/**
+ Do initialization for performing parallel direct delete
+ for a handlersocket delete request.
+
+ SYNOPSIS
+ pre_direct_delete_rows_init()
+
+ RETURN VALUE
+ >0 Error
+ 0 Success
+*/
+
+int ha_partition::pre_direct_delete_rows_init()
+{
+ bool save_m_pre_calling;
+ int error;
+ DBUG_ENTER("ha_partition::pre_direct_delete_rows_init");
+ save_m_pre_calling= m_pre_calling;
+ m_pre_calling= TRUE;
+ error= direct_delete_rows_init();
+ m_pre_calling= save_m_pre_calling;
+ DBUG_RETURN(error);
+}
+
+
+/**
+ Execute a direct delete request. A direct delete request deletes all
+ qualified rows in a single operation, rather than one row at a time.
+ The direct delete operation is pushed down to each individual
+ partition.
+
+ SYNOPSIS
+ direct_delete_rows()
+ delete_rows Number of deleted rows
+
+ RETURN VALUE
+ >0 Error
+ 0 Success
+*/
+
+int ha_partition::direct_delete_rows(ha_rows *delete_rows_result)
+{
+ int error;
+ bool rnd_seq= FALSE;
+ ha_rows delete_rows= 0;
+ uint32 i;
+ handler *file;
+ DBUG_ENTER("ha_partition::direct_delete_rows");
+
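+ /* If first call to direct_delete_rows with RND scan */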
+ if ((m_pre_calling ? pre_inited : inited) == RND && m_scan_value == 1)
+ {
+ rnd_seq= TRUE;
+ m_scan_value= 2;
+ }
+
+ *delete_rows_result= 0;
+ m_part_spec= m_direct_update_part_spec;
+ for (i= m_part_spec.start_part; i <= m_part_spec.end_part; i++)
+ {
+ file= m_file[i];
+ if (bitmap_is_set(&(m_part_info->read_partitions), i) &&
+ bitmap_is_set(&(m_part_info->lock_partitions), i))
+ {
+ if (rnd_seq && (m_pre_calling ? file->pre_inited : file->inited) == NONE)
+ {
+ if ((error= (m_pre_calling ?
+ file->ha_pre_rnd_init(TRUE) :
+ file->ha_rnd_init(TRUE))))
+ DBUG_RETURN(error);
+ }
+ if ((error= (m_pre_calling ?
+ file->pre_direct_delete_rows() :
+ file->ha_direct_delete_rows(&delete_rows))))
+ {
+ if (rnd_seq)
+ {
+ if (m_pre_calling)
+ file->ha_pre_rnd_end();
+ else
+ file->ha_rnd_end();
+ }
+ DBUG_RETURN(error);
+ }
+ *delete_rows_result+= delete_rows;
+ }
+ if (rnd_seq)
+ {
+ if ((error= (m_pre_calling ?
+ file->ha_pre_index_or_rnd_end() :
+ file->ha_index_or_rnd_end())))
+ DBUG_RETURN(error);
+ }
+ }
+ DBUG_RETURN(0);
+}
+
+
+/**
+ Start parallel execution of a direct delete for a handlersocket delete
+ request. A direct delete request deletes all qualified rows in a single
+ operation, rather than one row at a time. The direct delete operation
+ is pushed down to each individual partition.
+
+ SYNOPSIS
+ pre_direct_delete_rows()
+
+ RETURN VALUE
+ >0 Error
+ 0 Success
+*/
+
+int ha_partition::pre_direct_delete_rows()
+{
+ bool save_m_pre_calling;
+ int error;
+ ha_rows not_used= 0;
+ DBUG_ENTER("ha_partition::pre_direct_delete_rows");
+ save_m_pre_calling= m_pre_calling;
+ m_pre_calling= TRUE;
+ error= direct_delete_rows(&not_used);
+ m_pre_calling= save_m_pre_calling;
+ DBUG_RETURN(error);
+}
+
+/**
+ Push metadata for the current operation down to each partition.
+
+ SYNOPSIS
+ info_push()
+
+ RETURN VALUE
+ >0 Error
+ 0 Success
+*/
+
+int ha_partition::info_push(uint info_type, void *info)
+{
+ int error= 0;
+ handler **file= m_file;
+ DBUG_ENTER("ha_partition::info_push");
+
+ do
+ {
+ int tmp;
+ if ((tmp= (*file)->info_push(info_type, info)))
+ error= tmp;
+ } while (*(++file));
+ DBUG_RETURN(error);
+}
+
+
void ha_partition::clear_top_table_fields()
{
handler **file;
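Two iteration idioms recur in the ha_partition methods above, both walking
the null-terminated m_file[] array of per-partition handlers:
start_bulk_update()/exec_bulk_update() return on the first failure, while
end_bulk_update(), end_bulk_delete() and info_push() visit every partition
and report the last nonzero error, so cleanup runs everywhere. A
self-contained model of the aggregate-and-continue variant (illustrative
only, not MariaDB source):

#include <cstdio>

struct part_handler
{
  int fail;                                  // error this partition returns
  int end_bulk_update() { return fail; }
};

/* Run cleanup on every partition, remember the last error. */
static int end_bulk_update_all(part_handler **file)
{
  int error= 0;
  do
  {
    int tmp;
    if ((tmp= (*file)->end_bulk_update()))
      error= tmp;                            // record the error, keep going
  } while (*(++file));
  return error;
}

int main()
{
  part_handler p0= {0}, p1= {7}, p2= {0};
  part_handler *parts[]= {&p0, &p1, &p2, nullptr};  // null-terminated like m_file
  std::printf("aggregate error: %d\n", end_bulk_update_all(parts));  // prints 7
  return 0;
}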
diff --git a/sql/ha_partition.h b/sql/ha_partition.h
index c636340a5dc..f79a272abad 100644
--- a/sql/ha_partition.h
+++ b/sql/ha_partition.h
@@ -296,6 +296,7 @@ private:
ha_rows m_bulk_inserted_rows;
/** used for prediction of start_bulk_insert rows */
enum_monotonicity_info m_part_func_monotonicity_info;
+ part_id_range m_direct_update_part_spec;
bool m_pre_calling;
bool m_pre_call_use_parallel;
/* Keep track of bulk access requests */
@@ -535,8 +536,23 @@ public:
number of calls to write_row.
*/
virtual int write_row(uchar * buf);
+ virtual bool start_bulk_update();
+ virtual int exec_bulk_update(ha_rows *dup_key_found);
+ virtual int end_bulk_update();
+ virtual int bulk_update_row(const uchar *old_data, const uchar *new_data,
+ ha_rows *dup_key_found);
virtual int update_row(const uchar * old_data, const uchar * new_data);
+ virtual int direct_update_rows_init();
+ virtual int pre_direct_update_rows_init();
+ virtual int direct_update_rows(ha_rows *update_rows);
+ virtual int pre_direct_update_rows();
+ virtual bool start_bulk_delete();
+ virtual int end_bulk_delete();
virtual int delete_row(const uchar * buf);
+ virtual int direct_delete_rows_init();
+ virtual int pre_direct_delete_rows_init();
+ virtual int direct_delete_rows(ha_rows *delete_rows);
+ virtual int pre_direct_delete_rows();
virtual int delete_all_rows(void);
virtual int truncate();
virtual void start_bulk_insert(ha_rows rows, uint flags);
@@ -1306,6 +1322,7 @@ public:
virtual const COND *cond_push(const COND *cond);
virtual void cond_pop();
virtual void clear_top_table_fields();
+ virtual int info_push(uint info_type, void *info);
private:
int handle_opt_partitions(THD *thd, HA_CHECK_OPT *check_opt, uint flags);
diff --git a/sql/handler.cc b/sql/handler.cc
index a215c13e718..4e059e0e56c 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -4058,7 +4058,7 @@ int handler::ha_repair(THD* thd, HA_CHECK_OPT* check_opt)
int
handler::ha_bulk_update_row(const uchar *old_data, const uchar *new_data,
- uint *dup_key_found)
+ ha_rows *dup_key_found)
{
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
m_lock_type == F_WRLCK);
@@ -6163,6 +6163,59 @@ int handler::ha_delete_row(const uchar *buf)
}
+/**
+ Execute a direct update request. A direct update request updates all
+ qualified rows in a single operation, rather than one row at a time.
+ In a Spider cluster the direct update operation is pushed down to the
+ child levels of the cluster.
+
+ Note that this can't be used with row-based binary logging, as no row
+ events are generated
+
+ @param update_rows Number of updated rows.
+
+ @retval 0 Success.
+ @retval != 0 Failure.
+*/
+
+int handler::ha_direct_update_rows(ha_rows *update_rows)
+{
+ int error;
+
+ MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
+ mark_trx_read_write();
+
+ error = direct_update_rows(update_rows);
+ MYSQL_UPDATE_ROW_DONE(error);
+ return error;
+}
+
+
+/**
+ Execute a direct delete request. A direct delete request deletes all
+ qualified rows in a single operation, rather than one row at a time.
+ In a Spider cluster the direct delete operation is pushed down to the
+ child levels of the cluster.
+
+ @param delete_rows Number of deleted rows.
+
+ @retval 0 Success.
+ @retval != 0 Failure.
+*/
+
+int handler::ha_direct_delete_rows(ha_rows *delete_rows)
+{
+ int error;
+ /* Ensure we are not using binlog row */
+ DBUG_ASSERT(!table->in_use->is_current_stmt_binlog_format_row());
+
+ MYSQL_DELETE_ROW_START(table_share->db.str, table_share->table_name.str);
+ mark_trx_read_write();
+
+ error = direct_delete_rows(delete_rows);
+ MYSQL_DELETE_ROW_DONE(error);
+ return error;
+}
+
/** @brief
use_hidden_primary_key() is called in case of an update/delete when
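The two wrappers added above follow the handler API's usual shape: a public
non-virtual ha_*() method does the bookkeeping (performance-schema probes,
marking the transaction read-write) around a private virtual that storage
engines override. A sketch of that shape under invented toy_* names (not
MariaDB source):

#include <cstdio>

class toy_handler
{
public:
  virtual ~toy_handler() {}
  int ha_direct_update_rows(unsigned long *updated)
  {
    probe_update_start();                 // cf. MYSQL_UPDATE_ROW_START(...)
    mark_trx_read_write();                // the statement will modify data
    int error= direct_update_rows(updated);
    probe_update_done(error);             // cf. MYSQL_UPDATE_ROW_DONE(error)
    return error;
  }
private:
  virtual int direct_update_rows(unsigned long *updated)= 0;
  void probe_update_start() { std::puts("update start"); }
  void probe_update_done(int e) { std::printf("update done, error=%d\n", e); }
  void mark_trx_read_write() { std::puts("trx marked read-write"); }
};

class toy_engine : public toy_handler
{
  int direct_update_rows(unsigned long *updated) override
  { *updated= 10; return 0; }             // engine-side bulk update
};

int main()
{
  toy_engine h;
  unsigned long n= 0;
  h.ha_direct_update_rows(&n);
  std::printf("updated %lu rows\n", n);
  return 0;
}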
diff --git a/sql/handler.h b/sql/handler.h
index cbc30872abf..a176b45d8db 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -287,6 +287,11 @@ enum enum_alter_inplace_result {
*/
#define HA_BINLOG_FLAGS (HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE)
+/* The following are used by Spider */
+#define HA_CAN_FORCE_BULK_UPDATE (1ULL << 50)
+#define HA_CAN_FORCE_BULK_DELETE (1ULL << 51)
+#define HA_CAN_DIRECT_UPDATE_AND_DELETE (1ULL << 52)
+
/* The following is for partition handler */
#define HA_CAN_MULTISTEP_MERGE (1LL << 53)
@@ -442,6 +447,12 @@ static const uint MYSQL_START_TRANS_OPT_READ_WRITE = 4;
#define HA_CHECK_DUP (HA_CHECK_DUP_KEY + HA_CHECK_DUP_UNIQUE)
#define HA_CHECK_ALL (~0U)
+/* Options for info_push() */
+#define INFO_KIND_UPDATE_FIELDS 101
+#define INFO_KIND_UPDATE_VALUES 102
+#define INFO_KIND_FORCE_LIMIT_BEGIN 103
+#define INFO_KIND_FORCE_LIMIT_END 104
+
enum legacy_db_type
{
/* note these numerical values are fixed and can *not* be changed */
@@ -1420,9 +1431,6 @@ handlerton *ha_default_tmp_handlerton(THD *thd);
#define HTON_TEMPORARY_NOT_SUPPORTED (1 << 6) //Having temporary tables not supported
#define HTON_SUPPORT_LOG_TABLES (1 << 7) //Engine supports log tables
#define HTON_NO_PARTITION (1 << 8) //Not partition of these tables
-#define HTON_CAN_MULTISTEP_MERGE (1 << 9) //You can merge mearged tables
-// Engine needs to access the main connect string in partitions
-#define HTON_CAN_READ_CONNECT_STRING_IN_PARTITION (1 << 10)
/*
This flag should be set when deciding that the engine does not allow
@@ -1443,6 +1451,10 @@ handlerton *ha_default_tmp_handlerton(THD *thd);
// MySQL compatibility. Unused.
#define HTON_SUPPORTS_FOREIGN_KEYS (1 << 0) //Foreign key constraint supported.
+#define HTON_CAN_MERGE (1 << 11) //Merge type table
+// Engine needs to access the main connect string in partitions
+#define HTON_CAN_READ_CONNECT_STRING_IN_PARTITION (1 << 12)
+
class Ha_trx_info;
struct THD_TRANS
@@ -2718,7 +2730,8 @@ public:
/** Length of ref (1-8 or the clustered key length) */
uint ref_length;
FT_INFO *ft_handler;
- enum {NONE=0, INDEX, RND} inited;
+ enum init_stat { NONE=0, INDEX, RND };
+ init_stat inited, pre_inited;
const COND *pushed_cond;
/**
@@ -2817,7 +2830,7 @@ public:
key_used_on_scan(MAX_KEY),
active_index(MAX_KEY), keyread(MAX_KEY),
ref_length(sizeof(my_off_t)),
- ft_handler(0), inited(NONE),
+ ft_handler(0), inited(NONE), pre_inited(NONE),
pushed_cond(0), next_insert_id(0), insert_id_for_cur_row(0),
tracker(NULL),
pushed_idx_cond(NULL),
@@ -2958,7 +2971,7 @@ public:
DBUG_RETURN(ret);
}
int ha_bulk_update_row(const uchar *old_data, const uchar *new_data,
- uint *dup_key_found);
+ ha_rows *dup_key_found);
int ha_delete_all_rows();
int ha_truncate();
int ha_reset_auto_increment(ulonglong value);
@@ -3156,7 +3169,7 @@ public:
@retval 0 Success
@retval >0 Error code
*/
- virtual int exec_bulk_update(uint *dup_key_found)
+ virtual int exec_bulk_update(ha_rows *dup_key_found)
{
DBUG_ASSERT(FALSE);
return HA_ERR_WRONG_COMMAND;
@@ -3165,7 +3178,7 @@ public:
Perform any needed clean-up, no outstanding updates are there at the
moment.
*/
- virtual void end_bulk_update() { return; }
+ virtual int end_bulk_update() { return 0; }
/**
Execute all outstanding deletes and close down the bulk delete.
@@ -3208,6 +3221,48 @@ public:
{ return 0; }
virtual int pre_rnd_next(bool use_parallel)
{ return 0; }
+ int ha_pre_rnd_init(bool scan)
+ {
+ int result;
+ DBUG_ENTER("ha_pre_rnd_init");
+ DBUG_ASSERT(pre_inited==NONE || (pre_inited==RND && scan));
+ pre_inited= (result= pre_rnd_init(scan)) ? NONE: RND;
+ DBUG_RETURN(result);
+ }
+ int ha_pre_rnd_end()
+ {
+ DBUG_ENTER("ha_pre_rnd_end");
+ DBUG_ASSERT(pre_inited==RND);
+ pre_inited=NONE;
+ DBUG_RETURN(pre_rnd_end());
+ }
+ virtual int pre_rnd_init(bool scan) { return 0; }
+ virtual int pre_rnd_end() { return 0; }
+ virtual int pre_index_init(uint idx, bool sorted) { return 0; }
+ virtual int pre_index_end() { return 0; }
+ int ha_pre_index_init(uint idx, bool sorted)
+ {
+ int result;
+ DBUG_ENTER("ha_pre_index_init");
+ DBUG_ASSERT(pre_inited==NONE);
+ if (!(result= pre_index_init(idx, sorted)))
+ pre_inited=INDEX;
+ DBUG_RETURN(result);
+ }
+ int ha_pre_index_end()
+ {
+ DBUG_ENTER("ha_pre_index_end");
+ DBUG_ASSERT(pre_inited==INDEX);
+ pre_inited=NONE;
+ DBUG_RETURN(pre_index_end());
+ }
+ int ha_pre_index_or_rnd_end()
+ {
+ return (pre_inited == INDEX ?
+ ha_pre_index_end() :
+ pre_inited == RND ? ha_pre_rnd_end() : 0 );
+ }
+
/**
@brief
Positions an index cursor to the index specified in the
@@ -3713,6 +3768,11 @@ public:
virtual void cond_pop() { return; };
/**
+ Push metadata for the current operation down to the table handler.
+ */
+ virtual int info_push(uint info_type, void *info) { return 0; };
+
+ /**
This function is used to get correlating of a parent (table/column)
and children (table/column). When conditions are pushed down to child
table (like child of myisam_merge), child table needs to know about
@@ -4162,6 +4222,49 @@ private:
{
return HA_ERR_WRONG_COMMAND;
}
+
+ /* Perform initialization for a direct update request */
+public:
+ int ha_direct_update_rows(ha_rows *update_rows);
+ virtual int direct_update_rows_init()
+ {
+ return HA_ERR_WRONG_COMMAND;
+ }
+private:
+ virtual int pre_direct_update_rows_init()
+ {
+ return HA_ERR_WRONG_COMMAND;
+ }
+ virtual int direct_update_rows(ha_rows *update_rows __attribute__((unused)))
+ {
+ return HA_ERR_WRONG_COMMAND;
+ }
+ virtual int pre_direct_update_rows()
+ {
+ return HA_ERR_WRONG_COMMAND;
+ }
+
+ /* Perform initialization for a direct delete request */
+public:
+ int ha_direct_delete_rows(ha_rows *delete_rows);
+ virtual int direct_delete_rows_init()
+ {
+ return HA_ERR_WRONG_COMMAND;
+ }
+private:
+ virtual int pre_direct_delete_rows_init()
+ {
+ return HA_ERR_WRONG_COMMAND;
+ }
+ virtual int direct_delete_rows(ha_rows *delete_rows __attribute__((unused)))
+ {
+ return HA_ERR_WRONG_COMMAND;
+ }
+ virtual int pre_direct_delete_rows()
+ {
+ return HA_ERR_WRONG_COMMAND;
+ }
+
/**
Reset state of file to after 'open'.
This function is called after every statement for all tables used
@@ -4236,7 +4339,7 @@ public:
@retval 1 Bulk update not used, normal operation used
*/
virtual int bulk_update_row(const uchar *old_data, const uchar *new_data,
- uint *dup_key_found)
+ ha_rows *dup_key_found)
{
DBUG_ASSERT(FALSE);
return HA_ERR_WRONG_COMMAND;
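The new pre_inited member mirrors inited for the "pre" (parallel
preparation) call sequence: NONE to INDEX via ha_pre_index_init(), NONE to
RND via ha_pre_rnd_init(), back to NONE through the matching *_end() calls,
with ha_pre_index_or_rnd_end() dispatching on the current state. A
self-contained model of those transitions (illustrative, not MariaDB
source):

#include <cassert>
#include <cstdio>

class toy_handler
{
public:
  enum init_stat { NONE= 0, INDEX, RND };
  init_stat pre_inited= NONE;

  int ha_pre_rnd_init(bool scan)
  {
    assert(pre_inited == NONE || (pre_inited == RND && scan));
    int result= pre_rnd_init(scan);
    pre_inited= result ? NONE : RND;       // stay NONE on failure
    return result;
  }
  int ha_pre_index_init(int idx)
  {
    assert(pre_inited == NONE);
    int result= pre_index_init(idx);
    if (!result)
      pre_inited= INDEX;
    return result;
  }
  int ha_pre_index_or_rnd_end()            // dispatch on the current state
  {
    if (pre_inited == INDEX) { pre_inited= NONE; return pre_index_end(); }
    if (pre_inited == RND)   { pre_inited= NONE; return pre_rnd_end(); }
    return 0;                              // nothing to shut down
  }
private:
  int pre_rnd_init(bool) { return 0; }
  int pre_rnd_end() { return 0; }
  int pre_index_init(int) { return 0; }
  int pre_index_end() { return 0; }
};

int main()
{
  toy_handler h;
  h.ha_pre_rnd_init(true);
  std::printf("state after init: %d (RND)\n", (int) h.pre_inited);
  h.ha_pre_index_or_rnd_end();
  std::printf("state after end:  %d (NONE)\n", (int) h.pre_inited);
  return 0;
}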
diff --git a/sql/opt_sum.cc b/sql/opt_sum.cc
index 72b28473e15..43d1c2de7ad 100644
--- a/sql/opt_sum.cc
+++ b/sql/opt_sum.cc
@@ -398,6 +398,8 @@ int opt_sum_query(THD *thd,
const_result= 0;
break;
}
+ longlong info_limit= 1;
+ table->file->info_push(INFO_KIND_FORCE_LIMIT_BEGIN, &info_limit);
if (!(error= table->file->ha_index_init((uint) ref.key, 1)))
error= (is_max ?
get_index_max_value(table, &ref, range_fl) :
@@ -410,6 +412,7 @@ int opt_sum_query(THD *thd,
error= HA_ERR_KEY_NOT_FOUND;
table->file->ha_end_keyread();
table->file->ha_index_end();
+ table->file->info_push(INFO_KIND_FORCE_LIMIT_END, NULL);
if (error)
{
if (error == HA_ERR_KEY_NOT_FOUND || error == HA_ERR_END_OF_FILE)
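The opt_sum_query() change above brackets its single-row MIN/MAX index read
with INFO_KIND_FORCE_LIMIT_BEGIN (passing a limit of 1) and
INFO_KIND_FORCE_LIMIT_END, so that an engine such as Spider can forward the
limit to its data nodes. One plausible way an engine could consume the hint
is sketched below; the toy_engine class is hypothetical and not part of
this patch:

#include <cstdio>

#define INFO_KIND_UPDATE_FIELDS     101
#define INFO_KIND_UPDATE_VALUES     102
#define INFO_KIND_FORCE_LIMIT_BEGIN 103
#define INFO_KIND_FORCE_LIMIT_END   104

class toy_engine
{
public:
  long long forced_limit= -1;                     // -1: no limit pushed
  int info_push(unsigned info_type, void *info)
  {
    switch (info_type)
    {
    case INFO_KIND_FORCE_LIMIT_BEGIN:
      forced_limit= *(long long *) info;          // remember for next read
      break;
    case INFO_KIND_FORCE_LIMIT_END:
      forced_limit= -1;                           // hint no longer applies
      break;
    default:                                      // kinds we don't handle
      break;
    }
    return 0;
  }
};

int main()
{
  toy_engine engine;
  long long info_limit= 1;                        // as in opt_sum_query()
  engine.info_push(INFO_KIND_FORCE_LIMIT_BEGIN, &info_limit);
  std::printf("limit during index read: %lld\n", engine.forced_limit);
  engine.info_push(INFO_KIND_FORCE_LIMIT_END, nullptr);
  std::printf("limit after read: %lld\n", engine.forced_limit);
  return 0;
}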
diff --git a/sql/sql_delete.cc b/sql/sql_delete.cc
index 0d4bed836b8..984a0799b54 100644
--- a/sql/sql_delete.cc
+++ b/sql/sql_delete.cc
@@ -250,7 +250,7 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
SQL_I_List<ORDER> *order_list, ha_rows limit,
ulonglong options, select_result *result)
{
- bool will_batch;
+ bool will_batch= FALSE;
int error, loc_error;
TABLE *table;
SQL_SELECT *select=0;
@@ -262,11 +262,13 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
bool return_error= 0;
ha_rows deleted= 0;
bool reverse= FALSE;
+ bool has_triggers;
ORDER *order= (ORDER *) ((order_list && order_list->elements) ?
order_list->first : NULL);
SELECT_LEX *select_lex= &thd->lex->select_lex;
killed_state killed_status= NOT_KILLED;
THD::enum_binlog_query_type query_type= THD::ROW_QUERY_TYPE;
+ bool binlog_is_row;
bool with_select= !select_lex->item_list.is_empty();
Explain_delete *explain;
Delete_plan query_plan(thd->mem_root);
@@ -371,9 +373,12 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
- We should not be binlogging this statement in row-based, and
- there should be no delete triggers associated with the table.
*/
+
+ has_triggers= (table->triggers &&
+ table->triggers->has_delete_triggers());
if (!with_select && !using_limit && const_cond_result &&
(!thd->is_current_stmt_binlog_format_row() &&
- !(table->triggers && table->triggers->has_delete_triggers())))
+ !has_triggers))
{
/* Update the table->file->stats.records number */
table->file->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
@@ -522,14 +527,59 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
if (!(select && select->quick))
status_var_increment(thd->status_var.delete_scan_count);
- if (query_plan.using_filesort)
+ binlog_is_row= thd->is_current_stmt_binlog_format_row();
+ DBUG_PRINT("info", ("binlog_is_row: %s", binlog_is_row ? "TRUE" : "FALSE"));
+
+ /*
+ We can use direct delete (delete that is done silently in the handler)
+ if none of the following conditions are true:
+ - There are triggers
+ - There is binary logging
+ - There is a non-stored virtual column in the WHERE clause
+ - ORDER BY or LIMIT
+ - As this requires the rows to be deleted in a specific order
+ - Note that Spider can handle ORDER BY and LIMIT in a cluster with
+ one data node. These conditions are therefore checked in
+ direct_delete_rows_init().
+
+ Direct delete does not require a WHERE clause
+
+ Later we also ensure that we are only using one table (no subqueries)
+ */
+
+ if ((table->file->ha_table_flags() & HA_CAN_DIRECT_UPDATE_AND_DELETE) &&
+ !has_triggers && !binlog_is_row && !with_select)
{
+ table->mark_columns_needed_for_delete();
+ if (!table->check_virtual_columns_marked_for_read())
+ {
+ DBUG_PRINT("info", ("Trying direct delete"));
+ if (select && select->cond &&
+ (select->cond->used_tables() == table->map))
+ {
+ DBUG_ASSERT(!table->file->pushed_cond);
+ if (!table->file->cond_push(select->cond))
+ table->file->pushed_cond= select->cond;
+ }
+ if (!table->file->direct_delete_rows_init())
+ {
+ /* Direct deleting is supported */
+ DBUG_PRINT("info", ("Using direct delete"));
+ THD_STAGE_INFO(thd, stage_updating);
+ if (!(error= table->file->ha_direct_delete_rows(&deleted)))
+ error= -1;
+ goto terminate_delete;
+ }
+ }
+ }
+ if (query_plan.using_filesort)
+ {
{
Filesort fsort(order, HA_POS_ERROR, true, select);
DBUG_ASSERT(query_plan.index == MAX_KEY);
- Filesort_tracker *fs_tracker=
+ Filesort_tracker *fs_tracker=
thd->lex->explain->get_upd_del_plan()->filesort_tracker;
if (!(file_sort= filesort(thd, table, &fsort, fs_tracker)))
@@ -568,15 +618,12 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
if (init_ftfuncs(thd, select_lex, 1))
goto got_error;
- if (table->prepare_triggers_for_delete_stmt_or_event())
- {
- will_batch= FALSE;
- }
- else
- will_batch= !table->file->start_bulk_delete();
-
table->mark_columns_needed_for_delete();
+ if ((table->file->ha_table_flags() & HA_CAN_FORCE_BULK_DELETE) &&
+ !table->prepare_triggers_for_delete_stmt_or_event())
+ will_batch= !table->file->start_bulk_delete();
+
if (with_select)
{
if (result->send_result_set_metadata(select_lex->item_list,
@@ -683,6 +730,7 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
else
break;
}
+
terminate_delete:
killed_status= thd->killed;
if (killed_status != NOT_KILLED || thd->is_error())
@@ -769,6 +817,8 @@ cleanup:
}
delete file_sort;
free_underlaid_joins(thd, select_lex);
+ if (table->file->pushed_cond)
+ {
+ table->file->pushed_cond= 0;
+ table->file->cond_pop();
+ }
DBUG_RETURN(error >= 0 || thd->is_error());
/* Special exits */
@@ -790,7 +840,8 @@ send_nothing_and_leave:
delete select;
delete file_sort;
free_underlaid_joins(thd, select_lex);
- //table->set_keyread(false);
+ if (table->file->pushed_cond)
+ {
+ table->file->pushed_cond= 0;
+ table->file->cond_pop();
+ }
DBUG_ASSERT(!return_error || thd->is_error() || thd->killed);
DBUG_RETURN((return_error || thd->is_error() || thd->killed) ? 1 : 0);
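mysql_delete() above pushes the WHERE condition only when it references
this table alone (select->cond->used_tables() == table->map), records the
push in pushed_cond, and pops it on every exit path. A minimal model of
that push/pop discipline (illustrative, not MariaDB source):

#include <cstdio>

struct toy_cond { const char *text; };

class toy_handler
{
public:
  const toy_cond *pushed_cond= nullptr;
  /* Returns the remainder the engine could not evaluate; nullptr == all. */
  const toy_cond *cond_push(const toy_cond *cond)
  {
    std::printf("engine will evaluate: %s\n", cond->text);
    return nullptr;
  }
  void cond_pop() { std::puts("condition popped"); }
};

int main()
{
  toy_handler file;
  toy_cond cond= {"a > 10"};
  if (!file.cond_push(&cond))      // engine accepted the whole condition
    file.pushed_cond= &cond;
  /* ... direct delete or row loop would run here ... */
  if (file.pushed_cond)            // cleanup mirrors the patch's exit paths
  {
    file.pushed_cond= nullptr;
    file.cond_pop();
  }
  return 0;
}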
diff --git a/sql/sql_update.cc b/sql/sql_update.cc
index bd577426483..0b6564d8e49 100644
--- a/sql/sql_update.cc
+++ b/sql/sql_update.cc
@@ -255,11 +255,12 @@ int mysql_update(THD *thd,
{
bool using_limit= limit != HA_POS_ERROR;
bool safe_update= thd->variables.option_bits & OPTION_SAFE_UPDATES;
- bool used_key_is_modified= FALSE, transactional_table, will_batch;
+ bool used_key_is_modified= FALSE, transactional_table;
+ bool will_batch= FALSE;
bool can_compare_record;
int res;
int error, loc_error;
- uint dup_key_found;
+ ha_rows dup_key_found;
bool need_sort= TRUE;
bool reverse= FALSE;
#ifndef NO_EMBEDDED_ACCESS_CHECKS
@@ -276,6 +277,7 @@ int mysql_update(THD *thd,
ulonglong id;
List<Item> all_fields;
killed_state killed_status= NOT_KILLED;
+ bool has_triggers, binlog_is_row, do_direct_update= FALSE;
Update_plan query_plan(thd->mem_root);
Explain_update *explain;
TABLE_LIST *update_source_table;
@@ -287,7 +289,7 @@ int mysql_update(THD *thd,
if (open_tables(thd, &table_list, &table_count, 0))
DBUG_RETURN(1);
- //Prepare views so they are handled correctly.
+ /* Prepare views so they are handled correctly */
if (mysql_handle_derived(thd->lex, DT_INIT))
DBUG_RETURN(1);
@@ -515,7 +517,6 @@ int mysql_update(THD *thd,
query_plan.using_io_buffer= true;
}
-
/*
Ok, we have generated a query plan for the UPDATE.
- if we're running EXPLAIN UPDATE, goto produce explain output
@@ -530,10 +531,68 @@ int mysql_update(THD *thd,
DBUG_EXECUTE_IF("show_explain_probe_update_exec_start",
dbug_serve_apcs(thd, 1););
-
+
+ has_triggers= (table->triggers &&
+ (table->triggers->has_triggers(TRG_EVENT_UPDATE,
+ TRG_ACTION_BEFORE) ||
+ table->triggers->has_triggers(TRG_EVENT_UPDATE,
+ TRG_ACTION_AFTER)));
+ DBUG_PRINT("info", ("has_triggers: %s", has_triggers ? "TRUE" : "FALSE"));
+ binlog_is_row= thd->is_current_stmt_binlog_format_row();
+ DBUG_PRINT("info", ("binlog_is_row: %s", binlog_is_row ? "TRUE" : "FALSE"));
+
if (!(select && select->quick))
status_var_increment(thd->status_var.update_scan_count);
+ /*
+ We can use direct update (update that is done silently in the handler)
+ if none of the following conditions are true:
+ - There are triggers
+ - There is binary logging
+ - using_io_buffer
+ - This means that the partition changed or the key we want
+ to use for scanning the table is changed
+ - ignore is set
+ - Direct updates don't return the number of ignored rows
+ - There is a non-stored virtual column in the WHERE clause
+ - Changing a field used by a stored virtual column, which
+ would require the column to be recalculated.
+ - ORDER BY or LIMIT
+ - As this requires the rows to be updated in a specific order
+ - Note that Spider can handle ORDER BY and LIMIT in a cluster with
+ one data node. These conditions are therefore checked in
+ direct_update_rows_init().
+
+ Direct update does not require a WHERE clause
+
+ Later we also ensure that we are only using one table (no subqueries)
+ */
+ if ((table->file->ha_table_flags() & HA_CAN_DIRECT_UPDATE_AND_DELETE) &&
+ !has_triggers && !binlog_is_row &&
+ !query_plan.using_io_buffer && !ignore &&
+ !table->check_virtual_columns_marked_for_read() &&
+ !table->check_virtual_columns_marked_for_write())
+ {
+ DBUG_PRINT("info", ("Trying direct update"));
+ if (select && select->cond &&
+ (select->cond->used_tables() == table->map))
+ {
+ DBUG_ASSERT(!table->file->pushed_cond);
+ if (!table->file->cond_push(select->cond))
+ table->file->pushed_cond= select->cond;
+ }
+
+ if (!table->file->info_push(INFO_KIND_UPDATE_FIELDS, &fields) &&
+ !table->file->info_push(INFO_KIND_UPDATE_VALUES, &values) &&
+ !table->file->direct_update_rows_init())
+ {
+ do_direct_update= TRUE;
+
+ /* Direct update is not using_filesort and is not using_io_buffer */
+ goto update_begin;
+ }
+ }
+
if (query_plan.using_filesort || query_plan.using_io_buffer)
{
/*
@@ -693,6 +752,7 @@ int mysql_update(THD *thd,
}
}
+update_begin:
if (ignore)
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
@@ -712,11 +772,20 @@ int mysql_update(THD *thd,
transactional_table= table->file->has_transactions();
thd->abort_on_warning= !ignore && thd->is_strict_mode();
- if (table->prepare_triggers_for_update_stmt_or_event())
+
+ if (do_direct_update)
{
- will_batch= FALSE;
+ /* Direct updating is supported */
+ DBUG_PRINT("info", ("Using direct update"));
+ table->reset_default_fields();
+ if (!(error= table->file->ha_direct_update_rows(&updated)))
+ error= -1;
+ found= updated;
+ goto update_end;
}
- else
+
+ if ((table->file->ha_table_flags() & HA_CAN_FORCE_BULK_UPDATE) &&
+ !table->prepare_triggers_for_update_stmt_or_event())
will_batch= !table->file->start_bulk_update();
/*
@@ -801,6 +870,7 @@ int mysql_update(THD *thd,
call then it should be included in the count of dup_key_found
and error should be set to 0 (only if these errors are ignored).
*/
+ DBUG_PRINT("info", ("Batched update"));
error= table->file->ha_bulk_update_row(table->record[1],
table->record[0],
&dup_key_found);
@@ -949,6 +1019,8 @@ int mysql_update(THD *thd,
updated-= dup_key_found;
if (will_batch)
table->file->end_bulk_update();
+
+update_end:
table->file->try_semi_consistent_read(0);
if (!transactional_table && updated > 0)
@@ -1004,6 +1076,11 @@ int mysql_update(THD *thd,
DBUG_ASSERT(transactional_table || !updated || thd->transaction.stmt.modified_non_trans_table);
free_underlaid_joins(thd, select_lex);
delete file_sort;
+ if (table->file->pushed_cond)
+ {
+ table->file->pushed_cond= 0;
+ table->file->cond_pop();
+ }
/* If LAST_INSERT_ID(X) was used, report X */
id= thd->arg_of_last_insert_id_function ?
@@ -1029,7 +1106,6 @@ int mysql_update(THD *thd,
*found_return= found;
*updated_return= updated;
-
if (thd->lex->analyze_stmt)
goto emit_explain_and_leave;
@@ -1040,6 +1116,8 @@ err:
delete file_sort;
free_underlaid_joins(thd, select_lex);
table->file->ha_end_keyread();
+ if (table->file->pushed_cond)
+ {
+ table->file->pushed_cond= 0;
+ table->file->cond_pop();
+ }
thd->abort_on_warning= 0;
DBUG_RETURN(1);
diff --git a/sql/table.cc b/sql/table.cc
index 220112f93ed..e8343903d96 100644
--- a/sql/table.cc
+++ b/sql/table.cc
@@ -6631,6 +6631,58 @@ bool TABLE::mark_virtual_columns_for_write(bool insert_fl)
DBUG_RETURN(bitmap_updated);
}
+
+/**
+ Check if any non-stored virtual column is marked in the read set
+
+ @retval FALSE No non-stored virtual column is used
+ @retval TRUE At least one non-stored virtual column is used
+*/
+
+bool TABLE::check_virtual_columns_marked_for_read()
+{
+ if (vfield)
+ {
+ Field **vfield_ptr;
+ for (vfield_ptr= vfield; *vfield_ptr; vfield_ptr++)
+ {
+ Field *tmp_vfield= *vfield_ptr;
+ if (bitmap_is_set(read_set, tmp_vfield->field_index) &&
+ !tmp_vfield->vcol_info->stored_in_db)
+ return TRUE;
+ }
+ }
+ return FALSE;
+}
+
+
+/**
+ Check if any stored virtual column is marked in the write set
+
+ This can be used to check whether an UPDATE changes a column that a
+ stored virtual column depends on, in which case the stored virtual
+ column must be recalculated
+
+ @retval FALSE No stored virtual column is affected
+ @retval TRUE At least one stored virtual column is affected
+*/
+
+bool TABLE::check_virtual_columns_marked_for_write()
+{
+ if (vfield)
+ {
+ Field **vfield_ptr;
+ for (vfield_ptr= vfield; *vfield_ptr; vfield_ptr++)
+ {
+ Field *tmp_vfield= *vfield_ptr;
+ if (bitmap_is_set(write_set, tmp_vfield->field_index) &&
+ tmp_vfield->vcol_info->stored_in_db)
+ return TRUE;
+ }
+ }
+ return FALSE;
+}
+
+
/*
Mark fields used by check constraints.
This is done once for the TABLE_SHARE the first time the table is opened.
@@ -6688,6 +6740,7 @@ void TABLE::mark_default_fields_for_write(bool is_insert)
DBUG_VOID_RETURN;
}
+
void TABLE::move_fields(Field **ptr, const uchar *to, const uchar *from)
{
my_ptrdiff_t diff= to - from;
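Both TABLE helpers above scan the null-terminated vfield array and test
each virtual column's bit in read_set or write_set, split on whether the
column is materialized (stored_in_db). A self-contained rendering of the
read-set check with toy types (not MariaDB source):

#include <bitset>
#include <cstdio>
#include <vector>

struct toy_field
{
  unsigned field_index;
  bool stored_in_db;      // stands in for vcol_info->stored_in_db
};

static bool check_virtual_columns_marked_for_read(
    const std::vector<toy_field> &vfield, const std::bitset<64> &read_set)
{
  for (const toy_field &f : vfield)
    if (read_set.test(f.field_index) && !f.stored_in_db)
      return true;        // non-stored vcol would need evaluation per row
  return false;
}

int main()
{
  std::vector<toy_field> vfield= {{3, true}, {5, false}};
  std::bitset<64> read_set;
  read_set.set(5);        // WHERE clause reads the non-stored vcol
  std::printf("blocks direct update/delete: %s\n",
              check_virtual_columns_marked_for_read(vfield, read_set)
              ? "yes" : "no");
  return 0;
}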
diff --git a/sql/table.h b/sql/table.h
index 19845efe40d..aded1930a6b 100644
--- a/sql/table.h
+++ b/sql/table.h
@@ -1319,6 +1319,8 @@ public:
void mark_columns_per_binlog_row_image(void);
bool mark_virtual_col(Field *field);
bool mark_virtual_columns_for_write(bool insert_fl);
+ bool check_virtual_columns_marked_for_read();
+ bool check_virtual_columns_marked_for_write();
void mark_default_fields_for_write(bool insert_fl);
void mark_columns_used_by_check_constraints(void);
void mark_check_constraint_columns_for_read(void);