Diffstat (limited to 'sql/handler.cc')
-rw-r--r-- | sql/handler.cc | 1272
1 file changed, 1089 insertions, 183 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index d739ce7cd5b..6bd8938fdca 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1,5 +1,5 @@ /* Copyright (c) 2000, 2016, Oracle and/or its affiliates. - Copyright (c) 2009, 2018, MariaDB Corporation. + Copyright (c) 2009, 2019, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -20,11 +20,11 @@ Handler-calling-functions */ -#include <my_global.h> +#include "mariadb.h" #include <inttypes.h> #include "sql_priv.h" #include "unireg.h" -#include "rpl_handler.h" +#include "rpl_rli.h" #include "sql_cache.h" // query_cache, query_cache_* #include "sql_connect.h" // global_table_stats #include "key.h" // key_copy, key_unpack, key_cmp_if_same, key_cmp @@ -42,6 +42,7 @@ #include <mysql/psi/mysql_table.h> #include "debug_sync.h" // DEBUG_SYNC #include "sql_audit.h" +#include "ha_sequence.h" #ifdef WITH_PARTITION_STORAGE_ENGINE #include "ha_partition.h" @@ -50,6 +51,7 @@ #ifdef WITH_ARIA_STORAGE_ENGINE #include "../storage/maria/ha_maria.h" #endif +#include "semisync_master.h" #include "wsrep_mysqld.h" #include "wsrep.h" @@ -67,13 +69,13 @@ static handlerton *installed_htons[128]; #define BITMAP_STACKBUF_SIZE (128/8) KEY_CREATE_INFO default_key_create_info= -{ HA_KEY_ALG_UNDEF, 0, {NullS, 0}, {NullS, 0}, true }; +{ HA_KEY_ALG_UNDEF, 0, 0, {NullS, 0}, {NullS, 0}, true }; /* number of entries in handlertons[] */ ulong total_ha= 0; /* number of storage engines (from handlertons[]) that support 2pc */ ulong total_ha_2pc= 0; -#ifndef DBUG_OFF +#ifdef DBUG_ASSERT_EXISTS /* Number of non-mandatory 2pc handlertons whose initialization failed to estimate total_ha_2pc value under supposition of the failures @@ -84,12 +86,12 @@ ulong failed_ha_2pc= 0; /* size of savepoint storage area (see ha_init) */ ulong savepoint_alloc_size= 0; -static const LEX_STRING sys_table_aliases[]= +static const LEX_CSTRING sys_table_aliases[]= { - { C_STRING_WITH_LEN("INNOBASE") }, { C_STRING_WITH_LEN("INNODB") }, - { C_STRING_WITH_LEN("HEAP") }, { C_STRING_WITH_LEN("MEMORY") }, - { C_STRING_WITH_LEN("MERGE") }, { C_STRING_WITH_LEN("MRG_MYISAM") }, - { C_STRING_WITH_LEN("Maria") }, { C_STRING_WITH_LEN("Aria") }, + { STRING_WITH_LEN("INNOBASE") }, { STRING_WITH_LEN("INNODB") }, + { STRING_WITH_LEN("HEAP") }, { STRING_WITH_LEN("MEMORY") }, + { STRING_WITH_LEN("MERGE") }, { STRING_WITH_LEN("MRG_MYISAM") }, + { STRING_WITH_LEN("Maria") }, { STRING_WITH_LEN("Aria") }, {NullS, 0} }; @@ -168,9 +170,10 @@ handlerton *ha_default_tmp_handlerton(THD *thd) RETURN pointer to storage engine plugin handle */ -plugin_ref ha_resolve_by_name(THD *thd, const LEX_STRING *name, bool tmp_table) +plugin_ref ha_resolve_by_name(THD *thd, const LEX_CSTRING *name, + bool tmp_table) { - const LEX_STRING *table_alias; + const LEX_CSTRING *table_alias; plugin_ref plugin; redo: @@ -215,16 +218,8 @@ Storage_engine_name::resolve_storage_engine_with_error(THD *thd, handlerton **ha, bool tmp_table) { -#if MYSQL_VERSION_ID < 100300 - /* - Please remove tmp_name when merging to 10.3 and pass m_storage_engine_name - directly to ha_resolve_by_name(). 
- */ - LEX_STRING tmp_name; - tmp_name.str= const_cast<char*>(m_storage_engine_name.str); - tmp_name.length= m_storage_engine_name.length; -#endif - if (plugin_ref plugin= ha_resolve_by_name(thd, &tmp_name, tmp_table)) + if (plugin_ref plugin= ha_resolve_by_name(thd, &m_storage_engine_name, + tmp_table)) { *ha= plugin_hton(plugin); return false; @@ -334,7 +329,6 @@ handler *get_ha_partition(partition_info *part_info) } #endif - static const char **handler_errmsgs; C_MODE_START @@ -448,7 +442,7 @@ static int full_discover_for_existence(handlerton *, const char *, const char *) static int ext_based_existence(handlerton *, const char *, const char *) { return 0; } -static int hton_ext_based_table_discovery(handlerton *hton, LEX_STRING *db, +static int hton_ext_based_table_discovery(handlerton *hton, LEX_CSTRING *db, MY_DIR *dir, handlerton::discovered_list *result) { /* @@ -661,7 +655,7 @@ int ha_initialize_handlerton(st_plugin_int *plugin) /* This is entirely for legacy. We will create a new "disk based" hton and a "memory" hton which will be configurable longterm. We should be able to - remove partition and myisammrg. + remove partition. */ switch (hton->db_type) { case DB_TYPE_HEAP: @@ -673,6 +667,9 @@ int ha_initialize_handlerton(st_plugin_int *plugin) case DB_TYPE_PARTITION_DB: partition_hton= hton; break; + case DB_TYPE_SEQUENCE: + sql_sequence_hton= hton; + break; default: break; }; @@ -691,7 +688,7 @@ err_deinit: (void) plugin->plugin->deinit(NULL); err: -#ifndef DBUG_OFF +#ifdef DBUG_ASSERT_EXISTS if (hton->prepare && hton->state == SHOW_OPTION_YES) failed_ha_2pc++; #endif @@ -728,7 +725,7 @@ int ha_end() So if flag is equal to HA_PANIC_CLOSE, the deallocate the errors. */ - if (ha_finish_errors()) + if (unlikely(ha_finish_errors())) error= 1; DBUG_RETURN(error); @@ -1227,7 +1224,7 @@ int ha_prepare(THD *thd) handlerton *ht= ha_info->ht(); if (ht->prepare) { - if (prepare_or_error(ht, thd, all)) + if (unlikely(prepare_or_error(ht, thd, all))) { ha_rollback_trans(thd, all); error=1; @@ -1447,6 +1444,41 @@ int ha_commit_trans(THD *thd, bool all) goto err; } +#if 1 // FIXME: This should be done in ha_prepare(). + if (rw_trans || (thd->lex->sql_command == SQLCOM_ALTER_TABLE && + thd->lex->alter_info.flags & ALTER_ADD_SYSTEM_VERSIONING && + is_real_trans)) + { + ulonglong trx_start_id= 0, trx_end_id= 0; + for (Ha_trx_info *ha_info= trans->ha_list; ha_info; ha_info= ha_info->next()) + { + if (ha_info->ht()->prepare_commit_versioned) + { + trx_end_id= ha_info->ht()->prepare_commit_versioned(thd, &trx_start_id); + if (trx_end_id) + break; // FIXME: use a common ID for cross-engine transactions + } + } + + if (trx_end_id) + { + if (!TR_table::use_transaction_registry) + { + my_error(ER_VERS_TRT_IS_DISABLED, MYF(0)); + goto err; + } + DBUG_ASSERT(trx_start_id); + TR_table trt(thd, true); + if (trt.update(trx_start_id, trx_end_id)) + goto err; + // Here, the call will not commit inside InnoDB. It is only working + // around closing thd->transaction.stmt open by TR_table::open(). + if (all) + commit_one_phase_2(thd, false, &thd->transaction.stmt, false); + } + } +#endif + if (trans->no_2pc || (rw_ha_count <= 1)) { error= ha_commit_one_phase(thd, all); @@ -1471,7 +1503,7 @@ int ha_commit_trans(THD *thd, bool all) Sic: we know that prepare() is not NULL since otherwise trans->no_2pc would have been set. 
*/ - if (prepare_or_error(ht, thd, all)) + if (unlikely(prepare_or_error(ht, thd, all))) goto err; need_prepare_ordered|= (ht->prepare_ordered != NULL); @@ -1480,11 +1512,13 @@ int ha_commit_trans(THD *thd, bool all) DEBUG_SYNC(thd, "ha_commit_trans_after_prepare"); DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE();); +#ifdef WITH_WSREP if (!error && WSREP_ON && wsrep_is_wsrep_xid(&thd->transaction.xid_state.xid)) { // xid was rewritten by wsrep xid= wsrep_xid_seqno(thd->transaction.xid_state.xid); } +#endif /* WITH_WSREP */ if (!is_real_trans) { @@ -1517,7 +1551,10 @@ done: mysql_mutex_assert_not_owner(mysql_bin_log.get_log_lock()); mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_commit_ordered); - RUN_HOOK(transaction, after_commit, (thd, FALSE)); +#ifdef HAVE_REPLICATION + repl_semisync_master.wait_after_commit(thd, all); + DEBUG_SYNC(thd, "after_group_after_commit"); +#endif goto end; /* Come here if error and we need to rollback. */ @@ -1591,6 +1628,7 @@ static int commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) { int error= 0; + uint count= 0; Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; DBUG_ENTER("commit_one_phase_2"); if (is_real_trans) @@ -1608,6 +1646,8 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) } /* Should this be done only if is_real_trans is set ? */ status_var_increment(thd->status_var.ha_commit_count); + if (is_real_trans && ht != binlog_hton && ha_info->is_trx_read_write()) + ++count; ha_info_next= ha_info->next(); ha_info->reset(); /* keep it conveniently zero-filled */ } @@ -1626,6 +1666,8 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) { thd->has_waiter= false; thd->transaction.cleanup(); + if (count >= 2) + statistic_increment(transactions_multi_engine, LOCK_status); } DBUG_RETURN(error); @@ -1757,7 +1799,9 @@ int ha_rollback_trans(THD *thd, bool all) push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_WARNING_NOT_COMPLETE_ROLLBACK, ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK)); - (void) RUN_HOOK(transaction, after_rollback, (thd, FALSE)); +#ifdef HAVE_REPLICATION + repl_semisync_master.wait_after_rollback(thd, all); +#endif DBUG_RETURN(error); } @@ -1946,9 +1990,10 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin, for (int i=0; i < got; i ++) { - my_xid x= WSREP_ON && wsrep_is_wsrep_xid(&info->list[i]) ? - wsrep_xid_seqno(info->list[i]) : - info->list[i].get_my_xid(); + my_xid x= IF_WSREP(WSREP_ON && wsrep_is_wsrep_xid(&info->list[i]) ? + wsrep_xid_seqno(info->list[i]) : + info->list[i].get_my_xid(), + info->list[i].get_my_xid()); if (!x) // not "mine" - that is generated by external TM { #ifndef DBUG_OFF @@ -2070,6 +2115,97 @@ int ha_recover(HASH *commit_list) } /** + return the XID as it appears in the SQL function's arguments. + So this string can be passed to XA START, XA PREPARE etc... + + @note + the 'buf' has to have space for at least SQL_XIDSIZE bytes. +*/ + + +/* + 'a'..'z' 'A'..'Z', '0'..'9' + and '-' '_' ' ' symbols don't have to be + converted. 
+*/ + +static const char xid_needs_conv[128]= +{ + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, + 0,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1, + 0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1, + 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0, + 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1 +}; + +uint get_sql_xid(XID *xid, char *buf) +{ + int tot_len= xid->gtrid_length + xid->bqual_length; + int i; + const char *orig_buf= buf; + + for (i=0; i<tot_len; i++) + { + uchar c= ((uchar *) xid->data)[i]; + if (c >= 128 || xid_needs_conv[c]) + break; + } + + if (i >= tot_len) + { + /* No need to convert characters to hexadecimals. */ + *buf++= '\''; + memcpy(buf, xid->data, xid->gtrid_length); + buf+= xid->gtrid_length; + *buf++= '\''; + if (xid->bqual_length > 0 || xid->formatID != 1) + { + *buf++= ','; + *buf++= '\''; + memcpy(buf, xid->data+xid->gtrid_length, xid->bqual_length); + buf+= xid->bqual_length; + *buf++= '\''; + } + } + else + { + *buf++= 'X'; + *buf++= '\''; + for (i= 0; i < xid->gtrid_length; i++) + { + *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4]; + *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f]; + } + *buf++= '\''; + if (xid->bqual_length > 0 || xid->formatID != 1) + { + *buf++= ','; + *buf++= 'X'; + *buf++= '\''; + for (; i < tot_len; i++) + { + *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4]; + *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f]; + } + *buf++= '\''; + } + } + + if (xid->formatID != 1) + { + *buf++= ','; + buf+= my_longlong10_to_str_8bit(&my_charset_bin, buf, + MY_INT64_NUM_DECIMAL_DIGITS, -10, xid->formatID); + } + + return (uint)(buf - orig_buf); +} + + +/** return the list of XID's to a client, the same way SHOW commands do. @note @@ -2078,7 +2214,8 @@ int ha_recover(HASH *commit_list) It can be easily fixed later, if necessary. 
*/ -static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol) +static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol, + char *data, uint data_len, CHARSET_INFO *data_cs) { if (xs->xa_state == XA_PREPARED) { @@ -2086,8 +2223,7 @@ static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol) protocol->store_longlong((longlong) xs->xid.formatID, FALSE); protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE); protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE); - protocol->store(xs->xid.data, xs->xid.gtrid_length + xs->xid.bqual_length, - &my_charset_bin); + protocol->store(data, data_len, data_cs); if (protocol->write()) return TRUE; } @@ -2095,11 +2231,28 @@ static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol) } +static my_bool xa_recover_callback_short(XID_STATE *xs, Protocol *protocol) +{ + return xa_recover_callback(xs, protocol, xs->xid.data, + xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin); +} + + +static my_bool xa_recover_callback_verbose(XID_STATE *xs, Protocol *protocol) +{ + char buf[SQL_XIDSIZE]; + uint len= get_sql_xid(&xs->xid, buf); + return xa_recover_callback(xs, protocol, buf, len, + &my_charset_utf8_general_ci); +} + + bool mysql_xa_recover(THD *thd) { List<Item> field_list; Protocol *protocol= thd->protocol; MEM_ROOT *mem_root= thd->mem_root; + my_hash_walk_action action; DBUG_ENTER("mysql_xa_recover"); field_list.push_back(new (mem_root) @@ -2111,16 +2264,32 @@ bool mysql_xa_recover(THD *thd) field_list.push_back(new (mem_root) Item_int(thd, "bqual_length", 0, MY_INT32_NUM_DECIMAL_DIGITS), mem_root); - field_list.push_back(new (mem_root) - Item_empty_string(thd, "data", - XIDDATASIZE), mem_root); + { + uint len; + CHARSET_INFO *cs; + + if (thd->lex->verbose) + { + len= SQL_XIDSIZE; + cs= &my_charset_utf8_general_ci; + action= (my_hash_walk_action) xa_recover_callback_verbose; + } + else + { + len= XIDDATASIZE; + cs= &my_charset_bin; + action= (my_hash_walk_action) xa_recover_callback_short; + } + + field_list.push_back(new (mem_root) + Item_empty_string(thd, "data", len, cs), mem_root); + } if (protocol->send_result_set_metadata(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); - if (xid_cache_iterate(thd, (my_hash_walk_action) xa_recover_callback, - protocol)) + if (xid_cache_iterate(thd, action, protocol)) DBUG_RETURN(1); my_eof(thd); DBUG_RETURN(0); @@ -2330,7 +2499,7 @@ int ha_start_consistent_snapshot(THD *thd) */ if (warn) push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR, - "This MySQL server does not support any " + "This MariaDB server does not support any " "consistent-read capable storage engine"); return 0; } @@ -2420,7 +2589,7 @@ const char *get_canonical_filename(handler *file, const char *path, The .frm file will be deleted only if we return 0. */ int ha_delete_table(THD *thd, handlerton *table_type, const char *path, - const char *db, const char *alias, bool generate_warning) + const LEX_CSTRING *db, const LEX_CSTRING *alias, bool generate_warning) { handler *file; char tmp_path[FN_REFLEN]; @@ -2439,7 +2608,7 @@ int ha_delete_table(THD *thd, handlerton *table_type, const char *path, dummy_table.s= &dummy_share; path= get_canonical_filename(file, path, tmp_path); - if ((error= file->ha_delete_table(path))) + if (unlikely((error= file->ha_delete_table(path)))) { /* it's not an error if the table doesn't exist in the engine. 
@@ -2453,12 +2622,9 @@ int ha_delete_table(THD *thd, handlerton *table_type, const char *path, dummy_share.path.str= (char*) path; dummy_share.path.length= strlen(path); dummy_share.normalized_path= dummy_share.path; - dummy_share.db.str= (char*) db; - dummy_share.db.length= strlen(db); - dummy_share.table_name.str= (char*) alias; - dummy_share.table_name.length= strlen(alias); - dummy_table.alias.set(alias, dummy_share.table_name.length, - table_alias_charset); + dummy_share.db= *db; + dummy_share.table_name= *alias; + dummy_table.alias.set(alias->str, alias->length, table_alias_charset); file->change_table_ptr(&dummy_table, &dummy_share); file->print_error(error, MYF(intercept ? ME_JUST_WARNING : 0)); } @@ -2473,6 +2639,18 @@ int ha_delete_table(THD *thd, handlerton *table_type, const char *path, /**************************************************************************** ** General handler functions ****************************************************************************/ + + +/** + Clone a handler + + @param name name of new table instance + @param mem_root Where 'this->ref' should be allocated. It can't be + in this->table->mem_root as otherwise we will not be + able to reclaim that memory when the clone handler + object is destroyed. +*/ + handler *handler::clone(const char *name, MEM_ROOT *mem_root) { handler *new_handler= get_new_handler(table->s, mem_root, ht); @@ -2483,16 +2661,6 @@ handler *handler::clone(const char *name, MEM_ROOT *mem_root) goto err; /* - Allocate handler->ref here because otherwise ha_open will allocate it - on this->table->mem_root and we will not be able to reclaim that memory - when the clone handler object is destroyed. - */ - - if (!(new_handler->ref= (uchar*) alloc_root(mem_root, - ALIGN_SIZE(ref_length)*2))) - goto err; - - /* TODO: Implement a more efficient way to have more than one index open for the same table instance. The ha_open call is not cachable for clone. @@ -2500,7 +2668,7 @@ handler *handler::clone(const char *name, MEM_ROOT *mem_root) and should be able to use the original instance of the table. */ if (new_handler->ha_open(table, name, table->db_stat, - HA_OPEN_IGNORE_IF_LOCKED)) + HA_OPEN_IGNORE_IF_LOCKED, mem_root)) goto err; return new_handler; @@ -2510,6 +2678,11 @@ err: return NULL; } +LEX_CSTRING *handler::engine_name() +{ + return hton_name(ht); +} + double handler::keyread_time(uint index, uint ranges, ha_rows rows) { @@ -2522,7 +2695,7 @@ double handler::keyread_time(uint index, uint ranges, ha_rows rows) engines that support that (e.g. InnoDB) may want to overwrite this method. The model counts in the time to read index entries from cache. 
*/ - ulong len= table->key_info[index].key_length + ref_length; + size_t len= table->key_info[index].key_length + ref_length; if (index == table->s->primary_key && table->file->primary_key_is_clustered()) len= table->s->stored_rec_length; double keys_per_block= (stats.block_size/2.0/len+1); @@ -2573,7 +2746,8 @@ PSI_table_share *handler::ha_table_share_psi() const Don't wait for locks if not HA_OPEN_WAIT_IF_LOCKED is set */ int handler::ha_open(TABLE *table_arg, const char *name, int mode, - uint test_if_locked) + uint test_if_locked, MEM_ROOT *mem_root, + List<String> *partitions_to_open) { int error; DBUG_ENTER("handler::ha_open"); @@ -2588,7 +2762,9 @@ int handler::ha_open(TABLE *table_arg, const char *name, int mode, DBUG_PRINT("info", ("old m_lock_type: %d F_UNLCK %d", m_lock_type, F_UNLCK)); DBUG_ASSERT(alloc_root_inited(&table->mem_root)); - if ((error=open(name,mode,test_if_locked))) + set_partitions_to_open(partitions_to_open); + + if (unlikely((error=open(name,mode,test_if_locked)))) { if ((error == EACCES || error == EROFS) && mode == O_RDWR && (table->db_stat & HA_TRY_READ_ONLY)) @@ -2597,7 +2773,7 @@ int handler::ha_open(TABLE *table_arg, const char *name, int mode, error=open(name,O_RDONLY,test_if_locked); } } - if (error) + if (unlikely(error)) { my_errno= error; /* Safeguard */ DBUG_PRINT("error",("error: %d errno: %d",error,errno)); @@ -2620,9 +2796,9 @@ int handler::ha_open(TABLE *table_arg, const char *name, int mode, table->db_stat|=HA_READ_ONLY; (void) extra(HA_EXTRA_NO_READCHECK); // Not needed in SQL - /* ref is already allocated for us if we're called from handler::clone() */ - if (!ref && !(ref= (uchar*) alloc_root(&table->mem_root, - ALIGN_SIZE(ref_length)*2))) + /* Allocate ref in thd or on the table's mem_root */ + if (!(ref= (uchar*) alloc_root(mem_root ? mem_root : &table->mem_root, + ALIGN_SIZE(ref_length)*2))) { ha_close(); error=HA_ERR_OUT_OF_MEM; @@ -2633,6 +2809,7 @@ int handler::ha_open(TABLE *table_arg, const char *name, int mode, } reset_statistics(); internal_tmp_table= MY_TEST(test_if_locked & HA_OPEN_INTERNAL_TABLE); + DBUG_RETURN(error); } @@ -2665,19 +2842,27 @@ int handler::ha_rnd_next(uchar *buf) m_lock_type != F_UNLCK); DBUG_ASSERT(inited == RND); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0, - { result= rnd_next(buf); }) - if (!result) + do { - update_rows_read(); - if (table->vfield && buf == table->record[0]) - table->update_virtual_fields(this, VCOL_UPDATE_FOR_READ); - increment_statistics(&SSV::ha_read_rnd_next_count); - } - else if (result == HA_ERR_RECORD_DELETED) - increment_statistics(&SSV::ha_read_rnd_deleted_count); + TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0, + { result= rnd_next(buf); }) + if (result != HA_ERR_RECORD_DELETED) + break; + status_var_increment(table->in_use->status_var.ha_read_rnd_deleted_count); + } while (!table->in_use->check_killed(1)); + + if (result == HA_ERR_RECORD_DELETED) + result= HA_ERR_ABORTED_BY_USER; else + { + if (!result) + { + update_rows_read(); + if (table->vfield && buf == table->record[0]) + table->update_virtual_fields(this, VCOL_UPDATE_FOR_READ); + } increment_statistics(&SSV::ha_read_rnd_next_count); + } table->status=result ? 
STATUS_NOT_FOUND: 0; DBUG_RETURN(result); @@ -2694,7 +2879,9 @@ int handler::ha_rnd_pos(uchar *buf, uchar *pos) TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0, { result= rnd_pos(buf, pos); }) increment_statistics(&SSV::ha_read_rnd_count); - if (!result) + if (result == HA_ERR_RECORD_DELETED) + result= HA_ERR_KEY_NOT_FOUND; + else if (!result) { update_rows_read(); if (table->vfield && buf == table->record[0]) @@ -2872,7 +3059,7 @@ bool handler::ha_was_semi_consistent_read() int handler::ha_rnd_init_with_error(bool scan) { int error; - if (!(error= ha_rnd_init(scan))) + if (likely(!(error= ha_rnd_init(scan)))) return 0; table->file->print_error(error, MYF(0)); return error; @@ -2880,10 +3067,11 @@ int handler::ha_rnd_init_with_error(bool scan) /** - Read first row (only) from a table. + Read first row (only) from a table. Used for reading tables with + only one row, either based on table statistics or if table is a SEQUENCE. - This is never called for InnoDB tables, as these table types - has the HA_STATS_RECORDS_IS_EXACT set. + This is never called for normal InnoDB tables, as these table types + does not have HA_STATS_RECORDS_IS_EXACT set. */ int handler::read_first_row(uchar * buf, uint primary_key) { @@ -2898,23 +3086,22 @@ int handler::read_first_row(uchar * buf, uint primary_key) if (stats.deleted < 10 || primary_key >= MAX_KEY || !(index_flags(primary_key, 0, 0) & HA_READ_ORDER)) { - if (!(error= ha_rnd_init(1))) + if (likely(!(error= ha_rnd_init(1)))) { - while ((error= ha_rnd_next(buf)) == HA_ERR_RECORD_DELETED) - /* skip deleted row */; + error= ha_rnd_next(buf); const int end_error= ha_rnd_end(); - if (!error) + if (likely(!error)) error= end_error; } } else { /* Find the first row through the primary key */ - if (!(error= ha_index_init(primary_key, 0))) + if (likely(!(error= ha_index_init(primary_key, 0)))) { error= ha_index_first(buf); const int end_error= ha_index_end(); - if (!error) + if (likely(!error)) error= end_error; } } @@ -3131,6 +3318,25 @@ int handler::update_auto_increment() DBUG_RETURN(0); } + // ALTER TABLE ... ADD COLUMN ... AUTO_INCREMENT + if (thd->lex->sql_command == SQLCOM_ALTER_TABLE) + { + if (table->versioned()) + { + Field *end= table->vers_end_field(); + DBUG_ASSERT(end); + bitmap_set_bit(table->read_set, end->field_index); + if (!end->is_max()) + { + if (!table->next_number_field->real_maybe_null()) + DBUG_RETURN(HA_ERR_UNSUPPORTED); + table->next_number_field->set_null(); + DBUG_RETURN(0); + } + } + table->next_number_field->set_notnull(); + } + if ((nr= next_insert_id) >= auto_inc_interval_for_cur_row.maximum()) { /* next_insert_id is beyond what is reserved, so we reserve more. */ @@ -3230,7 +3436,7 @@ int handler::update_auto_increment() /* Store field without warning (Warning will be printed by insert) */ save_count_cuted_fields= thd->count_cuted_fields; thd->count_cuted_fields= CHECK_FIELD_IGNORE; - tmp= table->next_number_field->store((longlong) nr, TRUE); + tmp= table->next_number_field->store((longlong)nr, TRUE); thd->count_cuted_fields= save_count_cuted_fields; if (unlikely(tmp)) // Out of range value in store @@ -3396,7 +3602,7 @@ void handler::get_auto_increment(ulonglong offset, ulonglong increment, *nb_reserved_values= 1; } - if (error) + if (unlikely(error)) { if (error == HA_ERR_END_OF_FILE || error == HA_ERR_KEY_NOT_FOUND) /* No entry found, that's fine */; @@ -3451,6 +3657,10 @@ void handler::ha_release_auto_increment() @param msg Error message template to which key value should be added. 
@param errflag Flags for my_error() call. + + @notes + The error message is from ER_DUP_ENTRY_WITH_KEY_NAME but to keep things compatibly + with old code, the error number is ER_DUP_ENTRY */ void print_keydup_error(TABLE *table, KEY *key, const char *msg, myf errflag) @@ -3477,7 +3687,8 @@ void print_keydup_error(TABLE *table, KEY *key, const char *msg, myf errflag) str.length(max_length-4); str.append(STRING_WITH_LEN("...")); } - my_printf_error(ER_DUP_ENTRY, msg, errflag, str.c_ptr_safe(), key->name); + my_printf_error(ER_DUP_ENTRY, msg, errflag, str.c_ptr_safe(), + key->name.str); } } @@ -3649,13 +3860,19 @@ void handler::print_error(int error, myf errflag) textno=ER_UNSUPPORTED_EXTENSION; break; case HA_ERR_RECORD_FILE_FULL: - case HA_ERR_INDEX_FILE_FULL: { textno=ER_RECORD_FILE_FULL; /* Write the error message to error log */ errflag|= ME_NOREFRESH; break; } + case HA_ERR_INDEX_FILE_FULL: + { + textno=ER_INDEX_FILE_FULL; + /* Write the error message to error log */ + errflag|= ME_NOREFRESH; + break; + } case HA_ERR_LOCK_WAIT_TIMEOUT: textno=ER_LOCK_WAIT_TIMEOUT; break; @@ -3710,7 +3927,7 @@ void handler::print_error(int error, myf errflag) const char *ptr= "???"; uint key_nr= get_dup_key(error); if ((int) key_nr >= 0) - ptr= table->key_info[key_nr].name; + ptr= table->key_info[key_nr].name.str; my_error(ER_DROP_INDEX_FK, errflag, ptr); DBUG_VOID_RETURN; } @@ -3730,7 +3947,7 @@ void handler::print_error(int error, myf errflag) break; case HA_ERR_AUTOINC_ERANGE: textno= error; - my_error(textno, errflag, table->next_number_field->field_name, + my_error(textno, errflag, table->next_number_field->field_name.str, table->in_use->get_stmt_da()->current_row_for_warning()); DBUG_VOID_RETURN; break; @@ -3777,7 +3994,7 @@ void handler::print_error(int error, myf errflag) } } DBUG_ASSERT(textno > 0); - if (fatal_error) + if (unlikely(fatal_error)) { /* Ensure this becomes a true error */ errflag&= ~(ME_JUST_WARNING | ME_JUST_INFO); @@ -3904,7 +4121,7 @@ int handler::ha_check_for_upgrade(HA_CHECK_OPT *check_opt) if (table->s->frm_version < FRM_VER_TRUE_VARCHAR) return HA_ADMIN_NEEDS_ALTER; - if ((error= check_collation_compatibility())) + if (unlikely((error= check_collation_compatibility()))) return error; return check_for_upgrade(check_opt); @@ -3959,7 +4176,7 @@ static bool update_frm_version(TABLE *table) int4store(version, MYSQL_VERSION_ID); - if ((result= mysql_file_pwrite(file, (uchar*) version, 4, 51L, MYF_RW))) + if ((result= (int)mysql_file_pwrite(file, (uchar*) version, 4, 51L, MYF_RW))) goto err; table->s->mysql_version= MYSQL_VERSION_ID; @@ -3982,7 +4199,8 @@ uint handler::get_dup_key(int error) m_lock_type != F_UNLCK); DBUG_ENTER("handler::get_dup_key"); table->file->errkey = (uint) -1; - if (error == HA_ERR_FOUND_DUPP_KEY || error == HA_ERR_FOREIGN_DUPLICATE_KEY || + if (error == HA_ERR_FOUND_DUPP_KEY || + error == HA_ERR_FOREIGN_DUPLICATE_KEY || error == HA_ERR_FOUND_DUPP_UNIQUE || error == HA_ERR_NULL_IN_SPATIAL || error == HA_ERR_DROP_INDEX_FK) table->file->info(HA_STATUS_ERRKEY | HA_STATUS_NO_LOCK); @@ -4046,14 +4264,14 @@ int handler::rename_table(const char * from, const char * to) start_ext= bas_ext(); for (ext= start_ext; *ext ; ext++) { - if (rename_file_ext(from, to, *ext)) + if (unlikely(rename_file_ext(from, to, *ext))) { if ((error=my_errno) != ENOENT) break; error= 0; } } - if (error) + if (unlikely(error)) { /* Try to revert the rename. Ignore errors. 
*/ for (; ext >= start_ext; ext--) @@ -4097,15 +4315,15 @@ int handler::ha_check(THD *thd, HA_CHECK_OPT *check_opt) if (table->s->mysql_version < MYSQL_VERSION_ID) { - if ((error= check_old_types())) + if (unlikely((error= check_old_types()))) return error; error= ha_check_for_upgrade(check_opt); - if (error && (error != HA_ADMIN_NEEDS_CHECK)) + if (unlikely(error && (error != HA_ADMIN_NEEDS_CHECK))) return error; - if (!error && (check_opt->sql_flags & TT_FOR_UPGRADE)) + if (unlikely(!error && (check_opt->sql_flags & TT_FOR_UPGRADE))) return 0; } - if ((error= check(thd, check_opt))) + if (unlikely((error= check(thd, check_opt)))) return error; /* Skip updating frm version if not main handler. */ if (table->file != this) @@ -4131,7 +4349,7 @@ void handler::mark_trx_read_write_internal() */ if (ha_info->is_started()) { - DBUG_ASSERT(has_transactions()); + DBUG_ASSERT(has_transaction_manager()); /* table_share can be NULL in ha_delete_table(). See implementation of standalone function ha_delete_table() in sql_base.cc. @@ -4171,8 +4389,8 @@ int handler::ha_repair(THD* thd, HA_CHECK_OPT* check_opt) */ int -handler::ha_bulk_update_row(const uchar *old_data, uchar *new_data, - uint *dup_key_found) +handler::ha_bulk_update_row(const uchar *old_data, const uchar *new_data, + ha_rows *dup_key_found) { DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type == F_WRLCK); @@ -4379,21 +4597,29 @@ handler::check_if_supported_inplace_alter(TABLE *altered_table, HA_CREATE_INFO *create_info= ha_alter_info->create_info; - Alter_inplace_info::HA_ALTER_FLAGS inplace_offline_operations= - Alter_inplace_info::ALTER_COLUMN_EQUAL_PACK_LENGTH | - Alter_inplace_info::ALTER_COLUMN_NAME | - Alter_inplace_info::ALTER_COLUMN_DEFAULT | - Alter_inplace_info::ALTER_COLUMN_OPTION | - Alter_inplace_info::CHANGE_CREATE_OPTION | - Alter_inplace_info::ALTER_PARTITIONED | - Alter_inplace_info::ALTER_VIRTUAL_GCOL_EXPR | - Alter_inplace_info::ALTER_RENAME; + if (altered_table->versioned(VERS_TIMESTAMP)) + DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED); + + alter_table_operations inplace_offline_operations= + ALTER_COLUMN_EQUAL_PACK_LENGTH | + ALTER_COLUMN_NAME | + ALTER_RENAME_COLUMN | + ALTER_CHANGE_COLUMN_DEFAULT | + ALTER_COLUMN_DEFAULT | + ALTER_COLUMN_OPTION | + ALTER_CHANGE_CREATE_OPTION | + ALTER_DROP_CHECK_CONSTRAINT | + ALTER_PARTITIONED | + ALTER_VIRTUAL_GCOL_EXPR | + ALTER_RENAME; /* Is there at least one operation that requires copy algorithm? */ if (ha_alter_info->handler_flags & ~inplace_offline_operations) DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED); /* + The following checks for changes related to ALTER_OPTIONS + ALTER TABLE tbl_name CONVERT TO CHARACTER SET .. and ALTER TABLE table_name DEFAULT CHARSET = .. most likely change column charsets and so not supported in-place through @@ -4405,12 +4631,13 @@ handler::check_if_supported_inplace_alter(TABLE *altered_table, if (create_info->used_fields & (HA_CREATE_USED_CHARSET | HA_CREATE_USED_DEFAULT_CHARSET | HA_CREATE_USED_PACK_KEYS | + HA_CREATE_USED_CHECKSUM | HA_CREATE_USED_MAX_ROWS) || (table->s->row_type != create_info->row_type)) DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED); uint table_changes= (ha_alter_info->handler_flags & - Alter_inplace_info::ALTER_COLUMN_EQUAL_PACK_LENGTH) ? + ALTER_COLUMN_EQUAL_PACK_LENGTH) ? 
IS_EQUAL_PACK_LENGTH : IS_EQUAL_YES; if (table->file->check_if_incompatible_data(create_info, table_changes) == COMPATIBLE_DATA_YES) @@ -4420,7 +4647,7 @@ handler::check_if_supported_inplace_alter(TABLE *altered_table, } void Alter_inplace_info::report_unsupported_error(const char *not_supported, - const char *try_instead) + const char *try_instead) const { if (unsupported_reason == NULL) my_error(ER_ALTER_OPERATION_NOT_SUPPORTED, MYF(0), @@ -4608,7 +4835,7 @@ int ha_enable_transaction(THD *thd, bool on) is an optimization hint that storage engine is free to ignore. So, let's commit an open transaction (if any) now. */ - if (!(error= ha_commit_trans(thd, 0))) + if (likely(!(error= ha_commit_trans(thd, 0)))) error= trans_commit_implicit(thd); } DBUG_RETURN(error); @@ -4677,14 +4904,12 @@ void handler::get_dynamic_partition_info(PARTITION_STATS *stat_info, stat_info->data_file_length= stats.data_file_length; stat_info->max_data_file_length= stats.max_data_file_length; stat_info->index_file_length= stats.index_file_length; + stat_info->max_index_file_length=stats.max_index_file_length; stat_info->delete_length= stats.delete_length; stat_info->create_time= stats.create_time; stat_info->update_time= stats.update_time; stat_info->check_time= stats.check_time; - stat_info->check_sum= 0; - if (table_flags() & (HA_HAS_OLD_CHECKSUM | HA_HAS_NEW_CHECKSUM)) - stat_info->check_sum= checksum(); - return; + stat_info->check_sum= stats.checksum; } @@ -4708,7 +4933,8 @@ void handler::update_global_table_stats() if (rows_read + rows_changed == 0) return; // Nothing to update. - DBUG_ASSERT(table->s && table->s->table_cache_key.str); + DBUG_ASSERT(table->s); + DBUG_ASSERT(table->s->table_cache_key.str); mysql_mutex_lock(&LOCK_global_table_stats); /* Gets the global table stats, creating one if necessary. */ @@ -4726,7 +4952,7 @@ void handler::update_global_table_stats() } memcpy(table_stats->table, table->s->table_cache_key.str, table->s->table_cache_key.length); - table_stats->table_name_length= table->s->table_cache_key.length; + table_stats->table_name_length= (uint)table->s->table_cache_key.length; table_stats->engine_type= ht->db_type; /* No need to set variables to 0, as we use MY_ZEROFILL above */ @@ -4769,13 +4995,13 @@ void handler::update_global_index_stats() if (index_rows_read[index]) { INDEX_STATS* index_stats; - uint key_length; + size_t key_length; KEY *key_info = &table->key_info[index]; // Rows were read using this DBUG_ASSERT(key_info->cache_name); if (!key_info->cache_name) continue; - key_length= table->s->table_cache_key.length + key_info->name_length + 1; + key_length= table->s->table_cache_key.length + key_info->name.length + 1; mysql_mutex_lock(&LOCK_global_index_stats); // Gets the global index stats, creating one if necessary. if (!(index_stats= (INDEX_STATS*) my_hash_search(&global_index_stats, @@ -4805,6 +5031,98 @@ end: } +static void flush_checksum(ha_checksum *row_crc, uchar **checksum_start, + size_t *checksum_length) +{ + if (*checksum_start) + { + *row_crc= my_checksum(*row_crc, *checksum_start, *checksum_length); + *checksum_start= NULL; + *checksum_length= 0; + } +} + + +/* calculating table's checksum */ +int handler::calculate_checksum() +{ + int error; + THD *thd=ha_thd(); + DBUG_ASSERT(table->s->last_null_bit_pos < 8); + uchar null_mask= table->s->last_null_bit_pos + ? 
256 - (1 << table->s->last_null_bit_pos) : 0; + + table->use_all_columns(); + stats.checksum= 0; + + if ((error= ha_rnd_init(1))) + return error; + + for (;;) + { + if (thd->killed) + return HA_ERR_ABORTED_BY_USER; + + ha_checksum row_crc= 0; + error= table->file->ha_rnd_next(table->record[0]); + if (error) + break; + + if (table->s->null_bytes) + { + /* fix undefined null bits */ + table->record[0][table->s->null_bytes-1] |= null_mask; + if (!(table->s->db_create_options & HA_OPTION_PACK_RECORD)) + table->record[0][0] |= 1; + + row_crc= my_checksum(row_crc, table->record[0], table->s->null_bytes); + } + + uchar *checksum_start= NULL; + size_t checksum_length= 0; + for (uint i= 0; i < table->s->fields; i++ ) + { + Field *f= table->field[i]; + + if (! thd->variables.old_mode && f->is_real_null(0)) + { + flush_checksum(&row_crc, &checksum_start, &checksum_length); + continue; + } + /* + BLOB and VARCHAR have pointers in their field, we must convert + to string; GEOMETRY is implemented on top of BLOB. + BIT may store its data among NULL bits, convert as well. + */ + switch (f->type()) { + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_VARCHAR: + case MYSQL_TYPE_GEOMETRY: + case MYSQL_TYPE_BIT: + { + flush_checksum(&row_crc, &checksum_start, &checksum_length); + String tmp; + f->val_str(&tmp); + row_crc= my_checksum(row_crc, (uchar*) tmp.ptr(), tmp.length()); + break; + } + default: + if (!checksum_start) + checksum_start= f->ptr; + DBUG_ASSERT(checksum_start + checksum_length == f->ptr); + checksum_length+= f->pack_length(); + break; + } + } + flush_checksum(&row_crc, &checksum_start, &checksum_length); + + stats.checksum+= row_crc; + } + table->file->ha_rnd_end(); + return error == HA_ERR_END_OF_FILE ? 0 : error; +} + + /**************************************************************************** ** Some general functions that isn't in the handler class ****************************************************************************/ @@ -4828,7 +5146,6 @@ int ha_create_table(THD *thd, const char *path, TABLE_SHARE share; bool temp_table __attribute__((unused)) = create_info->options & (HA_LEX_CREATE_TMP_TABLE | HA_CREATE_TMP_ALTER); - DBUG_ENTER("ha_create_table"); init_tmp_table_share(thd, &share, db, 0, table_name, path); @@ -4856,7 +5173,8 @@ int ha_create_table(THD *thd, const char *path, share.m_psi= PSI_CALL_get_table_share(temp_table, &share); - if (open_table_from_share(thd, &share, "", 0, READ_ALL, 0, &table, true)) + if (open_table_from_share(thd, &share, &empty_clex_str, 0, READ_ALL, 0, + &table, true)) goto err; update_create_info_from_table(create_info, &table); @@ -4865,13 +5183,13 @@ int ha_create_table(THD *thd, const char *path, error= table.file->ha_create(name, &table, create_info); - if (error) + if (unlikely(error)) { if (!thd->is_error()) my_error(ER_CANT_CREATE_TABLE, MYF(0), db, table_name, error); table.file->print_error(error, MYF(ME_JUST_WARNING)); - PSI_CALL_drop_table_share(temp_table, share.db.str, share.db.length, - share.table_name.str, share.table_name.length); + PSI_CALL_drop_table_share(temp_table, share.db.str, (uint)share.db.length, + share.table_name.str, (uint)share.table_name.length); } (void) closefrm(&table); @@ -5020,7 +5338,7 @@ static my_bool discover_handlerton(THD *thd, plugin_ref plugin, int error= hton->discover_table(hton, thd, share); if (error != HA_ERR_NO_SUCH_TABLE) { - if (error) + if (unlikely(error)) { if (!share->error) { @@ -5166,30 +5484,36 @@ private: loaded, frm is invalid), the return value will be true, but *hton will be NULL. 
*/ -bool ha_table_exists(THD *thd, const char *db, const char *table_name, - handlerton **hton) + +bool ha_table_exists(THD *thd, const LEX_CSTRING *db, const LEX_CSTRING *table_name, + handlerton **hton, bool *is_sequence) { handlerton *dummy; + bool dummy2; DBUG_ENTER("ha_table_exists"); if (hton) *hton= 0; else if (engines_with_discover) hton= &dummy; + if (!is_sequence) + is_sequence= &dummy2; + *is_sequence= 0; - TDC_element *element= tdc_lock_share(thd, db, table_name); + TDC_element *element= tdc_lock_share(thd, db->str, table_name->str); if (element && element != MY_ERRPTR) { if (hton) *hton= element->share->db_type(); + *is_sequence= element->share->table_type == TABLE_TYPE_SEQUENCE; tdc_unlock_share(element); DBUG_RETURN(TRUE); } char path[FN_REFLEN + 1]; size_t path_len = build_table_filename(path, sizeof(path) - 1, - db, table_name, "", 0); - st_discover_existence_args args= {path, path_len, db, table_name, 0, true}; + db->str, table_name->str, "", 0); + st_discover_existence_args args= {path, path_len, db->str, table_name->str, 0, true}; if (file_ext_exists(path, path_len, reg_ext)) { @@ -5197,16 +5521,17 @@ bool ha_table_exists(THD *thd, const char *db, const char *table_name, if (hton) { char engine_buf[NAME_CHAR_LEN + 1]; - LEX_STRING engine= { engine_buf, 0 }; - frm_type_enum type; + LEX_CSTRING engine= { engine_buf, 0 }; + Table_type type; - if ((type= dd_frm_type(thd, path, &engine)) == FRMTYPE_ERROR) + if ((type= dd_frm_type(thd, path, &engine, is_sequence)) == + TABLE_TYPE_UNKNOWN) DBUG_RETURN(0); - - if (type != FRMTYPE_VIEW) + + if (type != TABLE_TYPE_VIEW) { - plugin_ref p= plugin_lock_by_name(thd, &engine, - MYSQL_STORAGE_ENGINE_PLUGIN); + plugin_ref p= plugin_lock_by_name(thd, &engine, + MYSQL_STORAGE_ENGINE_PLUGIN); *hton= p ? 
plugin_hton(p) : NULL; if (*hton) // verify that the table really exists @@ -5227,19 +5552,16 @@ bool ha_table_exists(THD *thd, const char *db, const char *table_name, DBUG_RETURN(TRUE); } - if (need_full_discover_for_existence) { TABLE_LIST table; uint flags = GTS_TABLE | GTS_VIEW; - if (!hton) flags|= GTS_NOLOCK; Table_exists_error_handler no_such_table_handler; thd->push_internal_handler(&no_such_table_handler); - table.init_one_table(db, strlen(db), table_name, strlen(table_name), - table_name, TL_READ); + table.init_one_table(db, table_name, 0, TL_READ); TABLE_SHARE *share= tdc_acquire_share(thd, &table, flags); thd->pop_internal_handler(); @@ -5269,17 +5591,24 @@ static int cmp_file_names(const void *a, const void *b) return my_strnncoll(cs, (uchar*)aa, strlen(aa), (uchar*)bb, strlen(bb)); } -static int cmp_table_names(LEX_STRING * const *a, LEX_STRING * const *b) +static int cmp_table_names(LEX_CSTRING * const *a, LEX_CSTRING * const *b) { return my_strnncoll(&my_charset_bin, (uchar*)((*a)->str), (*a)->length, (uchar*)((*b)->str), (*b)->length); } +#ifndef DBUG_OFF +static int cmp_table_names_desc(LEX_CSTRING * const *a, LEX_CSTRING * const *b) +{ + return -cmp_table_names(a, b); +} +#endif + } Discovered_table_list::Discovered_table_list(THD *thd_arg, - Dynamic_array<LEX_STRING*> *tables_arg, - const LEX_STRING *wild_arg) : + Dynamic_array<LEX_CSTRING*> *tables_arg, + const LEX_CSTRING *wild_arg) : thd(thd_arg), with_temps(false), tables(tables_arg) { if (wild_arg->str && wild_arg->str[0]) @@ -5303,7 +5632,7 @@ bool Discovered_table_list::add_table(const char *tname, size_t tlen) wild_prefix, wild_one, wild_many)) return 0; - LEX_STRING *name= thd->make_lex_string(tname, tlen); + LEX_CSTRING *name= thd->make_clex_string(tname, tlen); if (!name || tables->append(name)) return 1; return 0; @@ -5327,14 +5656,23 @@ void Discovered_table_list::sort() tables->sort(cmp_table_names); } + +#ifndef DBUG_OFF +void Discovered_table_list::sort_desc() +{ + tables->sort(cmp_table_names_desc); +} +#endif + + void Discovered_table_list::remove_duplicates() { - LEX_STRING **src= tables->front(); - LEX_STRING **dst= src; + LEX_CSTRING **src= tables->front(); + LEX_CSTRING **dst= src; sort(); while (++dst <= tables->back()) { - LEX_STRING *s= *src, *d= *dst; + LEX_CSTRING *s= *src, *d= *dst; DBUG_ASSERT(strncmp(s->str, d->str, MY_MIN(s->length, d->length)) <= 0); if ((s->length != d->length || strncmp(s->str, d->str, d->length))) { @@ -5348,7 +5686,7 @@ void Discovered_table_list::remove_duplicates() struct st_discover_names_args { - LEX_STRING *db; + LEX_CSTRING *db; MY_DIR *dirp; Discovered_table_list *result; uint possible_duplicates; @@ -5362,7 +5700,7 @@ static my_bool discover_names(THD *thd, plugin_ref plugin, if (ht->state == SHOW_OPTION_YES && ht->discover_table_names) { - uint old_elements= args->result->tables->elements(); + size_t old_elements= args->result->tables->elements(); if (ht->discover_table_names(ht, args->db, args->dirp, args->result)) return 1; @@ -5371,7 +5709,7 @@ static my_bool discover_names(THD *thd, plugin_ref plugin, a corresponding .frm file; but custom engine discover methods might */ if (ht->discover_table_names != hton_ext_based_table_discovery) - args->possible_duplicates+= args->result->tables->elements() - old_elements; + args->possible_duplicates+= (uint)(args->result->tables->elements() - old_elements); } return 0; @@ -5393,7 +5731,7 @@ static my_bool discover_names(THD *thd, plugin_ref plugin, for DROP DATABASE (as it needs to know and delete non-table files). 
*/ -int ha_discover_table_names(THD *thd, LEX_STRING *db, MY_DIR *dirp, +int ha_discover_table_names(THD *thd, LEX_CSTRING *db, MY_DIR *dirp, Discovered_table_list *result, bool reusable) { int error; @@ -5425,6 +5763,27 @@ int ha_discover_table_names(THD *thd, LEX_STRING *db, MY_DIR *dirp, } +/* +int handler::pre_read_multi_range_first(KEY_MULTI_RANGE **found_range_p, + KEY_MULTI_RANGE *ranges, + uint range_count, + bool sorted, HANDLER_BUFFER *buffer, + bool use_parallel) +{ + int result; + DBUG_ENTER("handler::pre_read_multi_range_first"); + result = pre_read_range_first(ranges->start_key.keypart_map ? + &ranges->start_key : 0, + ranges->end_key.keypart_map ? + &ranges->end_key : 0, + test(ranges->range_flag & EQ_RANGE), + sorted, + use_parallel); + DBUG_RETURN(result); +} +*/ + + /** Read first row between two ranges. Store ranges for future calls to read_range_next. @@ -5616,12 +5975,12 @@ int handler::index_read_idx_map(uchar * buf, uint index, const uchar * key, int error, UNINIT_VAR(error1); error= ha_index_init(index, 0); - if (!error) + if (likely(!error)) { error= index_read_map(buf, key, keypart_map, find_flag); error1= ha_index_end(); } - return error ? error : error1; + return error ? error : error1; } @@ -5743,7 +6102,7 @@ bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat) { if (db_type->state != SHOW_OPTION_YES) { - const LEX_STRING *name= hton_name(db_type); + const LEX_CSTRING *name= hton_name(db_type); result= stat_print(thd, name->str, name->length, "", 0, "DISABLED", 8) ? 1 : 0; } @@ -5758,7 +6117,7 @@ bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat) We also check thd->is_error() as Innodb may return 0 even if there was an error. */ - if (!result && !thd->is_error()) + if (likely(!result && !thd->is_error())) my_eof(thd); else if (!thd->is_error()) my_error(ER_GET_ERRNO, MYF(0), errno, hton_name(db_type)->str); @@ -5783,8 +6142,10 @@ bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat) 1 Row needs to be logged */ -inline bool handler::check_table_binlog_row_based(bool binlog_row) +bool handler::check_table_binlog_row_based(bool binlog_row) { + if (table->versioned(VERS_TRX_ID)) + return false; if (unlikely((table->in_use->variables.sql_log_bin_off))) return 0; /* Called by partitioning engine */ if (unlikely((!check_table_binlog_row_based_done))) @@ -5800,7 +6161,7 @@ bool handler::check_table_binlog_row_based_internal(bool binlog_row) { THD *thd= table->in_use; - return (table->s->cached_row_logging_check && + return (table->s->can_do_row_logging && thd->is_current_stmt_binlog_format_row() && /* Wsrep partially enables binary logging if it have not been @@ -5908,8 +6269,6 @@ static int write_locked_table_maps(THD *thd) } -typedef bool Log_func(THD*, TABLE*, bool, const uchar*, const uchar*); - static int binlog_log_row_internal(TABLE* table, const uchar *before_record, const uchar *after_record, @@ -5941,10 +6300,8 @@ static int binlog_log_row_internal(TABLE* table, return error ? 
HA_ERR_RBR_LOGGING_FAILED : 0; } -static inline int binlog_log_row(TABLE* table, - const uchar *before_record, - const uchar *after_record, - Log_func *log_func) +int binlog_log_row(TABLE* table, const uchar *before_record, + const uchar *after_record, Log_func *log_func) { #ifdef WITH_WSREP THD *const thd= table->in_use; @@ -6023,7 +6380,7 @@ int handler::ha_external_lock(THD *thd, int lock_type) DBUG_EXECUTE_IF("external_lock_failure", error= HA_ERR_GENERIC;); - if (error == 0 || lock_type == F_UNLCK) + if (likely(error == 0 || lock_type == F_UNLCK)) { m_lock_type= lock_type; cached_table_flags= table_flags(); @@ -6075,6 +6432,7 @@ int handler::ha_reset() /* Reset information about pushed engine conditions */ cancel_pushed_idx_cond(); /* Reset information about pushed index conditions */ + clear_top_table_fields(); DBUG_RETURN(reset()); } @@ -6096,7 +6454,7 @@ int handler::ha_write_row(uchar *buf) { error= write_row(buf); }) MYSQL_INSERT_ROW_DONE(error); - if (likely(!error)) + if (likely(!error) && !row_already_logged) { rows_changed++; error= binlog_log_row(table, 0, buf, log_func); @@ -6106,7 +6464,7 @@ int handler::ha_write_row(uchar *buf) } -int handler::ha_update_row(const uchar *old_data, uchar *new_data) +int handler::ha_update_row(const uchar *old_data, const uchar *new_data) { int error; Log_func *log_func= Update_rows_log_event::binlog_row_logging_function; @@ -6128,7 +6486,7 @@ int handler::ha_update_row(const uchar *old_data, uchar *new_data) { error= update_row(old_data, new_data);}) MYSQL_UPDATE_ROW_DONE(error); - if (likely(!error)) + if (likely(!error) && !row_already_logged) { rows_changed++; error= binlog_log_row(table, old_data, new_data, log_func); @@ -6136,6 +6494,34 @@ int handler::ha_update_row(const uchar *old_data, uchar *new_data) return error; } +/* + Update first row. Only used by sequence tables +*/ + +int handler::update_first_row(uchar *new_data) +{ + int error; + if (likely(!(error= ha_rnd_init(1)))) + { + int end_error; + if (likely(!(error= ha_rnd_next(table->record[1])))) + { + /* + We have to do the memcmp as otherwise we may get error 169 from InnoDB + */ + if (memcmp(new_data, table->record[1], table->s->reclength)) + error= update_row(table->record[1], new_data); + } + end_error= ha_rnd_end(); + if (likely(!error)) + error= end_error; + /* Logging would be wrong if update_row works but ha_rnd_end fails */ + DBUG_ASSERT(!end_error || error != 0); + } + return error; +} + + int handler::ha_delete_row(const uchar *buf) { int error; @@ -6164,6 +6550,59 @@ int handler::ha_delete_row(const uchar *buf) } +/** + Execute a direct update request. A direct update request updates all + qualified rows in a single operation, rather than one row at a time. + In a Spider cluster the direct update operation is pushed down to the + child levels of the cluster. + + Note that this can't be used in case of statment logging + + @param update_rows Number of updated rows. + + @retval 0 Success. + @retval != 0 Failure. +*/ + +int handler::ha_direct_update_rows(ha_rows *update_rows) +{ + int error; + + MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str); + mark_trx_read_write(); + + error = direct_update_rows(update_rows); + MYSQL_UPDATE_ROW_DONE(error); + return error; +} + + +/** + Execute a direct delete request. A direct delete request deletes all + qualified rows in a single operation, rather than one row at a time. + In a Spider cluster the direct delete operation is pushed down to the + child levels of the cluster. 
+ + @param delete_rows Number of deleted rows. + + @retval 0 Success. + @retval != 0 Failure. +*/ + +int handler::ha_direct_delete_rows(ha_rows *delete_rows) +{ + int error; + /* Ensure we are not using binlog row */ + DBUG_ASSERT(!table->in_use->is_current_stmt_binlog_format_row()); + + MYSQL_DELETE_ROW_START(table_share->db.str, table_share->table_name.str); + mark_trx_read_write(); + + error = direct_delete_rows(delete_rows); + MYSQL_DELETE_ROW_DONE(error); + return error; +} + /** @brief use_hidden_primary_key() is called in case of an update/delete when @@ -6191,7 +6630,8 @@ void handler::use_hidden_primary_key() Handler_share *handler::get_ha_share_ptr() { DBUG_ENTER("handler::get_ha_share_ptr"); - DBUG_ASSERT(ha_share && table_share); + DBUG_ASSERT(ha_share); + DBUG_ASSERT(table_share); #ifndef DBUG_OFF if (table_share->tmp_table == NO_TMP_TABLE) @@ -6553,7 +6993,7 @@ bool HA_CREATE_INFO::check_conflicting_charset_declarations(CHARSET_INFO *cs) /* Remove all indexes for a given table from global index statistics */ static -int del_global_index_stats_for_table(THD *thd, uchar* cache_key, uint cache_key_length) +int del_global_index_stats_for_table(THD *thd, uchar* cache_key, size_t cache_key_length) { int res = 0; DBUG_ENTER("del_global_index_stats_for_table"); @@ -6589,12 +7029,12 @@ int del_global_index_stats_for_table(THD *thd, uchar* cache_key, uint cache_key_ /* Remove a table from global table statistics */ -int del_global_table_stat(THD *thd, LEX_STRING *db, LEX_STRING *table) +int del_global_table_stat(THD *thd, const LEX_CSTRING *db, const LEX_CSTRING *table) { TABLE_STATS *table_stats; int res = 0; uchar *cache_key; - uint cache_key_length; + size_t cache_key_length; DBUG_ENTER("del_global_table_stat"); cache_key_length= db->length + 1 + table->length + 1; @@ -6631,7 +7071,7 @@ end: int del_global_index_stat(THD *thd, TABLE* table, KEY* key_info) { INDEX_STATS *index_stats; - uint key_length= table->s->table_cache_key.length + key_info->name_length + 1; + size_t key_length= table->s->table_cache_key.length + key_info->name.length + 1; int res = 0; DBUG_ENTER("del_global_index_stat"); mysql_mutex_lock(&LOCK_global_index_stats); @@ -6644,3 +7084,469 @@ int del_global_index_stat(THD *thd, TABLE* table, KEY* key_info) mysql_mutex_unlock(&LOCK_global_index_stats); DBUG_RETURN(res); } + +bool Vers_parse_info::is_start(const char *name) const +{ + DBUG_ASSERT(name); + return as_row.start && as_row.start.streq(name); +} +bool Vers_parse_info::is_end(const char *name) const +{ + DBUG_ASSERT(name); + return as_row.end && as_row.end.streq(name); +} +bool Vers_parse_info::is_start(const Create_field &f) const +{ + return f.flags & VERS_SYS_START_FLAG; +} +bool Vers_parse_info::is_end(const Create_field &f) const +{ + return f.flags & VERS_SYS_END_FLAG; +} + +static Create_field *vers_init_sys_field(THD *thd, const char *field_name, int flags, bool integer) +{ + Create_field *f= new (thd->mem_root) Create_field(); + if (!f) + return NULL; + + f->field_name.str= field_name; + f->field_name.length= strlen(field_name); + f->charset= system_charset_info; + f->flags= flags | NOT_NULL_FLAG; + if (integer) + { + DBUG_ASSERT(0); // Not implemented yet + f->set_handler(&type_handler_vers_trx_id); + f->length= MY_INT64_NUM_DECIMAL_DIGITS - 1; + f->flags|= UNSIGNED_FLAG; + } + else + { + f->set_handler(&type_handler_timestamp2); + f->length= MAX_DATETIME_PRECISION; + } + f->invisible= DBUG_EVALUATE_IF("sysvers_show", VISIBLE, INVISIBLE_SYSTEM); + + if (f->check(thd)) + return NULL; + + return f; 
+} + +static bool vers_create_sys_field(THD *thd, const char *field_name, + Alter_info *alter_info, int flags) +{ + Create_field *f= vers_init_sys_field(thd, field_name, flags, false); + if (!f) + return true; + + alter_info->flags|= ALTER_PARSER_ADD_COLUMN; + alter_info->create_list.push_back(f); + + return false; +} + +const Lex_ident Vers_parse_info::default_start= "row_start"; +const Lex_ident Vers_parse_info::default_end= "row_end"; + +bool Vers_parse_info::fix_implicit(THD *thd, Alter_info *alter_info) +{ + // If user specified some of these he must specify the others too. Do nothing. + if (*this) + return false; + + alter_info->flags|= ALTER_PARSER_ADD_COLUMN; + + system_time= start_end_t(default_start, default_end); + as_row= system_time; + + if (vers_create_sys_field(thd, default_start, alter_info, VERS_SYS_START_FLAG) || + vers_create_sys_field(thd, default_end, alter_info, VERS_SYS_END_FLAG)) + { + return true; + } + return false; +} + + +bool Table_scope_and_contents_source_st::vers_fix_system_fields( + THD *thd, Alter_info *alter_info, const TABLE_LIST &create_table) +{ + DBUG_ASSERT(!(alter_info->flags & ALTER_DROP_SYSTEM_VERSIONING)); + + DBUG_EXECUTE_IF("sysvers_force", if (!tmp_table()) { + alter_info->flags|= ALTER_ADD_SYSTEM_VERSIONING; + options|= HA_VERSIONED_TABLE; }); + + if (!vers_info.need_check(alter_info)) + return false; + + if (!vers_info.versioned_fields && vers_info.unversioned_fields && + !(alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING)) + { + // All is correct but this table is not versioned. + options&= ~HA_VERSIONED_TABLE; + return false; + } + + if (!(alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING) && vers_info) + { + my_error(ER_MISSING, MYF(0), create_table.table_name.str, + "WITH SYSTEM VERSIONING"); + return true; + } + + List_iterator<Create_field> it(alter_info->create_list); + while (Create_field *f= it++) + { + if ((f->versioning == Column_definition::VERSIONING_NOT_SET && + !(alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING)) || + f->versioning == Column_definition::WITHOUT_VERSIONING) + { + f->flags|= VERS_UPDATE_UNVERSIONED_FLAG; + } + } // while (Create_field *f= it++) + + if (vers_info.fix_implicit(thd, alter_info)) + return true; + + return false; +} + + +bool Table_scope_and_contents_source_st::vers_check_system_fields( + THD *thd, Alter_info *alter_info, const Lex_table_name &table_name, + const Lex_table_name &db, int select_count) +{ + if (!(options & HA_VERSIONED_TABLE)) + return false; + + if (!(alter_info->flags & ALTER_DROP_SYSTEM_VERSIONING)) + { + uint versioned_fields= 0; + uint fieldnr= 0; + List_iterator<Create_field> field_it(alter_info->create_list); + while (Create_field *f= field_it++) + { + /* + The field from the CREATE part can be duplicated in the SELECT part of + CREATE...SELECT. In that case double counts should be avoided. + select_create::create_table_from_items just pushes the fields back into + the create_list, without additional manipulations, so the fields from + SELECT go last there. 
+ */ + bool is_dup= false; + if (fieldnr >= alter_info->create_list.elements - select_count) + { + List_iterator<Create_field> dup_it(alter_info->create_list); + for (Create_field *dup= dup_it++; !is_dup && dup != f; dup= dup_it++) + is_dup= my_strcasecmp(default_charset_info, + dup->field_name.str, f->field_name.str) == 0; + } + + if (!(f->flags & VERS_UPDATE_UNVERSIONED_FLAG) && !is_dup) + versioned_fields++; + fieldnr++; + } + if (versioned_fields == VERSIONING_FIELDS) + { + my_error(ER_VERS_TABLE_MUST_HAVE_COLUMNS, MYF(0), table_name.str); + return true; + } + } + + if (!(alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING)) + return false; + + bool can_native= ha_check_storage_engine_flag(db_type, + HTON_NATIVE_SYS_VERSIONING) + || db_type->db_type == DB_TYPE_PARTITION_DB; + + return vers_info.check_sys_fields(table_name, db, alter_info, can_native); +} + + +bool Vers_parse_info::fix_alter_info(THD *thd, Alter_info *alter_info, + HA_CREATE_INFO *create_info, TABLE *table) +{ + TABLE_SHARE *share= table->s; + const char *table_name= share->table_name.str; + + if (!need_check(alter_info) && !share->versioned) + return false; + + if (DBUG_EVALUATE_IF("sysvers_force", 0, share->tmp_table)) + { + my_error(ER_VERS_TEMPORARY, MYF(0)); + return true; + } + + if (alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING && + table->versioned()) + { + my_error(ER_VERS_ALREADY_VERSIONED, MYF(0), table_name); + return true; + } + + if (alter_info->flags & ALTER_DROP_SYSTEM_VERSIONING) + { + if (!share->versioned) + { + my_error(ER_VERS_NOT_VERSIONED, MYF(0), table_name); + return true; + } +#ifdef WITH_PARTITION_STORAGE_ENGINE + if (table->part_info && + table->part_info->part_type == VERSIONING_PARTITION) + { + my_error(ER_DROP_VERSIONING_SYSTEM_TIME_PARTITION, MYF(0), table_name); + return true; + } +#endif + + return false; + } + + if (!(alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING)) + { + List_iterator_fast<Create_field> it(alter_info->create_list); + while (Create_field *f= it++) + { + if (f->flags & VERS_SYSTEM_FIELD) + { + my_error(ER_VERS_DUPLICATE_ROW_START_END, MYF(0), + f->flags & VERS_SYS_START_FLAG ? 
"START" : "END", f->field_name.str); + return true; + } + } + } + + if ((alter_info->flags & ALTER_DROP_PERIOD || + versioned_fields || unversioned_fields) && !share->versioned) + { + my_error(ER_VERS_NOT_VERSIONED, MYF(0), table_name); + return true; + } + + if (share->versioned) + { + if (alter_info->flags & ALTER_ADD_PERIOD) + { + my_error(ER_VERS_ALREADY_VERSIONED, MYF(0), table_name); + return true; + } + + // copy info from existing table + create_info->options|= HA_VERSIONED_TABLE; + + DBUG_ASSERT(share->vers_start_field()); + DBUG_ASSERT(share->vers_end_field()); + Lex_ident start(share->vers_start_field()->field_name); + Lex_ident end(share->vers_end_field()->field_name); + DBUG_ASSERT(start.str); + DBUG_ASSERT(end.str); + + as_row= start_end_t(start, end); + system_time= as_row; + + if (alter_info->create_list.elements) + { + List_iterator_fast<Create_field> it(alter_info->create_list); + while (Create_field *f= it++) + { + if (f->versioning == Column_definition::WITHOUT_VERSIONING) + f->flags|= VERS_UPDATE_UNVERSIONED_FLAG; + + if (f->change.str && (start.streq(f->change) || end.streq(f->change))) + { + my_error(ER_VERS_ALTER_SYSTEM_FIELD, MYF(0), f->change.str); + return true; + } + } + } + + return false; + } + + return fix_implicit(thd, alter_info); +} + +bool +Vers_parse_info::fix_create_like(Alter_info &alter_info, HA_CREATE_INFO &create_info, + TABLE_LIST &src_table, TABLE_LIST &table) +{ + List_iterator<Create_field> it(alter_info.create_list); + Create_field *f, *f_start=NULL, *f_end= NULL; + + DBUG_ASSERT(alter_info.create_list.elements > 2); + + if (create_info.tmp_table()) + { + int remove= 2; + while (remove && (f= it++)) + { + if (f->flags & VERS_SYSTEM_FIELD) + { + it.remove(); + remove--; + } + } + DBUG_ASSERT(remove == 0); + push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN, + ER_UNKNOWN_ERROR, + "System versioning is stripped from temporary `%s.%s`", + table.db.str, table.table_name.str); + return false; + } + + while ((f= it++)) + { + if (f->flags & VERS_SYS_START_FLAG) + { + f_start= f; + if (f_end) + break; + } + else if (f->flags & VERS_SYS_END_FLAG) + { + f_end= f; + if (f_start) + break; + } + } + + if (!f_start || !f_end) + { + my_error(ER_MISSING, MYF(0), src_table.table_name.str, + f_start ? "AS ROW END" : "AS ROW START"); + return true; + } + + as_row= start_end_t(f_start->field_name, f_end->field_name); + system_time= as_row; + + create_info.options|= HA_VERSIONED_TABLE; + return false; +} + +bool Vers_parse_info::need_check(const Alter_info *alter_info) const +{ + return versioned_fields || unversioned_fields || + alter_info->flags & ALTER_ADD_PERIOD || + alter_info->flags & ALTER_DROP_PERIOD || + alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING || + alter_info->flags & ALTER_DROP_SYSTEM_VERSIONING || *this; +} + +bool Vers_parse_info::check_conditions(const Lex_table_name &table_name, + const Lex_table_name &db) const +{ + if (!as_row.start || !as_row.end) + { + my_error(ER_MISSING, MYF(0), table_name.str, + as_row.start ? 
"AS ROW END" : "AS ROW START"); + return true; + } + + if (!system_time.start || !system_time.end) + { + my_error(ER_MISSING, MYF(0), table_name.str, "PERIOD FOR SYSTEM_TIME"); + return true; + } + + if (!as_row.start.streq(system_time.start) || + !as_row.end.streq(system_time.end)) + { + my_error(ER_VERS_PERIOD_COLUMNS, MYF(0), as_row.start.str, as_row.end.str); + return true; + } + + if (db.streq(MYSQL_SCHEMA_NAME)) + { + my_error(ER_VERS_DB_NOT_SUPPORTED, MYF(0), MYSQL_SCHEMA_NAME.str); + return true; + } + return false; +} + +static bool is_versioning_timestamp(const Create_field *f) +{ + return f->type_handler() == &type_handler_timestamp2 && + f->length == MAX_DATETIME_FULL_WIDTH; +} + +static bool is_some_bigint(const Create_field *f) +{ + return f->type_handler() == &type_handler_longlong || + f->type_handler() == &type_handler_vers_trx_id; +} + +static bool is_versioning_bigint(const Create_field *f) +{ + return is_some_bigint(f) && f->flags & UNSIGNED_FLAG && + f->length == MY_INT64_NUM_DECIMAL_DIGITS - 1; +} + +static bool require_timestamp(const Create_field *f, Lex_table_name table_name) +{ + my_error(ER_VERS_FIELD_WRONG_TYPE, MYF(0), f->field_name.str, "TIMESTAMP(6)", + table_name.str); + return true; +} +static bool require_bigint(const Create_field *f, Lex_table_name table_name) +{ + my_error(ER_VERS_FIELD_WRONG_TYPE, MYF(0), f->field_name.str, + "BIGINT(20) UNSIGNED", table_name.str); + return true; +} + +bool Vers_parse_info::check_sys_fields(const Lex_table_name &table_name, + const Lex_table_name &db, + Alter_info *alter_info, + bool can_native) const +{ + if (check_conditions(table_name, db)) + return true; + + const Create_field *row_start= NULL; + const Create_field *row_end= NULL; + + List_iterator<Create_field> it(alter_info->create_list); + while (Create_field *f= it++) + { + if (!row_start && f->flags & VERS_SYS_START_FLAG) + row_start= f; + else if (!row_end && f->flags & VERS_SYS_END_FLAG) + row_end= f; + } + + const bool expect_timestamp= + !can_native || !is_some_bigint(row_start) || !is_some_bigint(row_end); + + if (expect_timestamp) + { + if (!is_versioning_timestamp(row_start)) + return require_timestamp(row_start, table_name); + + if (!is_versioning_timestamp(row_end)) + return require_timestamp(row_end, table_name); + } + else + { + if (!is_versioning_bigint(row_start)) + return require_bigint(row_start, table_name); + + if (!is_versioning_bigint(row_end)) + return require_bigint(row_end, table_name); + } + + if (is_versioning_bigint(row_start) && is_versioning_bigint(row_end) && + !TR_table::use_transaction_registry) + { + my_error(ER_VERS_TRT_IS_DISABLED, MYF(0)); + return true; + } + + return false; +} |