author     unknown <sergefp@mysql.com>  2004-04-28 19:27:56 +0400
committer  unknown <sergefp@mysql.com>  2004-04-28 19:27:56 +0400
commit     2902f9e8a57ca76cc52863d70d02dcf7693ce8cd (patch)
tree       4864372894fc29ff2e4d9de93f2c438b29184acb /sql
parent     15e207b0e7d1cd010b9e0eea4663c98e1eb209de (diff)
parent     5605374b668068ef5ede826b3d04e2fe7ead4f9f (diff)
download   mariadb-git-2902f9e8a57ca76cc52863d70d02dcf7693ce8cd.tar.gz
Merge spetrunia@bk-internal.mysql.com:/home/bk/mysql-4.1
into mysql.com:/dbdata/psergey/mysql-4.1-ps-merge
sql/lex.h:
Auto merged
sql/mysql_priv.h:
Auto merged
sql/mysqld.cc:
Auto merged
sql/sql_class.h:
Auto merged
sql/sql_parse.cc:
Auto merged
sql/sql_yacc.yy:
Auto merged
Diffstat (limited to 'sql')
-rw-r--r--  sql/Makefile.am       |   10
-rw-r--r--  sql/discover.cc       |  172
-rw-r--r--  sql/gen_lex_hash.cc   |    4
-rw-r--r--  sql/ha_innodb.cc      |  294
-rw-r--r--  sql/ha_innodb.h       |    2
-rw-r--r--  sql/ha_ndbcluster.cc  | 2943
-rw-r--r--  sql/ha_ndbcluster.h   |  218
-rw-r--r--  sql/handler.cc        |   92
-rw-r--r--  sql/handler.h         |   25
-rw-r--r--  sql/item_subselect.cc |    2
-rw-r--r--  sql/item_sum.h        |    1
-rw-r--r--  sql/lex.h             |    8
-rw-r--r--  sql/log_event.cc      |   37
-rw-r--r--  sql/mysql_priv.h      |    7
-rw-r--r--  sql/mysqld.cc         |   28
-rw-r--r--  sql/set_var.cc        |    9
-rw-r--r--  sql/slave.cc          |   65
-rw-r--r--  sql/slave.h           |   29
-rw-r--r--  sql/sql_base.cc       |   21
-rw-r--r--  sql/sql_class.h       |    8
-rw-r--r--  sql/sql_db.cc         |    7
-rw-r--r--  sql/sql_lex.cc        |   10
-rw-r--r--  sql/sql_parse.cc      |    2
-rw-r--r--  sql/sql_show.cc       |    4
-rw-r--r--  sql/sql_string.cc     |    2
-rw-r--r--  sql/sql_table.cc      |   29
-rw-r--r--  sql/sql_yacc.yy       |   40
-rw-r--r--  sql/table.cc          |    3
-rw-r--r--  sql/unireg.h          |    2
29 files changed, 3847 insertions(+), 227 deletions(-)
diff --git a/sql/Makefile.am b/sql/Makefile.am index bacf3bc58d1..1044673f8ec 100644 --- a/sql/Makefile.am +++ b/sql/Makefile.am @@ -16,12 +16,11 @@ #called from the top level Makefile - MYSQLDATAdir = $(localstatedir) MYSQLSHAREdir = $(pkgdatadir) MYSQLBASEdir= $(prefix) INCLUDES = @MT_INCLUDES@ \ - @bdb_includes@ @innodb_includes@ \ + @bdb_includes@ @innodb_includes@ @ndbcluster_includes@ \ -I$(top_srcdir)/include -I$(top_srcdir)/regex \ -I$(srcdir) $(openssl_includes) WRAPLIBS= @WRAPLIBS@ @@ -42,6 +41,7 @@ LDADD = @isam_libs@ \ mysqld_LDADD = @MYSQLD_EXTRA_LDFLAGS@ \ @bdb_libs@ @innodb_libs@ @pstack_libs@ \ @innodb_system_libs@ \ + @ndbcluster_libs@ @ndbcluster_system_libs@ \ $(LDADD) $(CXXLDFLAGS) $(WRAPLIBS) @LIBDL@ @openssl_libs@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ item_strfunc.h item_timefunc.h item_uniq.h \ @@ -52,7 +52,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ field.h handler.h \ ha_isammrg.h ha_isam.h ha_myisammrg.h\ ha_heap.h ha_myisam.h ha_berkeley.h ha_innodb.h \ - opt_range.h protocol.h \ + ha_ndbcluster.h opt_range.h protocol.h \ sql_select.h structs.h table.h sql_udf.h hash_filo.h\ lex.h lex_symbol.h sql_acl.h sql_crypt.h \ log_event.h sql_repl.h slave.h \ @@ -76,11 +76,11 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc \ procedure.cc item_uniq.cc sql_test.cc \ log.cc log_event.cc init.cc derror.cc sql_acl.cc \ unireg.cc des_key_file.cc \ - time.cc opt_range.cc opt_sum.cc \ + discover.cc time.cc opt_range.cc opt_sum.cc \ records.cc filesort.cc handler.cc \ ha_heap.cc ha_myisam.cc ha_myisammrg.cc \ ha_berkeley.cc ha_innodb.cc \ - ha_isam.cc ha_isammrg.cc \ + ha_isam.cc ha_isammrg.cc ha_ndbcluster.cc \ sql_db.cc sql_table.cc sql_rename.cc sql_crypt.cc \ sql_load.cc mf_iocache.cc field_conv.cc sql_show.cc \ sql_udf.cc sql_analyse.cc sql_analyse.h sql_cache.cc \ diff --git a/sql/discover.cc b/sql/discover.cc new file mode 100644 index 00000000000..e260f44a8db --- /dev/null +++ b/sql/discover.cc @@ -0,0 +1,172 @@ +/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +/* Functions for discover of frm file from handler */ + +#include "mysql_priv.h" +#include <my_dir.h> + +/* + Read the contents of a .frm file + + SYNOPSIS + readfrm() + + name path to table-file "db/name" + frmdata frm data + len length of the read frmdata + + RETURN VALUES + 0 ok + 1 Could not open file + 2 Could not stat file + 3 Could not allocate data for read + Could not read file + + frmdata and len are set to 0 on error +*/ + +int readfrm(const char *name, + const void **frmdata, uint *len) +{ + int error; + char index_file[FN_REFLEN]; + File file; + ulong read_len; + char *read_data; + MY_STAT state; + DBUG_ENTER("readfrm"); + DBUG_PRINT("enter",("name: '%s'",name)); + + *frmdata= NULL; // In case of errors + *len= 0; + error= 1; + if ((file=my_open(fn_format(index_file,name,"",reg_ext,4), + O_RDONLY | O_SHARE, + MYF(0))) < 0) + goto err_end; + + // Get length of file + error= 2; + if (my_fstat(file, &state, MYF(0))) + goto err; + read_len= state.st_size; + + // Read whole frm file + error= 3; + read_data= 0; + if (read_string(file, &read_data, read_len)) + goto err; + + // Setup return data + *frmdata= (void*) read_data; + *len= read_len; + error= 0; + + err: + if (file > 0) + VOID(my_close(file,MYF(MY_WME))); + + err_end: /* Here when no file */ + DBUG_RETURN (error); +} /* readfrm */ + + +/* + Write the content of a frm data pointer + to a frm file + + SYNOPSIS + writefrm() + + name path to table-file "db/name" + frmdata frm data + len length of the frmdata + + RETURN VALUES + 0 ok + 2 Could not write file +*/ + +int writefrm(const char *name, const void *frmdata, uint len) +{ + File file; + char index_file[FN_REFLEN]; + int error; + DBUG_ENTER("writefrm"); + DBUG_PRINT("enter",("name: '%s' len: %d ",name,len)); + //DBUG_DUMP("frmdata", (char*)frmdata, len); + + error= 0; + if ((file=my_create(fn_format(index_file,name,"",reg_ext,4), + CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0) + { + if (my_write(file,(byte*)frmdata,len,MYF(MY_WME | MY_NABP))) + error= 2; + } + VOID(my_close(file,MYF(0))); + DBUG_RETURN(error); +} /* writefrm */ + + + + +/* + Try to discover table from handler and + if found, write the frm file to disk. 
+ + RETURN VALUES: + 0 : Table existed in handler and created + on disk if so requested + 1 : Table does not exist + >1 : error + +*/ + +int create_table_from_handler(const char *db, + const char *name, + bool create_if_found) +{ + int error= 0; + const void* frmblob = NULL; + char path[FN_REFLEN]; + uint frmlen = 0; + DBUG_ENTER("create_table_from_handler"); + DBUG_PRINT("enter", ("create_if_found: %d", create_if_found)); + + if (ha_discover(db, name, &frmblob, &frmlen)) + DBUG_RETURN(1); // Table does not exist + + // Table exists in handler + if (create_if_found) + { + (void)strxnmov(path,FN_REFLEN,mysql_data_home,"/",db,"/",name,NullS); + // Save the frm file + error = writefrm(path, frmblob, frmlen); + } + + err: + if (frmblob) + my_free((char*) frmblob,MYF(0)); + DBUG_RETURN(error); +} + +int table_exists_in_handler(const char *db, + const char *name) +{ + return (create_table_from_handler(db, name, false) == 0); +} diff --git a/sql/gen_lex_hash.cc b/sql/gen_lex_hash.cc index 5dc7c50e04c..7a445ed8c4d 100644 --- a/sql/gen_lex_hash.cc +++ b/sql/gen_lex_hash.cc @@ -461,7 +461,7 @@ int main(int argc,char **argv) hash_map= sql_functions_map;\n\ register uint32 cur_struct= uint4korr(hash_map+((len-1)*4));\n\ \n\ - for(;;){\n\ + for (;;){\n\ register uchar first_char= (uchar)cur_struct;\n\ \n\ if (first_char == 0)\n\ @@ -492,7 +492,7 @@ int main(int argc,char **argv) hash_map= symbols_map;\n\ register uint32 cur_struct= uint4korr(hash_map+((len-1)*4));\n\ \n\ - for(;;){\n\ + for (;;){\n\ register uchar first_char= (uchar)cur_struct;\n\ \n\ if (first_char==0){\n\ diff --git a/sql/ha_innodb.cc b/sql/ha_innodb.cc index a5dc6719182..4192df22e5c 100644 --- a/sql/ha_innodb.cc +++ b/sql/ha_innodb.cc @@ -15,7 +15,9 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /* This file defines the InnoDB handler: the interface between MySQL and -InnoDB +InnoDB +NOTE: You can only use noninlined InnoDB functions in this file, because we +have disabled the InnoDB inlining in this file. */ /* TODO list for the InnoDB handler in 4.1: - Check if the query_id is now right also in prepared and executed stats @@ -74,6 +76,7 @@ extern "C" { #include "../innobase/include/btr0cur.h" #include "../innobase/include/btr0btr.h" #include "../innobase/include/fsp0fsp.h" +#include "../innobase/include/sync0sync.h" #include "../innobase/include/fil0fil.h" } @@ -321,70 +324,49 @@ convert_error_code_to_mysql( } } -extern "C" { /***************************************************************** Prints info of a THD object (== user session thread) to the standard output. NOTE that /mysql/innobase/trx/trx0trx.c must contain the prototype for this function!
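
An illustrative aside on the discovery helpers introduced above in discover.cc: readfrm() and writefrm() return 0 on success, and readfrm() hands back a my_malloc()'ed buffer that the caller must free, just as create_table_from_handler() frees the frmblob it gets from ha_discover(). A minimal sketch of how the pieces compose; the "test/t1" path is hypothetical, and this assumes the server context where mysql_priv.h is available:

  const void *frm;
  uint len;
  if (!readfrm("test/t1", &frm, &len))      // read the .frm image from disk
  {
    writefrm("test/t1_copy", frm, len);     // write it back under a new name
    my_free((char*) frm, MYF(0));           // readfrm allocates; caller frees
  }
  if (table_exists_in_handler("test", "t1"))
  {
    // some discovery-capable handler (e.g. NDB) knows this table
  }
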
*/ - +extern "C" void innobase_mysql_print_thd( /*=====================*/ - char* buf, /* in/out: buffer where to print, must be at least - 400 bytes */ + FILE* f, /* in: output stream */ void* input_thd)/* in: pointer to a MySQL THD object */ { THD* thd; - char* old_buf = buf; thd = (THD*) input_thd; - /* We cannot use the return value of normal sprintf() as this is - not portable to some old non-Posix Unixes, e.g., some old SCO - Unixes */ - - buf += my_sprintf(buf, - (buf, "MySQL thread id %lu, query id %lu", - thd->thread_id, thd->query_id)); - if (thd->host) { - *buf = ' '; - buf++; - buf = strnmov(buf, thd->host, 30); - } + fprintf(f, "MySQL thread id %lu, query id %lu", + thd->thread_id, thd->query_id); + if (thd->host) { + putc(' ', f); + fputs(thd->host, f); + } - if (thd->ip) { - *buf = ' '; - buf++; - buf=strnmov(buf, thd->ip, 20); - } + if (thd->ip) { + putc(' ', f); + fputs(thd->ip, f); + } if (thd->user) { - *buf = ' '; - buf++; - buf=strnmov(buf, thd->user, 20); - } - - if (thd->proc_info) { - *buf = ' '; - buf++; - buf=strnmov(buf, thd->proc_info, 50); + putc(' ', f); + fputs(thd->user, f); } - if (thd->query) { - *buf = '\n'; - buf++; - buf=strnmov(buf, thd->query, 150); - } - - buf[0] = '\n'; - buf[1] = '\0'; /* Note that we must put a null character here to end - the printed string */ + if (thd->proc_info) { + putc(' ', f); + fputs(thd->proc_info, f); + } - /* We test the printed length did not overrun the buffer length of - 400 bytes */ + if (thd->query) { + putc(' ', f); + fputs(thd->query, f); + } - ut_a(strlen(old_buf) < 400); -} + putc('\n', f); } /************************************************************************* @@ -627,12 +609,11 @@ innobase_query_caching_of_table_permitted( return((my_bool)FALSE); } -extern "C" { /********************************************************************* Invalidates the MySQL query cache for the table. NOTE that the exact prototype of this function has to be in /innobase/row/row0ins.c! */ - +extern "C" void innobase_invalidate_query_cache( /*============================*/ @@ -652,6 +633,17 @@ innobase_invalidate_query_cache( TRUE); #endif } + +/********************************************************************* +Get the quote character to be used in SQL identifiers. 
*/ +extern "C" +char +mysql_get_identifier_quote_char(void) +/*=================================*/ + /* out: quote character to be + used in SQL identifiers */ +{ + return '`'; } /********************************************************************* @@ -2102,24 +2094,20 @@ ha_innobase::write_row( if (prebuilt->trx != (trx_t*) current_thd->transaction.all.innobase_tid) { - char err_buf[2000]; - fprintf(stderr, "InnoDB: Error: the transaction object for the table handle is at\n" -"InnoDB: %lx, but for the current thread it is at %lx\n", - (ulong)prebuilt->trx, - (ulong)current_thd->transaction.all.innobase_tid); - - ut_sprintf_buf(err_buf, ((byte*)prebuilt) - 100, 200); - fprintf(stderr, -"InnoDB: Dump of 200 bytes around prebuilt: %.1000s\n", err_buf); - - ut_sprintf_buf(err_buf, +"InnoDB: %p, but for the current thread it is at %p\n", + prebuilt->trx, + current_thd->transaction.all.innobase_tid); + fputs("InnoDB: Dump of 200 bytes around prebuilt: ", stderr); + ut_print_buf(stderr, ((const byte*)prebuilt) - 100, 200); + fputs("\n" + "InnoDB: Dump of 200 bytes around transaction.all: ", + stderr); + ut_print_buf(stderr, ((byte*)(&(current_thd->transaction.all))) - 100, 200); - fprintf(stderr, -"InnoDB: Dump of 200 bytes around transaction.all: %.1000s\n", err_buf); - - ut_a(0); + putc('\n', stderr); + ut_error; } statistic_increment(ha_write_count, &LOCK_status); @@ -3390,12 +3378,9 @@ create_index( field = form->field[j]; - if (strlen(field->field_name) - == strlen(key_part->field->field_name) - && 0 == ut_cmp_in_lower_case( + if (0 == ut_cmp_in_lower_case( (char*)field->field_name, - (char*)key_part->field->field_name, - strlen(field->field_name))) { + (char*)key_part->field->field_name)) { /* Found the corresponding column */ break; @@ -3762,7 +3747,8 @@ ha_innobase::delete_table( /* Drop the table in InnoDB */ - error = row_drop_table_for_mysql(norm_name, trx); + error = row_drop_table_for_mysql(norm_name, trx, + thd->lex->sql_command == SQLCOM_DROP_DB); /* Flush the log to reduce probability that the .frm files and the InnoDB data dictionary get out-of-sync if the user runs @@ -3801,7 +3787,7 @@ innobase_drop_database( trx_t* trx; char* ptr; int error; - char namebuf[10000]; + char* namebuf; /* Get the transaction associated with the current thd, or create one if not yet created */ @@ -3821,6 +3807,7 @@ innobase_drop_database( } ptr++; + namebuf = my_malloc(len + 2, MYF(0)); memcpy(namebuf, ptr, len); namebuf[len] = '/'; @@ -3837,6 +3824,7 @@ innobase_drop_database( } error = row_drop_database_for_mysql(namebuf, trx); + my_free(namebuf, MYF(0)); /* Flush the log to reduce probability that the .frm files and the InnoDB data dictionary get out-of-sync if the user runs @@ -4361,15 +4349,18 @@ ha_innobase::update_table_comment( info on foreign keys */ const char* comment)/* in: table comment defined by user */ { - row_prebuilt_t* prebuilt = (row_prebuilt_t*)innobase_prebuilt; - uint length = strlen(comment); - char* str = my_malloc(length + 16500, MYF(0)); - char* pos; + uint length = strlen(comment); + char* str; + row_prebuilt_t* prebuilt = (row_prebuilt_t*)innobase_prebuilt; /* We do not know if MySQL can call this function before calling external_lock(). To be safe, update the thd of the current table handle. 
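
A note on the pattern used in the InnoDB changes that follow: update_table_comment(), get_foreign_key_create_info() and innodb_show_status() all stop printing into fixed-size buffers and instead format into a tmpfile(), measure the output with ftell(), and read back an exactly-sized copy capped at 64000 bytes. A self-contained sketch of that idiom, with plain malloc() standing in for my_malloc():

  #include <cstdio>
  #include <cstdlib>

  /* Format output of unknown length safely: write it to an anonymous
     temporary file, then copy at most 64000 - 1 bytes into a heap buffer. */
  static char *bounded_output(unsigned long free_kb)
  {
    FILE *file= tmpfile();
    if (!file)
      return NULL;
    fprintf(file, "InnoDB free: %lu kB", free_kb);
    long flen= ftell(file);              // bytes actually written
    if (flen > 64000 - 1)
      flen= 64000 - 1;                   // cap, as the patch does
    char *str= (char*) malloc(flen + 1);
    if (str)
    {
      rewind(file);
      flen= (long) fread(str, 1, (size_t) flen, file);
      str[flen]= '\0';
    }
    fclose(file);                        // tmpfile() is removed on close
    return str;
  }
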
*/ + if(length > 64000 - 3) { + return((char*)comment); /* string too long */ + } + update_thd(current_thd); prebuilt->trx->op_info = (char*)"returning table comment"; @@ -4378,37 +4369,47 @@ ha_innobase::update_table_comment( possible adaptive hash latch to avoid deadlocks of threads */ trx_search_latch_release_if_reserved(prebuilt->trx); - - if (!str) { - prebuilt->trx->op_info = (char*)""; + str = NULL; - return((char*)comment); - } + if (FILE* file = tmpfile()) { + long flen; - pos = str; - if (length) { - pos=strmov(str, comment); - *pos++=';'; - *pos++=' '; - } + /* output the data to a temporary file */ + fprintf(file, "InnoDB free: %lu kB", + (ulong) fsp_get_available_space_in_free_extents( + prebuilt->table->space)); + + dict_print_info_on_foreign_keys(FALSE, file, prebuilt->table); + flen = ftell(file); + if(length + flen + 3 > 64000) { + flen = 64000 - 3 - length; + } - pos += my_sprintf(pos, - (pos,"InnoDB free: %lu kB", - (ulong) fsp_get_available_space_in_free_extents( - prebuilt->table->space))); + ut_ad(flen > 0); - /* We assume 16000 - length bytes of space to print info; the limit - 16000 bytes is arbitrary, and MySQL could handle at least 64000 - bytes */ - - if (length < 16000) { - dict_print_info_on_foreign_keys(FALSE, pos, 16000 - length, - prebuilt->table); + /* allocate buffer for the full string, and + read the contents of the temporary file */ + + str = my_malloc(length + flen + 3, MYF(0)); + + if (str) { + char* pos = str + length; + if(length) { + memcpy(str, comment, length); + *pos++ = ';'; + *pos++ = ' '; + } + rewind(file); + flen = fread(pos, 1, flen, file); + pos[flen] = 0; + } + + fclose(file); } prebuilt->trx->op_info = (char*)""; - return(str); + return(str ? str : (char*) comment); } /*********************************************************************** @@ -4422,7 +4423,7 @@ ha_innobase::get_foreign_key_create_info(void) MUST be freed with ::free_foreign_key_create_info */ { row_prebuilt_t* prebuilt = (row_prebuilt_t*)innobase_prebuilt; - char* str; + char* str = 0; ut_a(prebuilt != NULL); @@ -4432,23 +4433,47 @@ ha_innobase::get_foreign_key_create_info(void) update_thd(current_thd); - prebuilt->trx->op_info = (char*)"getting info on foreign keys"; + if (FILE* file = tmpfile()) { + long flen; - /* In case MySQL calls this in the middle of a SELECT query, release - possible adaptive hash latch to avoid deadlocks of threads */ + prebuilt->trx->op_info = (char*)"getting info on foreign keys"; - trx_search_latch_release_if_reserved(prebuilt->trx); - - str = (char*)ut_malloc(10000); + /* In case MySQL calls this in the middle of a SELECT query, + release possible adaptive hash latch to avoid + deadlocks of threads */ - str[0] = '\0'; - - dict_print_info_on_foreign_keys(TRUE, str, 9000, prebuilt->table); + trx_search_latch_release_if_reserved(prebuilt->trx); - prebuilt->trx->op_info = (char*)""; + /* output the data to a temporary file */ + dict_print_info_on_foreign_keys(TRUE, file, prebuilt->table); + prebuilt->trx->op_info = (char*)""; + + flen = ftell(file); + if(flen > 64000 - 1) { + flen = 64000 - 1; + } + + ut_ad(flen >= 0); + + /* allocate buffer for the string, and + read the contents of the temporary file */ + + str = my_malloc(flen + 1, MYF(0)); + + if (str) { + rewind(file); + flen = fread(str, 1, flen, file); + str[flen] = 0; + } + + fclose(file); + } else { + /* unable to create temporary file */ + str = my_malloc(1, MYF(MY_ZEROFILL)); + } return(str); -} +} /*********************************************************************** Checks if a 
table is referenced by a foreign key. The MySQL manual states that @@ -4481,7 +4506,7 @@ ha_innobase::free_foreign_key_create_info( char* str) /* in, own: create info string to free */ { if (str) { - ut_free(str); + my_free(str, MYF(0)); } } @@ -4772,34 +4797,55 @@ innodb_show_status( innobase_release_stat_resources(trx); - /* We let the InnoDB Monitor to output at most 60 kB of text, add - a safety margin of 100 kB for buffer overruns */ + /* We let the InnoDB Monitor to output at most 64000 bytes of text. */ - buf = (char*)ut_malloc(160 * 1024); + long flen; + char* str; - srv_sprintf_innodb_monitor(buf, 60 * 1024); + mutex_enter_noninline(&srv_monitor_file_mutex); + rewind(srv_monitor_file); + srv_printf_innodb_monitor(srv_monitor_file); + flen = ftell(srv_monitor_file); + if(flen > 64000 - 1) { + flen = 64000 - 1; + } - List<Item> field_list; + ut_ad(flen > 0); - field_list.push_back(new Item_empty_string("Status", strlen(buf))); + /* allocate buffer for the string, and + read the contents of the temporary file */ - if (protocol->send_fields(&field_list, 1)) + if (!(str = my_malloc(flen + 1, MYF(0)))) { - ut_free(buf); - DBUG_RETURN(-1); + mutex_exit_noninline(&srv_monitor_file_mutex); + DBUG_RETURN(-1); } - protocol->prepare_for_resend(); - protocol->store(buf, strlen(buf), system_charset_info); + rewind(srv_monitor_file); + flen = fread(str, 1, flen, srv_monitor_file); + + mutex_exit_noninline(&srv_monitor_file_mutex); + + List<Item> field_list; - ut_free(buf); + field_list.push_back(new Item_empty_string("Status", flen)); + + if (protocol->send_fields(&field_list, 1)) { + + my_free(str, MYF(0)); + + DBUG_RETURN(-1); + } + + protocol->prepare_for_resend(); + protocol->store(str, flen, system_charset_info); + my_free(str, MYF(0)); if (protocol->write()) DBUG_RETURN(-1); - send_eof(thd); - - DBUG_RETURN(0); + send_eof(thd); + DBUG_RETURN(0); } /**************************************************************************** diff --git a/sql/ha_innodb.h b/sql/ha_innodb.h index e5f1c66e23a..83a3edc4126 100644 --- a/sql/ha_innodb.h +++ b/sql/ha_innodb.h @@ -123,6 +123,8 @@ class ha_innobase: public handler whose size is > MAX_KEY_LENGTH */ uint max_key_length() const { return((MAX_KEY_LENGTH <= 3500) ? MAX_KEY_LENGTH : 3500);} + uint max_key_part_length() { return((MAX_KEY_LENGTH <= 3500) ? + MAX_KEY_LENGTH : 3500);} const key_map *keys_to_use_for_scanning() { return &key_map_full; } bool has_transactions() { return 1;} diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc new file mode 100644 index 00000000000..3bc322878d1 --- /dev/null +++ b/sql/ha_ndbcluster.cc @@ -0,0 +1,2943 @@ +/* Copyright (C) 2000-2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +/* + This file defines the NDB Cluster handler: the interface between MySQL and + NDB Cluster +*/ + +/* + TODO + After CREATE DATABASE, run discover on all tables in that database + +*/ + + +#ifdef __GNUC__ +#pragma implementation // gcc: Class implementation +#endif + +#include "mysql_priv.h" + +#ifdef HAVE_NDBCLUSTER_DB +#include <my_dir.h> +#include "ha_ndbcluster.h" +#include <ndbapi/NdbApi.hpp> +#include <ndbapi/NdbScanFilter.hpp> + +#define USE_DISCOVER_ON_STARTUP +//#define USE_NDB_POOL + +// Default value for parallelism +static const int parallelism= 240; + +#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8 + +#define ERR_PRINT(err) \ + DBUG_PRINT("error", ("Error: %d message: %s", err.code, err.message)) + +#define ERR_RETURN(err) \ +{ \ + ERR_PRINT(err); \ + DBUG_RETURN(ndb_to_mysql_error(&err)); \ +} + +// Typedefs for long names +typedef NdbDictionary::Column NDBCOL; +typedef NdbDictionary::Table NDBTAB; +typedef NdbDictionary::Index NDBINDEX; +typedef NdbDictionary::Dictionary NDBDICT; + +bool ndbcluster_inited= false; + +// Handler synchronization +pthread_mutex_t ndbcluster_mutex; + +// Table lock handling +static HASH ndbcluster_open_tables; + +static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length, + my_bool not_used __attribute__((unused))); + +static NDB_SHARE *get_share(const char *table_name); +static void free_share(NDB_SHARE *share); + +static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len); +static int unpackfrm(const void **data, uint *len, + const void* pack_data); + +/* + Error handling functions +*/ + +struct err_code_mapping +{ + int ndb_err; + int my_err; +}; + +static const err_code_mapping err_map[]= +{ + { 626, HA_ERR_KEY_NOT_FOUND }, + { 630, HA_ERR_FOUND_DUPP_KEY }, + { 893, HA_ERR_FOUND_DUPP_UNIQUE }, + { 721, HA_ERR_TABLE_EXIST }, + { 241, HA_ERR_OLD_METADATA }, + { -1, -1 } +}; + + +static int ndb_to_mysql_error(const NdbError *err) +{ + uint i; + for (i=0 ; err_map[i].ndb_err != err->code ; i++) + { + if (err_map[i].my_err == -1) + return err->code; + } + return err_map[i].my_err; +} + + +/* + Take care of the error that occurred in NDB + + RETURN + 0 No error + # The mapped error code +*/ + +int ha_ndbcluster::ndb_err(NdbConnection *trans) +{ + const NdbError err= trans->getNdbError(); + if (!err.code) + return 0; // Don't log things to DBUG log if no error + DBUG_ENTER("ndb_err"); + + ERR_PRINT(err); + switch (err.classification) { + case NdbError::SchemaError: + { + NDBDICT *dict= m_ndb->getDictionary(); + DBUG_PRINT("info", ("invalidateTable %s", m_tabname)); + dict->invalidateTable(m_tabname); + break; + } + default: + break; + } + DBUG_RETURN(ndb_to_mysql_error(&err)); +} + + +/* + Instruct NDB to set the value of the hidden primary key +*/ + +bool ha_ndbcluster::set_hidden_key(NdbOperation *ndb_op, + uint fieldnr, const byte *field_ptr) +{ + DBUG_ENTER("set_hidden_key"); + DBUG_RETURN(ndb_op->equal(fieldnr, (char*)field_ptr, + NDB_HIDDEN_PRIMARY_KEY_LENGTH) != 0); +} + + +/* + Instruct NDB to set the value of one primary key attribute +*/ + +int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field, + uint fieldnr, const byte *field_ptr) +{ + uint32 pack_len= field->pack_length(); + DBUG_ENTER("set_ndb_key"); + DBUG_PRINT("enter", ("%d: %s, ndb_type: %u, len=%d", + fieldnr, field->field_name,
field->type(), + pack_len)); + DBUG_DUMP("key", (char*)field_ptr, pack_len); + + switch (field->type()) { + case MYSQL_TYPE_DECIMAL: + case MYSQL_TYPE_TINY: + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_LONG: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DOUBLE: + case MYSQL_TYPE_TIMESTAMP: + case MYSQL_TYPE_LONGLONG: + case MYSQL_TYPE_INT24: + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_TIME: + case MYSQL_TYPE_DATETIME: + case MYSQL_TYPE_YEAR: + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_SET: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_STRING: + // Common implementation for most field types + DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0); + + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_NULL: + case MYSQL_TYPE_GEOMETRY: + default: + // Unhandled field types + DBUG_PRINT("error", ("Field type %d not supported", field->type())); + DBUG_RETURN(2); + } + DBUG_RETURN(3); +} + + +/* + Instruct NDB to set the value of one attribute +*/ + +int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, + uint fieldnr) +{ + const byte* field_ptr= field->ptr; + uint32 pack_len= field->pack_length(); + DBUG_ENTER("set_ndb_value"); + DBUG_PRINT("enter", ("%d: %s, type: %u, len=%d, is_null=%s", + fieldnr, field->field_name, field->type(), + pack_len, field->is_null()?"Y":"N")); + DBUG_DUMP("value", (char*) field_ptr, pack_len); + + if (field->is_null()) + { + // Set value to NULL + DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0)); + } + + switch (field->type()) { + case MYSQL_TYPE_DECIMAL: + case MYSQL_TYPE_TINY: + case MYSQL_TYPE_SHORT: + case MYSQL_TYPE_LONG: + case MYSQL_TYPE_FLOAT: + case MYSQL_TYPE_DOUBLE: + case MYSQL_TYPE_TIMESTAMP: + case MYSQL_TYPE_LONGLONG: + case MYSQL_TYPE_INT24: + case MYSQL_TYPE_DATE: + case MYSQL_TYPE_TIME: + case MYSQL_TYPE_DATETIME: + case MYSQL_TYPE_YEAR: + case MYSQL_TYPE_NEWDATE: + case MYSQL_TYPE_ENUM: + case MYSQL_TYPE_SET: + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_STRING: + // Common implementation for most field types + DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0); + + case MYSQL_TYPE_TINY_BLOB: + case MYSQL_TYPE_MEDIUM_BLOB: + case MYSQL_TYPE_LONG_BLOB: + case MYSQL_TYPE_BLOB: + case MYSQL_TYPE_NULL: + case MYSQL_TYPE_GEOMETRY: + default: + // Unhandled field types + DBUG_PRINT("error", ("Field type %d not supported", field->type())); + DBUG_RETURN(2); + } + DBUG_RETURN(3); +} + + +/* + Instruct NDB to fetch one field + - data is read directly into buffer provided by field_ptr + if it's NULL, data is read into memory provided by NDBAPI +*/ + +int ha_ndbcluster::get_ndb_value(NdbOperation *op, + uint field_no, byte *field_ptr) +{ + DBUG_ENTER("get_ndb_value"); + DBUG_PRINT("enter", ("field_no: %d", field_no)); + m_value[field_no]= op->getValue(field_no, field_ptr); + DBUG_RETURN(m_value == NULL); +} + + +/* + Get metadata for this table from NDB + + IMPLEMENTATION + - save the NdbDictionary::Table for easy access + - check that frm-file on disk is equal to frm-file + of table accessed in NDB + - build a list of the indexes for the table +*/ + +int ha_ndbcluster::get_metadata(const char *path) +{ + NDBDICT *dict= m_ndb->getDictionary(); + const NDBTAB *tab; + const void *data, *pack_data; + const char **key_name; + uint ndb_columns, mysql_columns, length, pack_length, i; + int error; + DBUG_ENTER("get_metadata"); + DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path)); 
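
For orientation, the check that get_metadata() performs right after this point can be summarized as: re-pack the on-disk .frm the same way CREATE TABLE packed it, then compare the result byte-for-byte against the frm blob stored in the NDB dictionary, and treat any difference as HA_ERR_OLD_METADATA. In outline, using the same calls as the code below but omitting the error cleanup:

  const void *data, *pack_data;
  uint len, pack_len;
  if (readfrm(path, &data, &len) ||                // .frm image from disk
      packfrm(data, len, &pack_data, &pack_len))   // compress it
    return 1;
  if (pack_len != tab->getFrmLength() ||
      memcmp(pack_data, tab->getFrmData(), pack_len))
    return HA_ERR_OLD_METADATA;                    // local .frm is stale
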
+ + if (!(tab= dict->getTable(m_tabname))) + ERR_RETURN(dict->getNdbError()); + DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion())); + + /* + This is the place to check that the table we got from NDB + is equal to the one on local disk + */ + ndb_columns= (uint) tab->getNoOfColumns(); + mysql_columns= table->fields; + if (table->primary_key == MAX_KEY) + ndb_columns--; + if (ndb_columns != mysql_columns) + { + DBUG_PRINT("error", + ("Wrong number of columns, ndb: %d mysql: %d", + ndb_columns, mysql_columns)); + DBUG_RETURN(HA_ERR_OLD_METADATA); + } + + /* + Compare FrmData in NDB with frm file from disk. + */ + error= 0; + if (readfrm(path, &data, &length) || + packfrm(data, length, &pack_data, &pack_length)) + { + my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR)); + my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR)); + DBUG_RETURN(1); + } + + if ((pack_length != tab->getFrmLength()) || + (memcmp(pack_data, tab->getFrmData(), pack_length))) + { + DBUG_PRINT("error", + ("metadata, pack_length: %d getFrmLength: %d memcmp: %d", + pack_length, tab->getFrmLength(), + memcmp(pack_data, tab->getFrmData(), pack_length))); + DBUG_DUMP("pack_data", (char*)pack_data, pack_length); + DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength()); + error= HA_ERR_OLD_METADATA; + } + my_free((char*)data, MYF(0)); + my_free((char*)pack_data, MYF(0)); + if (error) + DBUG_RETURN(error); + + // All checks OK, lets use the table + m_table= (void*)tab; + + for (i= 0; i < MAX_KEY; i++) + m_indextype[i]= UNDEFINED_INDEX; + + // Save information about all known indexes + for (i= 0; i < table->keys; i++) + m_indextype[i] = get_index_type_from_table(i); + + DBUG_RETURN(0); +} + +/* + Decode the type of an index from information + provided in table object +*/ +NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint index_no) const +{ + if (index_no == table->primary_key) + return PRIMARY_KEY_INDEX; + else + return ((table->key_info[index_no].flags & HA_NOSAME) ? + UNIQUE_INDEX : + ORDERED_INDEX); +} + + +void ha_ndbcluster::release_metadata() +{ + DBUG_ENTER("release_metadata"); + DBUG_PRINT("enter", ("m_tabname: %s", m_tabname)); + + m_table= NULL; + + DBUG_VOID_RETURN; +} + +static const ulong index_type_flags[]= +{ + /* UNDEFINED_INDEX */ + 0, + + /* PRIMARY_KEY_INDEX */ + HA_ONLY_WHOLE_INDEX | + HA_WRONG_ASCII_ORDER | + HA_NOT_READ_PREFIX_LAST, + + /* UNIQUE_INDEX */ + HA_ONLY_WHOLE_INDEX | + HA_WRONG_ASCII_ORDER | + HA_NOT_READ_PREFIX_LAST, + + /* ORDERED_INDEX */ + HA_READ_NEXT | + HA_READ_PREV | + HA_NOT_READ_AFTER_KEY +}; + +static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong); + +inline const char* ha_ndbcluster::get_index_name(uint idx_no) const +{ + return table->keynames.type_names[idx_no]; +} + +inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const +{ + DBUG_ASSERT(idx_no < MAX_KEY); + return m_indextype[idx_no]; +} + + +/* + Get the flags for an index + + RETURN + flags depending on the type of the index. 
+*/ + +inline ulong ha_ndbcluster::index_flags(uint idx_no) const +{ + DBUG_ENTER("index_flags"); + DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size); + DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]); +} + + +int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key) +{ + KEY* key_info= table->key_info + table->primary_key; + KEY_PART_INFO* key_part= key_info->key_part; + KEY_PART_INFO* end= key_part+key_info->key_parts; + DBUG_ENTER("set_primary_key"); + + for (; key_part != end; key_part++) + { + Field* field= key_part->field; + if (set_ndb_key(op, field, + key_part->fieldnr-1, key)) + ERR_RETURN(op->getNdbError()); + key += key_part->length; + } + DBUG_RETURN(0); +} + + +int ha_ndbcluster::set_primary_key(NdbOperation *op) +{ + DBUG_ENTER("set_primary_key"); + KEY* key_info= table->key_info + table->primary_key; + KEY_PART_INFO* key_part= key_info->key_part; + KEY_PART_INFO* end= key_part+key_info->key_parts; + + for (; key_part != end; key_part++) + { + Field* field= key_part->field; + if (set_ndb_key(op, field, + key_part->fieldnr-1, field->ptr)) + ERR_RETURN(op->getNdbError()); + } + DBUG_RETURN(0); +} + + +/* + Read one record from NDB using primary key +*/ + +int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) +{ + uint no_fields= table->fields, i; + NdbConnection *trans= m_active_trans; + NdbOperation *op; + THD *thd= current_thd; + DBUG_ENTER("pk_read"); + DBUG_PRINT("enter", ("key_len: %u", key_len)); + DBUG_DUMP("key", (char*)key, key_len); + + if (!(op= trans->getNdbOperation(m_tabname)) || op->readTuple() != 0) + goto err; + + if (table->primary_key == MAX_KEY) + { + // This table has no primary key, use "hidden" primary key + DBUG_PRINT("info", ("Using hidden key")); + DBUG_DUMP("key", (char*)key, 8); + if (set_hidden_key(op, no_fields, key)) + goto err; + // Read key at the same time, for future reference + if (get_ndb_value(op, no_fields, NULL)) + goto err; + } + else + { + int res; + if ((res= set_primary_key(op, key))) + return res; + } + + // Read non-key field(s) + for (i= 0; i < no_fields; i++) + { + Field *field= table->field[i]; + if (thd->query_id == field->query_id) + { + if (get_ndb_value(op, i, field->ptr)) + goto err; + } + else + { + // Attribute was not to be read + m_value[i]= NULL; + } + } + + if (trans->execute(NoCommit, IgnoreError) != 0) + { + table->status= STATUS_NOT_FOUND; + DBUG_RETURN(ndb_err(trans)); + } + + // The value have now been fetched from NDB + unpack_record(buf); + table->status= 0; + DBUG_RETURN(0); + + err: + ERR_RETURN(trans->getNdbError()); +} + + +/* + Read one record from NDB using unique secondary index +*/ + +int ha_ndbcluster::unique_index_read(const byte *key, + uint key_len, byte *buf) +{ + NdbConnection *trans= m_active_trans; + const char *index_name; + NdbIndexOperation *op; + THD *thd= current_thd; + byte *key_ptr; + KEY* key_info; + KEY_PART_INFO *key_part, *end; + uint i; + DBUG_ENTER("unique_index_read"); + DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index)); + DBUG_DUMP("key", (char*)key, key_len); + + index_name= get_index_name(active_index); + if (!(op= trans->getNdbIndexOperation(index_name, m_tabname)) || + op->readTuple() != 0) + ERR_RETURN(trans->getNdbError()); + + // Set secondary index key(s) + key_ptr= (byte *) key; + key_info= table->key_info + active_index; + DBUG_ASSERT(key_info->key_length == key_len); + end= (key_part= key_info->key_part) + key_info->key_parts; + + for (i= 0; key_part != end; key_part++, i++) + { + if 
(set_ndb_key(op, key_part->field, i, key_ptr)) + ERR_RETURN(trans->getNdbError()); + key_ptr+= key_part->length; + } + + // Get non-index attribute(s) + for (i= 0; i < table->fields; i++) + { + Field *field= table->field[i]; + if ((thd->query_id == field->query_id) || + (field->flags & PRI_KEY_FLAG)) + { + if (get_ndb_value(op, i, field->ptr)) + ERR_RETURN(op->getNdbError()); + } + else + { + // Attribute was not to be read + m_value[i]= NULL; + } + } + + if (trans->execute(NoCommit, IgnoreError) != 0) + { + table->status= STATUS_NOT_FOUND; + DBUG_RETURN(ndb_err(trans)); + } + // The value have now been fetched from NDB + unpack_record(buf); + table->status= 0; + DBUG_RETURN(0); +} + +/* + Get the next record of a started scan +*/ + +inline int ha_ndbcluster::next_result(byte *buf) +{ + NdbConnection *trans= m_active_trans; + NdbResultSet *cursor= m_active_cursor; + DBUG_ENTER("next_result"); + + if (cursor->nextResult() == 0) + { + // One more record found + unpack_record(buf); + table->status= 0; + DBUG_RETURN(0); + } + table->status= STATUS_NOT_FOUND; + if (ndb_err(trans)) + ERR_RETURN(trans->getNdbError()); + + // No more records + DBUG_PRINT("info", ("No more records")); + DBUG_RETURN(HA_ERR_END_OF_FILE); +} + + +/* + Read record(s) from NDB using ordered index scan +*/ + +int ha_ndbcluster::ordered_index_scan(const byte *key, uint key_len, + byte *buf, + enum ha_rkey_function find_flag) +{ + uint no_fields= table->fields; + uint tot_len, i; + NdbConnection *trans= m_active_trans; + NdbResultSet *cursor= m_active_cursor; + NdbScanOperation *op; + const char *bound_str= NULL; + const char *index_name; + NdbOperation::BoundType bound_type = NdbOperation::BoundEQ; + bool can_be_handled_by_ndb= FALSE; + byte *key_ptr; + KEY *key_info; + THD* thd = current_thd; + DBUG_ENTER("ordered_index_scan"); + DBUG_PRINT("enter", ("index: %u", active_index)); + DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname)); + + index_name= get_index_name(active_index); + if (!(op= trans->getNdbScanOperation(index_name, m_tabname))) + ERR_RETURN(trans->getNdbError()); + if (!(cursor= op->readTuples(parallelism))) + ERR_RETURN(trans->getNdbError()); + m_active_cursor= cursor; + + switch (find_flag) { + case HA_READ_KEY_EXACT: /* Find first record else error */ + bound_str= "HA_READ_KEY_EXACT"; + bound_type= NdbOperation::BoundEQ; + can_be_handled_by_ndb= TRUE; + break; + case HA_READ_KEY_OR_NEXT: /* Record or next record */ + bound_str= "HA_READ_KEY_OR_NEXT"; + bound_type= NdbOperation::BoundLE; + can_be_handled_by_ndb= TRUE; + break; + case HA_READ_KEY_OR_PREV: /* Record or previous */ + bound_str= "HA_READ_KEY_OR_PREV"; + bound_type= NdbOperation::BoundGE; + can_be_handled_by_ndb= TRUE; + break; + case HA_READ_AFTER_KEY: /* Find next rec. after key-record */ + bound_str= "HA_READ_AFTER_KEY"; + bound_type= NdbOperation::BoundLT; + can_be_handled_by_ndb= TRUE; + break; + case HA_READ_BEFORE_KEY: /* Find next rec. 
before key-record */ + bound_str= "HA_READ_BEFORE_KEY"; + bound_type= NdbOperation::BoundGT; + can_be_handled_by_ndb= TRUE; + break; + case HA_READ_PREFIX: /* Key which as same prefix */ + bound_str= "HA_READ_PREFIX"; + break; + case HA_READ_PREFIX_LAST: /* Last key with the same prefix */ + bound_str= "HA_READ_PREFIX_LAST"; + break; + case HA_READ_PREFIX_LAST_OR_PREV: + /* Last or prev key with the same prefix */ + bound_str= "HA_READ_PREFIX_LAST_OR_PREV"; + break; + default: + bound_str= "UNKNOWN"; + break; + } + DBUG_PRINT("info", ("find_flag: %s, bound_type: %d," + "can_be_handled_by_ndb: %d", + bound_str, bound_type, can_be_handled_by_ndb)); + if (!can_be_handled_by_ndb) + DBUG_RETURN(1); + + // Set bounds using key data + tot_len= 0; + key_ptr= (byte *) key; + key_info= table->key_info + active_index; + for (i= 0; i < key_info->key_parts; i++) + { + Field* field= key_info->key_part[i].field; + uint32 field_len= field->pack_length(); + DBUG_PRINT("info", ("Set index bound on %s", + field->field_name)); + DBUG_DUMP("key", (char*)key_ptr, field_len); + + if (op->setBound(field->field_name, + bound_type, + key_ptr, + field_len) != 0) + ERR_RETURN(op->getNdbError()); + + key_ptr+= field_len; + tot_len+= field_len; + if (tot_len >= key_len) + break; + } + + // Define attributes to read + for (i= 0; i < no_fields; i++) + { + Field *field= table->field[i]; + if ((thd->query_id == field->query_id) || + (field->flags & PRI_KEY_FLAG)) + { + if (get_ndb_value(op, i, field->ptr)) + ERR_RETURN(op->getNdbError()); + } + else + { + m_value[i]= NULL; + } + } + + if (table->primary_key == MAX_KEY) + { + DBUG_PRINT("info", ("Getting hidden key")); + // Scanning table with no primary key + int hidden_no= no_fields; +#ifndef DBUG_OFF + const NDBTAB *tab= (NDBTAB *) m_table; + if (!tab->getColumn(hidden_no)) + DBUG_RETURN(1); +#endif + if (get_ndb_value(op, hidden_no, NULL)) + ERR_RETURN(op->getNdbError()); + } + + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + DBUG_PRINT("exit", ("Scan started successfully")); + DBUG_RETURN(next_result(buf)); +} + + +#if 0 +/* + Read record(s) from NDB using full table scan with filter + */ + +int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, + byte *buf, + enum ha_rkey_function find_flag) +{ + uint no_fields= table->fields; + NdbConnection *trans= m_active_trans; + NdbResultSet *cursor= m_active_cursor; + + DBUG_ENTER("filtered_scan"); + DBUG_PRINT("enter", ("key_len: %u, index: %u", + key_len, active_index)); + DBUG_DUMP("key", (char*)key, key_len); + DBUG_PRINT("info", ("Starting a new filtered scan on %s", + m_tabname)); + NdbScanOperation *op= trans->getNdbScanOperation(m_tabname); + if (!op) + ERR_RETURN(trans->getNdbError()); + + cursor= op->readTuples(parallelism); + if (!cursor) + ERR_RETURN(trans->getNdbError()); + m_active_cursor= cursor; + + { + // Start scan filter + NdbScanFilter sf(op); + sf.begin(); + + // Set filter using the supplied key data + byte *key_ptr= (byte *) key; + uint tot_len= 0; + KEY* key_info= table->key_info + active_index; + for (uint k= 0; k < key_info->key_parts; k++) + { + KEY_PART_INFO* key_part= key_info->key_part+k; + Field* field= key_part->field; + uint ndb_fieldnr= key_part->fieldnr-1; + DBUG_PRINT("key_part", ("fieldnr: %d", ndb_fieldnr)); + // const NDBCOL *col= tab->getColumn(ndb_fieldnr); + uint32 field_len= field->pack_length(); + DBUG_DUMP("key", (char*)key, field_len); + + DBUG_PRINT("info", ("Column %s, type: %d, len: %d", + field->field_name, field->real_type(), field_len)); + + // 
Define scan filter + if (field->real_type() == MYSQL_TYPE_STRING) + sf.eq(ndb_fieldnr, key_ptr, field_len); + else + { + if (field_len == 8) + sf.eq(ndb_fieldnr, (Uint64)*key_ptr); + else if (field_len <= 4) + sf.eq(ndb_fieldnr, (Uint32)*key_ptr); + else + DBUG_RETURN(1); + } + + key_ptr += field_len; + tot_len += field_len; + + if (tot_len >= key_len) + break; + } + // End scan filter + sf.end(); + } + + // Define attributes to read + for (uint field_no= 0; field_no < no_fields; field_no++) + { + Field *field= table->field[field_no]; + + // Read attribute + DBUG_PRINT("get", ("%d: %s", field_no, field->field_name)); + if (get_ndb_value(op, field_no, field->ptr)) + ERR_RETURN(op->getNdbError()); + } + + if (table->primary_key == MAX_KEY) + { + DBUG_PRINT("info", ("Getting hidden key")); + // Scanning table with no primary key + int hidden_no= no_fields; +#ifndef DBUG_OFF + const NDBTAB *tab= (NDBTAB *) m_table; + if (!tab->getColumn(hidden_no)) + DBUG_RETURN(1); +#endif + if (get_ndb_value(op, hidden_no, NULL)) + ERR_RETURN(op->getNdbError()); + } + + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + DBUG_PRINT("exit", ("Scan started successfully")); + DBUG_RETURN(next_result(buf)); +} +#endif + + +/* + Read records from NDB using full table scan + */ + +int ha_ndbcluster::full_table_scan(byte *buf) +{ + uint i; + THD *thd= current_thd; + NdbConnection *trans= m_active_trans; + NdbResultSet *cursor; + NdbScanOperation *op; + + DBUG_ENTER("full_table_scan"); + DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname)); + + if (!(op=trans->getNdbScanOperation(m_tabname))) + ERR_RETURN(trans->getNdbError()); + if (!(cursor= op->readTuples(parallelism))) + ERR_RETURN(trans->getNdbError()); + m_active_cursor= cursor; + + // Define attributes to read + for (i= 0; i < table->fields; i++) + { + Field *field= table->field[i]; + if ((thd->query_id == field->query_id) || + (field->flags & PRI_KEY_FLAG)) + { + if (get_ndb_value(op, i, field->ptr)) + ERR_RETURN(op->getNdbError()); + } + else + { + m_value[i]= NULL; + } + } + + if (table->primary_key == MAX_KEY) + { + DBUG_PRINT("info", ("Getting hidden key")); + // Scanning table with no primary key + int hidden_no= table->fields; +#ifndef DBUG_OFF + const NDBTAB *tab= (NDBTAB *) m_table; + if (!tab->getColumn(hidden_no)) + DBUG_RETURN(1); +#endif + if (get_ndb_value(op, hidden_no, NULL)) + ERR_RETURN(op->getNdbError()); + } + + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + DBUG_PRINT("exit", ("Scan started successfully")); + DBUG_RETURN(next_result(buf)); +} + + +/* + Insert one record into NDB +*/ + +int ha_ndbcluster::write_row(byte *record) +{ + uint i; + NdbConnection *trans= m_active_trans; + NdbOperation *op; + int res; + DBUG_ENTER("write_row"); + + statistic_increment(ha_write_count,&LOCK_status); + if (table->timestamp_default_now) + update_timestamp(record+table->timestamp_default_now-1); + if (table->next_number_field && record == table->record[0]) + update_auto_increment(); + + if (!(op= trans->getNdbOperation(m_tabname))) + ERR_RETURN(trans->getNdbError()); + + res= (m_use_write) ? 
op->writeTuple() :op->insertTuple(); + if (res != 0) + ERR_RETURN(trans->getNdbError()); + + if (table->primary_key == MAX_KEY) + { + // Table has hidden primary key + Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname); + if (set_hidden_key(op, table->fields, (const byte*)&auto_value)) + ERR_RETURN(op->getNdbError()); + } + else + { + int res; + if ((res= set_primary_key(op))) + return res; + } + + // Set non-key attribute(s) + for (i= 0; i < table->fields; i++) + { + Field *field= table->field[i]; + if (!(field->flags & PRI_KEY_FLAG) && + set_ndb_value(op, field, i)) + ERR_RETURN(op->getNdbError()); + } + + /* + Execute write operation + NOTE When doing inserts with many values in + each INSERT statement it should not be necessary + to NoCommit the transaction between each row. + Find out how this is detected! + */ + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + DBUG_RETURN(0); +} + + +/* Compare if a key in a row has changed */ + +int ha_ndbcluster::key_cmp(uint keynr, const byte * old_row, + const byte * new_row) +{ + KEY_PART_INFO *key_part=table->key_info[keynr].key_part; + KEY_PART_INFO *end=key_part+table->key_info[keynr].key_parts; + + for (; key_part != end ; key_part++) + { + if (key_part->null_bit) + { + if ((old_row[key_part->null_offset] & key_part->null_bit) != + (new_row[key_part->null_offset] & key_part->null_bit)) + return 1; + } + if (key_part->key_part_flag & (HA_BLOB_PART | HA_VAR_LENGTH)) + { + + if (key_part->field->cmp_binary((char*) (old_row + key_part->offset), + (char*) (new_row + key_part->offset), + (ulong) key_part->length)) + return 1; + } + else + { + if (memcmp(old_row+key_part->offset, new_row+key_part->offset, + key_part->length)) + return 1; + } + } + return 0; +} + +/* + Update one record in NDB using primary key +*/ + +int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) +{ + THD *thd= current_thd; + NdbConnection *trans= m_active_trans; + NdbOperation *op; + uint i; + DBUG_ENTER("update_row"); + + statistic_increment(ha_update_count,&LOCK_status); + if (table->timestamp_on_update_now) + update_timestamp(new_data+table->timestamp_on_update_now-1); + + if (!(op= trans->getNdbOperation(m_tabname)) || + op->updateTuple() != 0) + ERR_RETURN(trans->getNdbError()); + + if (table->primary_key == MAX_KEY) + { + // This table has no primary key, use "hidden" primary key + DBUG_PRINT("info", ("Using hidden key")); + + // Require that the PK for this record has previously been + // read into m_value + uint no_fields= table->fields; + NdbRecAttr* rec= m_value[no_fields]; + DBUG_ASSERT(rec); + DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH); + + if (set_hidden_key(op, no_fields, rec->aRef())) + ERR_RETURN(op->getNdbError()); + } + else + { + /* Check for update of primary key and return error */ + if (key_cmp(table->primary_key, old_data, new_data)) + DBUG_RETURN(HA_ERR_UNSUPPORTED); + + int res; + if ((res= set_primary_key(op, old_data + table->null_bytes))) + DBUG_RETURN(res); + } + + // Set non-key attribute(s) + for (i= 0; i < table->fields; i++) + { + + Field *field= table->field[i]; + if ((thd->query_id == field->query_id) && + (!(field->flags & PRI_KEY_FLAG)) && + set_ndb_value(op, field, i)) + ERR_RETURN(op->getNdbError()); + } + + // Execute update operation + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + + DBUG_RETURN(0); +} + + +/* + Delete one record from NDB, using primary key +*/ + +int ha_ndbcluster::delete_row(const byte *record) +{ + NdbConnection *trans= 
m_active_trans; + NdbOperation *op; + DBUG_ENTER("delete_row"); + + statistic_increment(ha_delete_count,&LOCK_status); + + if (!(op=trans->getNdbOperation(m_tabname)) || + op->deleteTuple() != 0) + ERR_RETURN(trans->getNdbError()); + + if (table->primary_key == MAX_KEY) + { + // This table has no primary key, use "hidden" primary key + DBUG_PRINT("info", ("Using hidden key")); + uint no_fields= table->fields; + NdbRecAttr* rec= m_value[no_fields]; + DBUG_ASSERT(rec != NULL); + + if (set_hidden_key(op, no_fields, rec->aRef())) + ERR_RETURN(op->getNdbError()); + } + else + { + int res; + if ((res= set_primary_key(op))) + return res; + } + + // Execute delete operation + if (trans->execute(NoCommit) != 0) + DBUG_RETURN(ndb_err(trans)); + DBUG_RETURN(0); +} + +/* + Unpack a record read from NDB + + SYNOPSIS + unpack_record() + buf Buffer to store read row + + NOTE + The data for each row is read directly into the + destination buffer. This function is primarily + called in order to check if any fields should be + set to null. +*/ + +void ha_ndbcluster::unpack_record(byte* buf) +{ + uint row_offset= (uint) (buf - table->record[0]); + Field **field, **end; + NdbRecAttr **value= m_value; + DBUG_ENTER("unpack_record"); + + // Set null flag(s) + bzero(buf, table->null_bytes); + for (field= table->field, end= field+table->fields; + field < end; + field++, value++) + { + if (*value && (*value)->isNULL()) + (*field)->set_null(row_offset); + } + +#ifndef DBUG_OFF + // Read and print all values that was fetched + if (table->primary_key == MAX_KEY) + { + // Table with hidden primary key + int hidden_no= table->fields; + const NDBTAB *tab= (NDBTAB *) m_table; + const NDBCOL *hidden_col= tab->getColumn(hidden_no); + NdbRecAttr* rec= m_value[hidden_no]; + DBUG_ASSERT(rec); + DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no, + hidden_col->getName(), rec->u_64_value())); + } + print_results(); +#endif + DBUG_VOID_RETURN; +} + + +/* + Utility function to print/dump the fetched field + */ + +void ha_ndbcluster::print_results() +{ + const NDBTAB *tab= (NDBTAB*) m_table; + DBUG_ENTER("print_results"); + +#ifndef DBUG_OFF + if (!_db_on_) + DBUG_VOID_RETURN; + + for (uint f=0; f<table->fields;f++) + { + Field *field; + const NDBCOL *col; + NdbRecAttr *value; + + if (!(value= m_value[f])) + { + fprintf(DBUG_FILE, "Field %d was not read\n", f); + continue; + } + field= table->field[f]; + DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length()); + col= tab->getColumn(f); + fprintf(DBUG_FILE, "%d: %s\t", f, col->getName()); + + if (value->isNULL()) + { + fprintf(DBUG_FILE, "NULL\n"); + continue; + } + + switch (col->getType()) { + case NdbDictionary::Column::Blob: + case NdbDictionary::Column::Undefined: + fprintf(DBUG_FILE, "Unknown type: %d", col->getType()); + break; + case NdbDictionary::Column::Tinyint: { + char value= *field->ptr; + fprintf(DBUG_FILE, "Tinyint\t%d", value); + break; + } + case NdbDictionary::Column::Tinyunsigned: { + unsigned char value= *field->ptr; + fprintf(DBUG_FILE, "Tinyunsigned\t%u", value); + break; + } + case NdbDictionary::Column::Smallint: { + short value= *field->ptr; + fprintf(DBUG_FILE, "Smallint\t%d", value); + break; + } + case NdbDictionary::Column::Smallunsigned: { + unsigned short value= *field->ptr; + fprintf(DBUG_FILE, "Smallunsigned\t%u", value); + break; + } + case NdbDictionary::Column::Mediumint: { + byte value[3]; + memcpy(value, field->ptr, 3); + fprintf(DBUG_FILE, "Mediumint\t%d,%d,%d", value[0], value[1], value[2]); + break; + } + case 
NdbDictionary::Column::Mediumunsigned: { + byte value[3]; + memcpy(value, field->ptr, 3); + fprintf(DBUG_FILE, "Mediumunsigned\t%u,%u,%u", value[0], value[1], value[2]); + break; + } + case NdbDictionary::Column::Int: { + fprintf(DBUG_FILE, "Int\t%lld", field->val_int()); + break; + } + case NdbDictionary::Column::Unsigned: { + Uint32 value= (Uint32) *field->ptr; + fprintf(DBUG_FILE, "Unsigned\t%u", value); + break; + } + case NdbDictionary::Column::Bigint: { + Int64 value= (Int64) *field->ptr; + fprintf(DBUG_FILE, "Bigint\t%lld", value); + break; + } + case NdbDictionary::Column::Bigunsigned: { + Uint64 value= (Uint64) *field->ptr; + fprintf(DBUG_FILE, "Bigunsigned\t%llu", value); + break; + } + case NdbDictionary::Column::Float: { + float value= (float) *field->ptr; + fprintf(DBUG_FILE, "Float\t%f", value); + break; + } + case NdbDictionary::Column::Double: { + double value= (double) *field->ptr; + fprintf(DBUG_FILE, "Double\t%f", value); + break; + } + case NdbDictionary::Column::Decimal: { + char *value= field->ptr; + + fprintf(DBUG_FILE, "Decimal\t'%-*s'", field->pack_length(), value); + break; + } + case NdbDictionary::Column::Char:{ + char buf[field->pack_length()+1]; + char *value= (char *) field->ptr; + snprintf(buf, field->pack_length(), "%s", value); + fprintf(DBUG_FILE, "Char\t'%s'", buf); + break; + } + case NdbDictionary::Column::Varchar: + case NdbDictionary::Column::Binary: + case NdbDictionary::Column::Varbinary: { + char *value= (char *) field->ptr; + fprintf(DBUG_FILE, "'%s'", value); + break; + } + case NdbDictionary::Column::Datetime: { + Uint64 value= (Uint64) *field->ptr; + fprintf(DBUG_FILE, "Datetime\t%llu", value); + break; + } + case NdbDictionary::Column::Timespec: { + Uint64 value= (Uint64) *field->ptr; + fprintf(DBUG_FILE, "Timespec\t%llu", value); + break; + } + } + fprintf(DBUG_FILE, "\n"); + + } +#endif + DBUG_VOID_RETURN; +} + + +int ha_ndbcluster::index_init(uint index) +{ + DBUG_ENTER("index_init"); + DBUG_PRINT("enter", ("index: %u", index)); + DBUG_RETURN(handler::index_init(index)); +} + + +int ha_ndbcluster::index_end() +{ + DBUG_ENTER("index_end"); + DBUG_RETURN(rnd_end()); +} + + +int ha_ndbcluster::index_read(byte *buf, + const byte *key, uint key_len, + enum ha_rkey_function find_flag) +{ + DBUG_ENTER("index_read"); + DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d", + active_index, key_len, find_flag)); + + int error= 1; + statistic_increment(ha_read_key_count, &LOCK_status); + + switch (get_index_type(active_index)){ + case PRIMARY_KEY_INDEX: + error= pk_read(key, key_len, buf); + break; + + case UNIQUE_INDEX: + error= unique_index_read(key, key_len, buf); + break; + + case ORDERED_INDEX: + error= ordered_index_scan(key, key_len, buf, find_flag); + break; + + default: + case UNDEFINED_INDEX: + break; + } + DBUG_RETURN(error); +} + + +int ha_ndbcluster::index_read_idx(byte *buf, uint index_no, + const byte *key, uint key_len, + enum ha_rkey_function find_flag) +{ + statistic_increment(ha_read_key_count,&LOCK_status); + DBUG_ENTER("index_read_idx"); + DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len)); + index_init(index_no); + DBUG_RETURN(index_read(buf, key, key_len, find_flag)); +} + + +int ha_ndbcluster::index_next(byte *buf) +{ + DBUG_ENTER("index_next"); + + int error = 1; + statistic_increment(ha_read_next_count,&LOCK_status); + if (get_index_type(active_index) == PRIMARY_KEY_INDEX) + error= HA_ERR_END_OF_FILE; + else + error = next_result(buf); + DBUG_RETURN(error); +} + + +int 
ha_ndbcluster::index_prev(byte *buf) +{ + DBUG_ENTER("index_prev"); + statistic_increment(ha_read_prev_count,&LOCK_status); + DBUG_RETURN(1); +} + + +int ha_ndbcluster::index_first(byte *buf) +{ + DBUG_ENTER("index_first"); + statistic_increment(ha_read_first_count,&LOCK_status); + DBUG_RETURN(1); +} + + +int ha_ndbcluster::index_last(byte *buf) +{ + DBUG_ENTER("index_last"); + statistic_increment(ha_read_last_count,&LOCK_status); + DBUG_RETURN(1); +} + + +int ha_ndbcluster::rnd_init(bool scan) +{ + NdbResultSet *cursor= m_active_cursor; + DBUG_ENTER("rnd_init"); + DBUG_PRINT("enter", ("scan: %d", scan)); + // Check that cursor is not defined + if (cursor) + DBUG_RETURN(1); + index_init(table->primary_key); + DBUG_RETURN(0); +} + + +int ha_ndbcluster::rnd_end() +{ + NdbResultSet *cursor= m_active_cursor; + DBUG_ENTER("rnd_end"); + + if (cursor) + { + DBUG_PRINT("info", ("Closing the cursor")); + cursor->close(); + m_active_cursor= NULL; + } + DBUG_RETURN(0); +} + + +int ha_ndbcluster::rnd_next(byte *buf) +{ + DBUG_ENTER("rnd_next"); + statistic_increment(ha_read_rnd_next_count, &LOCK_status); + int error = 1; + if (!m_active_cursor) + error = full_table_scan(buf); + else + error = next_result(buf); + DBUG_RETURN(error); +} + + +/* + An "interesting" record has been found and it's pk + retrieved by calling position + Now it's time to read the record from db once + again +*/ + +int ha_ndbcluster::rnd_pos(byte *buf, byte *pos) +{ + DBUG_ENTER("rnd_pos"); + statistic_increment(ha_read_rnd_count,&LOCK_status); + // The primary key for the record is stored in pos + // Perform a pk_read using primary key "index" + DBUG_RETURN(pk_read(pos, ref_length, buf)); +} + + +/* + Store the primary key of this record in ref + variable, so that the row can be retrieved again later + using "reference" in rnd_pos +*/ + +void ha_ndbcluster::position(const byte *record) +{ + KEY *key_info; + KEY_PART_INFO *key_part; + KEY_PART_INFO *end; + byte *buff; + DBUG_ENTER("position"); + + if (table->primary_key != MAX_KEY) + { + key_info= table->key_info + table->primary_key; + key_part= key_info->key_part; + end= key_part + key_info->key_parts; + buff= ref; + + for (; key_part != end; key_part++) + { + if (key_part->null_bit) { + /* Store 0 if the key part is a NULL part */ + if (record[key_part->null_offset] + & key_part->null_bit) { + *buff++= 1; + continue; + } + *buff++= 0; + } + memcpy(buff, record + key_part->offset, key_part->length); + buff += key_part->length; + } + } + else + { + // No primary key, get hidden key + DBUG_PRINT("info", ("Getting hidden key")); + int hidden_no= table->fields; + NdbRecAttr* rec= m_value[hidden_no]; + const NDBTAB *tab= (NDBTAB *) m_table; + const NDBCOL *hidden_col= tab->getColumn(hidden_no); + DBUG_ASSERT(hidden_col->getPrimaryKey() && + hidden_col->getAutoIncrement() && + rec != NULL && + ref_length == NDB_HIDDEN_PRIMARY_KEY_LENGTH); + memcpy(ref, (const void*)rec->aRef(), ref_length); + } + + DBUG_DUMP("ref", (char*)ref, ref_length); + DBUG_VOID_RETURN; +} + + +void ha_ndbcluster::info(uint flag) +{ + DBUG_ENTER("info"); + DBUG_PRINT("enter", ("flag: %d", flag)); + + if (flag & HA_STATUS_POS) + DBUG_PRINT("info", ("HA_STATUS_POS")); + if (flag & HA_STATUS_NO_LOCK) + DBUG_PRINT("info", ("HA_STATUS_NO_LOCK")); + if (flag & HA_STATUS_TIME) + DBUG_PRINT("info", ("HA_STATUS_TIME")); + if (flag & HA_STATUS_CONST) + DBUG_PRINT("info", ("HA_STATUS_CONST")); + if (flag & HA_STATUS_VARIABLE) + DBUG_PRINT("info", ("HA_STATUS_VARIABLE")); + if (flag & HA_STATUS_ERRKEY) + 
DBUG_PRINT("info", ("HA_STATUS_ERRKEY")); + if (flag & HA_STATUS_AUTO) + DBUG_PRINT("info", ("HA_STATUS_AUTO")); + DBUG_VOID_RETURN; +} + + +int ha_ndbcluster::extra(enum ha_extra_function operation) +{ + DBUG_ENTER("extra"); + switch (operation) { + case HA_EXTRA_NORMAL: /* Optimize for space (def) */ + DBUG_PRINT("info", ("HA_EXTRA_NORMAL")); + break; + case HA_EXTRA_QUICK: /* Optimize for speed */ + DBUG_PRINT("info", ("HA_EXTRA_QUICK")); + break; + case HA_EXTRA_RESET: /* Reset database to after open */ + DBUG_PRINT("info", ("HA_EXTRA_RESET")); + break; + case HA_EXTRA_CACHE: /* Cash record in HA_rrnd() */ + DBUG_PRINT("info", ("HA_EXTRA_CACHE")); + break; + case HA_EXTRA_NO_CACHE: /* End cacheing of records (def) */ + DBUG_PRINT("info", ("HA_EXTRA_NO_CACHE")); + break; + case HA_EXTRA_NO_READCHECK: /* No readcheck on update */ + DBUG_PRINT("info", ("HA_EXTRA_NO_READCHECK")); + break; + case HA_EXTRA_READCHECK: /* Use readcheck (def) */ + DBUG_PRINT("info", ("HA_EXTRA_READCHECK")); + break; + case HA_EXTRA_KEYREAD: /* Read only key to database */ + DBUG_PRINT("info", ("HA_EXTRA_KEYREAD")); + break; + case HA_EXTRA_NO_KEYREAD: /* Normal read of records (def) */ + DBUG_PRINT("info", ("HA_EXTRA_NO_KEYREAD")); + break; + case HA_EXTRA_NO_USER_CHANGE: /* No user is allowed to write */ + DBUG_PRINT("info", ("HA_EXTRA_NO_USER_CHANGE")); + break; + case HA_EXTRA_KEY_CACHE: + DBUG_PRINT("info", ("HA_EXTRA_KEY_CACHE")); + break; + case HA_EXTRA_NO_KEY_CACHE: + DBUG_PRINT("info", ("HA_EXTRA_NO_KEY_CACHE")); + break; + case HA_EXTRA_WAIT_LOCK: /* Wait until file is avalably (def) */ + DBUG_PRINT("info", ("HA_EXTRA_WAIT_LOCK")); + break; + case HA_EXTRA_NO_WAIT_LOCK: /* If file is locked, return quickly */ + DBUG_PRINT("info", ("HA_EXTRA_NO_WAIT_LOCK")); + break; + case HA_EXTRA_WRITE_CACHE: /* Use write cache in ha_write() */ + DBUG_PRINT("info", ("HA_EXTRA_WRITE_CACHE")); + break; + case HA_EXTRA_FLUSH_CACHE: /* flush write_record_cache */ + DBUG_PRINT("info", ("HA_EXTRA_FLUSH_CACHE")); + break; + case HA_EXTRA_NO_KEYS: /* Remove all update of keys */ + DBUG_PRINT("info", ("HA_EXTRA_NO_KEYS")); + break; + case HA_EXTRA_KEYREAD_CHANGE_POS: /* Keyread, but change pos */ + DBUG_PRINT("info", ("HA_EXTRA_KEYREAD_CHANGE_POS")); /* xxxxchk -r must be used */ + break; + case HA_EXTRA_REMEMBER_POS: /* Remember pos for next/prev */ + DBUG_PRINT("info", ("HA_EXTRA_REMEMBER_POS")); + break; + case HA_EXTRA_RESTORE_POS: + DBUG_PRINT("info", ("HA_EXTRA_RESTORE_POS")); + break; + case HA_EXTRA_REINIT_CACHE: /* init cache from current record */ + DBUG_PRINT("info", ("HA_EXTRA_REINIT_CACHE")); + break; + case HA_EXTRA_FORCE_REOPEN: /* Datafile have changed on disk */ + DBUG_PRINT("info", ("HA_EXTRA_FORCE_REOPEN")); + break; + case HA_EXTRA_FLUSH: /* Flush tables to disk */ + DBUG_PRINT("info", ("HA_EXTRA_FLUSH")); + break; + case HA_EXTRA_NO_ROWS: /* Don't write rows */ + DBUG_PRINT("info", ("HA_EXTRA_NO_ROWS")); + break; + case HA_EXTRA_RESET_STATE: /* Reset positions */ + DBUG_PRINT("info", ("HA_EXTRA_RESET_STATE")); + break; + case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/ + DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY")); + + DBUG_PRINT("info", ("Turning ON use of write instead of insert")); + m_use_write= TRUE; + break; + case HA_EXTRA_NO_IGNORE_DUP_KEY: + DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_DUP_KEY")); + DBUG_PRINT("info", ("Turning OFF use of write instead of insert")); + m_use_write= false; + break; + case HA_EXTRA_RETRIEVE_ALL_COLS: /* Retrieve all columns, not just those + 
where field->query_id is the same as + the current query id */ + DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_ALL_COLS")); + break; + case HA_EXTRA_PREPARE_FOR_DELETE: + DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_DELETE")); + break; + case HA_EXTRA_PREPARE_FOR_UPDATE: /* Remove read cache if problems */ + DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_UPDATE")); + break; + case HA_EXTRA_PRELOAD_BUFFER_SIZE: + DBUG_PRINT("info", ("HA_EXTRA_PRELOAD_BUFFER_SIZE")); + break; + case HA_EXTRA_RETRIEVE_PRIMARY_KEY: + DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_PRIMARY_KEY")); + break; + case HA_EXTRA_CHANGE_KEY_TO_UNIQUE: + DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_UNIQUE")); + break; + case HA_EXTRA_CHANGE_KEY_TO_DUP: + DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_DUP")); + break; + + } + + DBUG_RETURN(0); +} + + +int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size) +{ + DBUG_ENTER("extra_opt"); + DBUG_PRINT("enter", ("cache_size: %d", cache_size)); + DBUG_RETURN(extra(operation)); +} + + +int ha_ndbcluster::reset() +{ + DBUG_ENTER("reset"); + // Reset what? + DBUG_RETURN(1); +} + + +const char **ha_ndbcluster::bas_ext() const +{ static const char *ext[1] = { NullS }; return ext; } + + +/* + How many seeks it will take to read through the table + This is to be comparable to the number returned by records_in_range so + that we can decide if we should scan the table or use keys. +*/ + +double ha_ndbcluster::scan_time() +{ + return rows2double(records/3); +} + + +THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type) +{ + DBUG_ENTER("store_lock"); + + if (lock_type != TL_IGNORE && m_lock.type == TL_UNLOCK) + { + + /* If we are not doing a LOCK TABLE, then allow multiple + writers */ + + if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && + lock_type <= TL_WRITE) && !thd->in_lock_tables) + lock_type= TL_WRITE_ALLOW_WRITE; + + /* In queries of type INSERT INTO t1 SELECT ... FROM t2 ... + MySQL would use the lock TL_READ_NO_INSERT on t2, and that + would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts + to t2. Convert the lock to a normal read lock to allow + concurrent inserts to t2. */ + + if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables) + lock_type= TL_READ; + + m_lock.type=lock_type; + } + *to++= &m_lock; + + DBUG_RETURN(to); +} + +#ifndef DBUG_OFF +#define PRINT_OPTION_FLAGS(t) { \ + if (t->options & OPTION_NOT_AUTOCOMMIT) \ + DBUG_PRINT("thd->options", ("OPTION_NOT_AUTOCOMMIT")); \ + if (t->options & OPTION_BEGIN) \ + DBUG_PRINT("thd->options", ("OPTION_BEGIN")); \ + if (t->options & OPTION_TABLE_LOCK) \ + DBUG_PRINT("thd->options", ("OPTION_TABLE_LOCK")); \ +} +#else +#define PRINT_OPTION_FLAGS(t) +#endif + + +/* + As MySQL will execute an external lock for every new table it uses + we can use this to start the transactions. + If we are in auto_commit mode we just need to start a transaction + for the statement, this will be stored in transaction.stmt. 
+  If not, we have to start a master transaction if one does not already
+  exist; this will be stored in transaction.all.
+
+  When a table lock is held, one transaction will be started which holds
+  the table lock, and for each statement a "hupp" transaction will be
+  started.
+*/
+
+int ha_ndbcluster::external_lock(THD *thd, int lock_type)
+{
+  int error=0;
+  NdbConnection* trans= NULL;
+
+  DBUG_ENTER("external_lock");
+  DBUG_PRINT("enter", ("transaction.ndb_lock_count: %d",
+                       thd->transaction.ndb_lock_count));
+
+  /*
+    Check that this handler instance has a connection
+    set up to the Ndb object of thd
+  */
+  if (check_ndb_connection())
+    DBUG_RETURN(1);
+
+  if (lock_type != F_UNLCK)
+  {
+    if (!thd->transaction.ndb_lock_count++)
+    {
+      PRINT_OPTION_FLAGS(thd);
+
+      if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN | OPTION_TABLE_LOCK)))
+      {
+        // Autocommit transaction
+        DBUG_ASSERT(!thd->transaction.stmt.ndb_tid);
+        DBUG_PRINT("trans",("Starting transaction stmt"));
+
+        trans= m_ndb->startTransaction();
+        if (trans == NULL)
+        {
+          thd->transaction.ndb_lock_count--;  // We didn't get the lock
+          ERR_RETURN(m_ndb->getNdbError());
+        }
+        thd->transaction.stmt.ndb_tid= trans;
+      }
+      else
+      {
+        if (!thd->transaction.all.ndb_tid)
+        {
+          // Not an autocommit transaction
+          // A "master" transaction has not been started yet
+          DBUG_PRINT("trans",("starting transaction, all"));
+
+          trans= m_ndb->startTransaction();
+          if (trans == NULL)
+          {
+            thd->transaction.ndb_lock_count--;  // We didn't get the lock
+            ERR_RETURN(m_ndb->getNdbError());
+          }
+
+          /*
+            If this is the start of a LOCK TABLE, a table lock
+            should be taken on the table in NDB.
+
+            Check if it should be a read or write lock.
+          */
+          if (thd->options & (OPTION_TABLE_LOCK))
+          {
+            //lockThisTable();
+            DBUG_PRINT("info", ("Locking the table..." ));
+          }
+
+          thd->transaction.all.ndb_tid= trans;
+        }
+      }
+    }
+    /*
+      This is the place to make sure this handler instance
+      has a started transaction.
+
+      The transaction is started by the first handler on which
+      MySQL Server calls external_lock.
+
+      Other handlers in the same stmt or transaction should use
+      the same NDB transaction. This is done by setting up the
+      m_active_trans pointer to point to the NDB transaction.
+    */
+
+    m_active_trans= thd->transaction.all.ndb_tid ?
+      (NdbConnection*)thd->transaction.all.ndb_tid:
+      (NdbConnection*)thd->transaction.stmt.ndb_tid;
+    DBUG_ASSERT(m_active_trans);
+
+  }
+  else
+  {
+    if (!--thd->transaction.ndb_lock_count)
+    {
+      DBUG_PRINT("trans", ("Last external_lock"));
+      PRINT_OPTION_FLAGS(thd);
+
+      if (thd->transaction.stmt.ndb_tid)
+      {
+        /*
+          Unlock is done without a transaction commit / rollback.
+          This happens if the thread didn't update any rows.
+          We must in this case close the transaction to release resources.
+        */
+        DBUG_PRINT("trans",("ending non-updating transaction"));
+        m_ndb->closeTransaction(m_active_trans);
+        thd->transaction.stmt.ndb_tid= 0;
+      }
+    }
+    m_active_trans= NULL;
+  }
+  DBUG_RETURN(error);
+}
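
The control flow above packs three cases into one function. Here is a self-contained model of the ndb_lock_count bookkeeping, with startTransaction()/closeTransaction() reduced to pointer assignments; all names are invented for the sketch.

  #include <cassert>

  struct ToyThd
  {
    int  ndb_lock_count= 0;
    void *stmt_tid= nullptr;   // like transaction.stmt.ndb_tid
    void *all_tid=  nullptr;   // like transaction.all.ndb_tid
  };

  void external_lock_model(ToyThd *thd, bool lock, bool autocommit)
  {
    if (lock)
    {
      if (!thd->ndb_lock_count++)          // first table locked in this stmt
      {
        if (autocommit)
          thd->stmt_tid= (void*)1;         // stands in for startTransaction()
        else if (!thd->all_tid)
          thd->all_tid= (void*)1;          // one "master" transaction only
      }
    }
    else if (!--thd->ndb_lock_count)       // last table released
    {
      if (thd->stmt_tid)
        thd->stmt_tid= nullptr;            // stands in for closeTransaction()
    }
  }

  int main()
  {
    ToyThd thd;
    external_lock_model(&thd, true, true);   // lock: stmt transaction starts
    external_lock_model(&thd, false, true);  // unlock: it is closed unused
    assert(thd.ndb_lock_count == 0 && !thd.stmt_tid);
  }
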
+
+/*
+  When using LOCK TABLES, external_lock() is only called when the actual
+  table lock is taken.
+  Under LOCK TABLES, each used table will force a call to start_stmt.
+*/
+
+int ha_ndbcluster::start_stmt(THD *thd)
+{
+  int error=0;
+  DBUG_ENTER("start_stmt");
+  PRINT_OPTION_FLAGS(thd);
+
+  NdbConnection *trans= (NdbConnection*)thd->transaction.stmt.ndb_tid;
+  if (!trans){
+    DBUG_PRINT("trans",("Starting transaction stmt"));
+
+    NdbConnection *tablock_trans=
+      (NdbConnection*)thd->transaction.all.ndb_tid;
+    DBUG_PRINT("info", ("tablock_trans: %x", tablock_trans));
+    DBUG_ASSERT(tablock_trans);
+    trans= m_ndb->hupp(tablock_trans);
+    if (trans == NULL)
+      ERR_RETURN(m_ndb->getNdbError());
+    thd->transaction.stmt.ndb_tid= trans;
+  }
+  m_active_trans= trans;
+
+  DBUG_RETURN(error);
+}
+
+
+/*
+  Commit a transaction started in NDB
+*/
+
+int ndbcluster_commit(THD *thd, void *ndb_transaction)
+{
+  int res= 0;
+  Ndb *ndb= (Ndb*)thd->transaction.ndb;
+  NdbConnection *trans= (NdbConnection*)ndb_transaction;
+
+  DBUG_ENTER("ndbcluster_commit");
+  DBUG_PRINT("transaction",("%s",
+                            trans == thd->transaction.stmt.ndb_tid ?
+                            "stmt" : "all"));
+  DBUG_ASSERT(ndb && trans);
+
+  if (trans->execute(Commit) != 0)
+  {
+    const NdbError err= trans->getNdbError();
+    ERR_PRINT(err);
+    res= ndb_to_mysql_error(&err);
+  }
+  ndb->closeTransaction(trans);
+  DBUG_RETURN(res);
+}
+
+
+/*
+  Rollback a transaction started in NDB
+*/
+
+int ndbcluster_rollback(THD *thd, void *ndb_transaction)
+{
+  int res= 0;
+  Ndb *ndb= (Ndb*)thd->transaction.ndb;
+  NdbConnection *trans= (NdbConnection*)ndb_transaction;
+
+  DBUG_ENTER("ndbcluster_rollback");
+  DBUG_PRINT("transaction",("%s",
+                            trans == thd->transaction.stmt.ndb_tid ?
+                            "stmt" : "all"));
+  DBUG_ASSERT(ndb && trans);
+
+  if (trans->execute(Rollback) != 0)
+  {
+    const NdbError err= trans->getNdbError();
+    ERR_PRINT(err);
+    res= ndb_to_mysql_error(&err);
+  }
+  ndb->closeTransaction(trans);
+  DBUG_RETURN(res);
+}
+
+
+/*
+  Map MySQL type to the corresponding NDB type
+*/
+
+inline NdbDictionary::Column::Type
+mysql_to_ndb_type(enum enum_field_types mysql_type, bool unsigned_flg)
+{
+  switch(mysql_type) {
+  case MYSQL_TYPE_DECIMAL:
+    return NdbDictionary::Column::Char;
+  case MYSQL_TYPE_TINY:
+    return (unsigned_flg) ?
+      NdbDictionary::Column::Tinyunsigned :
+      NdbDictionary::Column::Tinyint;
+  case MYSQL_TYPE_SHORT:
+    return (unsigned_flg) ?
+      NdbDictionary::Column::Smallunsigned :
+      NdbDictionary::Column::Smallint;
+  case MYSQL_TYPE_LONG:
+    return (unsigned_flg) ?
+      NdbDictionary::Column::Unsigned :
+      NdbDictionary::Column::Int;
+  case MYSQL_TYPE_TIMESTAMP:
+    return NdbDictionary::Column::Unsigned;
+  case MYSQL_TYPE_LONGLONG:
+    return (unsigned_flg) ?
+      NdbDictionary::Column::Bigunsigned :
+      NdbDictionary::Column::Bigint;
+  case MYSQL_TYPE_INT24:
+    return (unsigned_flg) ?
+ NdbDictionary::Column::Mediumunsigned : + NdbDictionary::Column::Mediumint; + break; + case MYSQL_TYPE_FLOAT: + return NdbDictionary::Column::Float; + case MYSQL_TYPE_DOUBLE: + return NdbDictionary::Column::Double; + case MYSQL_TYPE_DATETIME : + return NdbDictionary::Column::Datetime; + case MYSQL_TYPE_DATE : + case MYSQL_TYPE_NEWDATE : + case MYSQL_TYPE_TIME : + case MYSQL_TYPE_YEAR : + // Missing NDB data types, mapped to char + return NdbDictionary::Column::Char; + case MYSQL_TYPE_ENUM : + return NdbDictionary::Column::Char; + case MYSQL_TYPE_SET : + return NdbDictionary::Column::Char; + case MYSQL_TYPE_TINY_BLOB : + case MYSQL_TYPE_MEDIUM_BLOB : + case MYSQL_TYPE_LONG_BLOB : + case MYSQL_TYPE_BLOB : + return NdbDictionary::Column::Blob; + case MYSQL_TYPE_VAR_STRING : + return NdbDictionary::Column::Varchar; + case MYSQL_TYPE_STRING : + return NdbDictionary::Column::Char; + case MYSQL_TYPE_NULL : + case MYSQL_TYPE_GEOMETRY : + return NdbDictionary::Column::Undefined; + } + return NdbDictionary::Column::Undefined; +} + + +/* + Create a table in NDB Cluster + */ + +int ha_ndbcluster::create(const char *name, + TABLE *form, + HA_CREATE_INFO *info) +{ + NDBTAB tab; + NdbDictionary::Column::Type ndb_type; + NDBCOL col; + uint pack_length, length, i; + int res; + const void *data, *pack_data; + const char **key_name= form->keynames.type_names; + char name2[FN_HEADLEN]; + + DBUG_ENTER("create"); + DBUG_PRINT("enter", ("name: %s", name)); + fn_format(name2, name, "", "",2); // Remove the .frm extension + set_dbname(name2); + set_tabname(name2); + + DBUG_PRINT("table", ("name: %s", m_tabname)); + tab.setName(m_tabname); + tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE)); + + // Save frm data for this table + if (readfrm(name, &data, &length)) + DBUG_RETURN(1); + if (packfrm(data, length, &pack_data, &pack_length)) + DBUG_RETURN(2); + + DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length)); + tab.setFrm(pack_data, pack_length); + my_free((char*)data, MYF(0)); + my_free((char*)pack_data, MYF(0)); + + for (i= 0; i < form->fields; i++) + { + Field *field= form->field[i]; + ndb_type= mysql_to_ndb_type(field->real_type(), + field->flags & UNSIGNED_FLAG); + DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d", + field->field_name, field->real_type(), + field->pack_length())); + col.setName(field->field_name); + col.setType(ndb_type); + if ((ndb_type == NdbDictionary::Column::Char) || + (ndb_type == NdbDictionary::Column::Varchar)) + col.setLength(field->pack_length()); + else + col.setLength(1); + col.setNullable(field->maybe_null()); + col.setPrimaryKey(field->flags & PRI_KEY_FLAG); + if (field->flags & AUTO_INCREMENT_FLAG) + { + DBUG_PRINT("info", ("Found auto_increment key")); + col.setAutoIncrement(TRUE); + ulonglong value = info->auto_increment_value ? 
+ info->auto_increment_value -1 : + (ulonglong) 0; + DBUG_PRINT("info", ("initial value=%ld", value)); +// col.setInitialAutIncValue(value); + } + else + col.setAutoIncrement(false); + + tab.addColumn(col); + } + + // No primary key, create shadow key as 64 bit, auto increment + if (form->primary_key == MAX_KEY) + { + DBUG_PRINT("info", ("Generating shadow key")); + col.setName("$PK"); + col.setType(NdbDictionary::Column::Bigunsigned); + col.setLength(1); + col.setNullable(false); + col.setPrimaryKey(TRUE); + col.setAutoIncrement(TRUE); + tab.addColumn(col); + } + + my_errno= 0; + if (check_ndb_connection()) + { + my_errno= HA_ERR_NO_CONNECTION; + DBUG_RETURN(my_errno); + } + + // Create the table in NDB + NDBDICT *dict= m_ndb->getDictionary(); + if (dict->createTable(tab)) + { + const NdbError err= dict->getNdbError(); + ERR_PRINT(err); + my_errno= ndb_to_mysql_error(&err); + DBUG_RETURN(my_errno); + } + DBUG_PRINT("info", ("Table %s/%s created successfully", + m_dbname, m_tabname)); + + // Fetch table from NDB, check that it exists + const NDBTAB *tab2= dict->getTable(m_tabname); + if (tab2 == NULL) + { + const NdbError err= dict->getNdbError(); + ERR_PRINT(err); + my_errno= ndb_to_mysql_error(&err); + DBUG_RETURN(my_errno); + } + + // Create secondary indexes + for (i= 0; i < form->keys; i++) + { + DBUG_PRINT("info", ("Found index %u: %s", i, key_name[i])); + if (i == form->primary_key) + { + DBUG_PRINT("info", ("Skipping it, PK already created")); + continue; + } + + DBUG_PRINT("info", ("Creating index %u: %s", i, key_name[i])); + res= create_index(key_name[i], + form->key_info + i); + switch(res){ + case 0: + // OK + break; + default: + DBUG_PRINT("error", ("Failed to create index %u", i)); + drop_table(); + my_errno= res; + goto err_end; + } + } + +err_end: + DBUG_RETURN(my_errno); +} + + +/* + Create an index in NDB Cluster + */ + +int ha_ndbcluster::create_index(const char *name, + KEY *key_info){ + NdbDictionary::Dictionary *dict= m_ndb->getDictionary(); + KEY_PART_INFO *key_part= key_info->key_part; + KEY_PART_INFO *end= key_part + key_info->key_parts; + + DBUG_ENTER("create_index"); + DBUG_PRINT("enter", ("name: %s ", name)); + + // Check that an index with the same name do not already exist + if (dict->getIndex(name, m_tabname)) + ERR_RETURN(dict->getNdbError()); + + NdbDictionary::Index ndb_index(name); + if (key_info->flags & HA_NOSAME) + ndb_index.setType(NdbDictionary::Index::UniqueHashIndex); + else + { + ndb_index.setType(NdbDictionary::Index::OrderedIndex); + // TODO Only temporary ordered indexes supported + ndb_index.setLogging(false); + } + ndb_index.setTable(m_tabname); + + for (; key_part != end; key_part++) + { + Field *field= key_part->field; + DBUG_PRINT("info", ("attr: %s", field->field_name)); + ndb_index.addColumnName(field->field_name); + } + + if (dict->createIndex(ndb_index)) + ERR_RETURN(dict->getNdbError()); + + // Success + DBUG_PRINT("info", ("Created index %s", name)); + DBUG_RETURN(0); +} + + +/* + Rename a table in NDB Cluster +*/ + +int ha_ndbcluster::rename_table(const char *from, const char *to) +{ + char new_tabname[FN_HEADLEN]; + + DBUG_ENTER("ha_ndbcluster::rename_table"); + set_dbname(from); + set_tabname(from); + set_tabname(to, new_tabname); + + if (check_ndb_connection()) { + my_errno= HA_ERR_NO_CONNECTION; + DBUG_RETURN(my_errno); + } + + int result= alter_table_name(m_tabname, new_tabname); + if (result == 0) + set_tabname(to); + + DBUG_RETURN(result); +} + + +/* + Rename a table in NDB Cluster using alter table + */ + +int 
ha_ndbcluster::alter_table_name(const char *from, const char *to)
+{
+  NDBDICT *dict= m_ndb->getDictionary();
+  const NDBTAB *orig_tab;
+  DBUG_ENTER("alter_table_name");
+  DBUG_PRINT("enter", ("Renaming %s to %s", from, to));
+
+  if (!(orig_tab= dict->getTable(from)))
+    ERR_RETURN(dict->getNdbError());
+
+  NdbDictionary::Table copy_tab= dict->getTableForAlteration(from);
+  copy_tab.setName(to);
+  if (dict->alterTable(copy_tab) != 0)
+    ERR_RETURN(dict->getNdbError());
+
+  m_table= NULL;
+
+  DBUG_RETURN(0);
+}
+
+
+/*
+  Delete a table from NDB Cluster
+*/
+
+int ha_ndbcluster::delete_table(const char *name)
+{
+  DBUG_ENTER("delete_table");
+  DBUG_PRINT("enter", ("name: %s", name));
+  set_dbname(name);
+  set_tabname(name);
+
+  if (check_ndb_connection())
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+  DBUG_RETURN(drop_table());
+}
+
+
+/*
+  Drop a table in NDB Cluster
+*/
+
+int ha_ndbcluster::drop_table()
+{
+  NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
+
+  DBUG_ENTER("drop_table");
+  DBUG_PRINT("enter", ("Deleting %s", m_tabname));
+
+  if (dict->dropTable(m_tabname))
+  {
+    const NdbError err= dict->getNdbError();
+    if (err.code == 709)
+      ; // 709: No such table existed
+    else
+      ERR_RETURN(dict->getNdbError());
+  }
+  release_metadata();
+  DBUG_RETURN(0);
+}
+
+
+/*
+  Drop a database in NDB Cluster
+*/
+
+int ndbcluster_drop_database(const char *path)
+{
+  DBUG_ENTER("ndbcluster_drop_database");
+  // TODO drop all tables for this database
+  DBUG_RETURN(1);
+}
+
+
+longlong ha_ndbcluster::get_auto_increment()
+{
+  // NOTE If the number of values to be inserted is known,
+  // the autoincrement cache could be used here
+  Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname);
+  return (longlong)auto_value;
+}
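
The error-709 branch in drop_table() above makes dropping idempotent: a table that is already gone counts as success. A standalone model of the pattern, with invented types and no NDB API; 299 is an arbitrary stand-in for any other error code.

  #include <cassert>

  struct ToyDict
  {
    int err;                                  // NDB-style error code
    int dropTable(const char*) { return err ? -1 : 0; }
  };

  int drop_model(ToyDict &dict, const char *name)
  {
    if (dict.dropTable(name))
    {
      if (dict.err == 709)
        ;                                     // "no such table": already gone
      else
        return dict.err;                      // real errors are propagated
    }
    return 0;                                 // dropped, or nothing to drop
  }

  int main()
  {
    ToyDict gone= { 709 }, busy= { 299 };
    assert(drop_model(gone, "t1") == 0);      // idempotent
    assert(drop_model(busy, "t1") == 299);
  }
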
+
+
+/*
+  Constructor for the NDB Cluster table handler
+*/
+
+ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
+  handler(table_arg),
+  m_active_trans(NULL),
+  m_active_cursor(NULL),
+  m_ndb(NULL),
+  m_table(NULL),
+  m_table_flags(HA_REC_NOT_IN_SEQ |
+                HA_KEYPOS_TO_RNDPOS |
+                HA_NOT_EXACT_COUNT |
+                HA_NO_WRITE_DELAYED |
+                HA_NO_PREFIX_CHAR_KEYS |
+                HA_NO_BLOBS |
+                HA_DROP_BEFORE_CREATE |
+                HA_NOT_READ_AFTER_KEY),
+  m_use_write(false)
+{
+  DBUG_ENTER("ha_ndbcluster");
+
+  m_tabname[0]= '\0';
+  m_dbname[0]= '\0';
+
+  // TODO Adjust number of records and other parameters for proper
+  // selection of scan/pk access
+  records= 100;
+  block_size= 1024;
+
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Destructor for NDB Cluster table handler
+*/
+
+ha_ndbcluster::~ha_ndbcluster()
+{
+  DBUG_ENTER("~ha_ndbcluster");
+
+  release_metadata();
+
+  // Check for open cursor/transaction
+  DBUG_ASSERT(m_active_cursor == NULL);
+  DBUG_ASSERT(m_active_trans == NULL);
+
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  Open a table for further use
+  - fetch metadata for this table from NDB
+  - check that table exists
+*/
+
+int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
+{
+  KEY *key;
+  DBUG_ENTER("open");
+  DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
+                       name, mode, test_if_locked));
+
+  // Set up ref_length to make room for the whole
+  // primary key to be written in the ref variable
+
+  if (table->primary_key != MAX_KEY)
+  {
+    key= table->key_info+table->primary_key;
+    ref_length= key->key_length;
+    DBUG_PRINT("info", (" ref_length: %d", ref_length));
+  }
+  // Init table lock structure
+  if (!(m_share=get_share(name)))
+    DBUG_RETURN(1);
+  thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0);
+
+  set_dbname(name);
+  set_tabname(name);
+
+  if (check_ndb_connection())
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+  DBUG_RETURN(get_metadata(name));
+}
+
+
+/*
+  Close the table
+  - release resources set up by open()
+*/
+
+int ha_ndbcluster::close(void)
+{
+  DBUG_ENTER("close");
+  free_share(m_share);
+  release_metadata();
+  m_ndb= NULL;
+  DBUG_RETURN(0);
+}
+
+
+Ndb* ha_ndbcluster::seize_ndb()
+{
+  Ndb* ndb;
+  DBUG_ENTER("seize_ndb");
+
+#ifdef USE_NDB_POOL
+  // Seize from pool
+  ndb= Ndb::seize();
+#else
+  ndb= new Ndb("");
+#endif
+  if (ndb->init(NDB_MAX_TRANSACTIONS) != 0)
+  {
+    ERR_PRINT(ndb->getNdbError());
+    /*
+      TODO
+      Alt.1 If init fails because too many Ndb objects are allocated,
+            wait on a condition for an Ndb object to be released.
+      Alt.2 Seize/release from pool, wait until next release
+    */
+    delete ndb;
+    ndb= NULL;
+  }
+  DBUG_RETURN(ndb);
+}
+
+
+void ha_ndbcluster::release_ndb(Ndb* ndb)
+{
+  DBUG_ENTER("release_ndb");
+#ifdef USE_NDB_POOL
+  // Release to pool
+  Ndb::release(ndb);
+#else
+  delete ndb;
+#endif
+  DBUG_VOID_RETURN;
+}
+
+
+/*
+  If this thread already has an Ndb object allocated
+  in the current THD, reuse it. Otherwise
+  seize an Ndb object, assign it to the current THD and use it.
+
+  Having an Ndb object also means that a connection to
+  the NDB cluster has been opened. The connection is
+  checked.
+*/
+
+int ha_ndbcluster::check_ndb_connection()
+{
+  THD* thd= current_thd;
+  Ndb* ndb;
+  DBUG_ENTER("check_ndb_connection");
+
+  if (!thd->transaction.ndb)
+  {
+    ndb= seize_ndb();
+    if (!ndb)
+      DBUG_RETURN(2);
+    thd->transaction.ndb= ndb;
+  }
+  m_ndb= (Ndb*)thd->transaction.ndb;
+  m_ndb->setDatabaseName(m_dbname);
+  if (m_ndb->waitUntilReady() != 0)
+  {
+    DBUG_PRINT("error", ("Ndb was not ready"));
+    DBUG_RETURN(3);
+  }
+  DBUG_RETURN(0);
+}
+
+void ndbcluster_close_connection(THD *thd)
+{
+  Ndb* ndb;
+  DBUG_ENTER("ndbcluster_close_connection");
+  ndb= (Ndb*)thd->transaction.ndb;
+  ha_ndbcluster::release_ndb(ndb);
+  thd->transaction.ndb= NULL;
+  DBUG_VOID_RETURN;
+}
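
check_ndb_connection() boils down to lazy, per-connection caching of one Ndb object. A minimal standalone model with invented types and no NDB API:

  #include <cassert>

  struct ToyNdb {};
  struct ToyThd { ToyNdb *ndb= nullptr; };    // like thd->transaction.ndb

  ToyNdb *get_thd_ndb(ToyThd *thd)
  {
    if (!thd->ndb)
      thd->ndb= new ToyNdb();                 // stands in for seize_ndb()
    return thd->ndb;                          // reused by every later handler
  }

  int main()
  {
    ToyThd thd;
    ToyNdb *a= get_thd_ndb(&thd);
    ToyNdb *b= get_thd_ndb(&thd);
    assert(a == b);                           // one Ndb object per connection
    delete thd.ndb;                           // stands in for release_ndb()
  }
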
+
+
+/*
+  Try to discover one table from NDB
+*/
+
+int ndbcluster_discover(const char *dbname, const char *name,
+                        const void** frmblob, uint* frmlen)
+{
+  uint len;
+  const void* data;
+  const NDBTAB* tab;
+  DBUG_ENTER("ndbcluster_discover");
+  DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
+
+  Ndb ndb(dbname);
+  if ((ndb.init() != 0) || (ndb.waitUntilReady() != 0))
+    ERR_RETURN(ndb.getNdbError());
+
+  if (!(tab= ndb.getDictionary()->getTable(name)))
+  {
+    DBUG_PRINT("info", ("Table %s not found", name));
+    DBUG_RETURN(1);
+  }
+
+  DBUG_PRINT("info", ("Found table %s", tab->getName()));
+
+  len= tab->getFrmLength();
+  if (len == 0 || tab->getFrmData() == NULL)
+  {
+    DBUG_PRINT("No frm data found",
+               ("Table was probably created via NdbApi"));
+    DBUG_RETURN(2);
+  }
+
+  if (unpackfrm(&data, &len, tab->getFrmData()))
+    DBUG_RETURN(3);
+
+  *frmlen= len;
+  *frmblob= data;
+
+  DBUG_RETURN(0);
+}
+
+static Ndb* g_ndb= NULL;
+
+#ifdef USE_DISCOVER_ON_STARTUP
+/*
+  Discover tables from NDB Cluster
+  - fetch a list of tables from NDB
+  - store the frm file for each table on disk,
+    if the table has an attached frm file
+    and the database of the table exists
+*/
+
+int ndb_discover_tables()
+{
+  uint i;
+  NdbDictionary::Dictionary::List list;
+  NdbDictionary::Dictionary* dict;
+  char path[FN_REFLEN];
+  DBUG_ENTER("ndb_discover_tables");
+
+  /* List tables in NDB Cluster kernel */
+  dict= g_ndb->getDictionary();
+  if (dict->listObjects(list,
+                        NdbDictionary::Object::UserTable) != 0)
+    ERR_RETURN(g_ndb->getNdbError());
+
+  for (i= 0 ; i < list.count ; i++)
+  {
+    NdbDictionary::Dictionary::List::Element& t= list.elements[i];
+
+    DBUG_PRINT("discover", ("%d: %s/%s", t.id, t.database, t.name));
+    if (create_table_from_handler(t.database, t.name, true))
+      DBUG_PRINT("info", ("Could not discover %s/%s", t.database, t.name));
+  }
+  DBUG_RETURN(0);
+}
+#endif
+
+
+/*
+  Initialise all global variables before creating
+  an NDB Cluster table handler
+*/
+
+bool ndbcluster_init()
+{
+  DBUG_ENTER("ndbcluster_init");
+  // Create an Ndb object to open the connection to NDB
+  g_ndb= new Ndb("sys");
+  if (g_ndb->init() != 0)
+  {
+    ERR_PRINT (g_ndb->getNdbError());
+    DBUG_RETURN(TRUE);
+  }
+  if (g_ndb->waitUntilReady() != 0)
+  {
+    ERR_PRINT (g_ndb->getNdbError());
+    DBUG_RETURN(TRUE);
+  }
+  (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
+                   (hash_get_key) ndbcluster_get_key,0,0);
+  pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+  ndbcluster_inited= 1;
+#ifdef USE_DISCOVER_ON_STARTUP
+  if (ndb_discover_tables() != 0)
+    DBUG_RETURN(TRUE);
+#endif
+  DBUG_RETURN(false);
+}
+
+
+/*
+  End use of the NDB Cluster table handler
+  - free all global variables allocated by
+    ndbcluster_init()
+*/
+
+bool ndbcluster_end()
+{
+  DBUG_ENTER("ndbcluster_end");
+  delete g_ndb;
+  g_ndb= NULL;
+  if (!ndbcluster_inited)
+    DBUG_RETURN(0);
+  hash_free(&ndbcluster_open_tables);
+#ifdef USE_NDB_POOL
+  ndb_pool_release();
+#endif
+  pthread_mutex_destroy(&ndbcluster_mutex);
+  ndbcluster_inited= 0;
+  DBUG_RETURN(0);
+}
+
+
+/*
+  Set m_tabname from full pathname to table file
+*/
+
+void ha_ndbcluster::set_tabname(const char *path_name)
+{
+  char *end, *ptr;
+
+  /* Scan name from the end */
+  end= strend(path_name)-1;
+  ptr= end;
+  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+    ptr--;
+  }
+  uint name_len= end - ptr;
+  memcpy(m_tabname, ptr + 1, end - ptr);
+  m_tabname[name_len]= '\0';
+#ifdef __WIN__
+  /* Put to lower case */
+  ptr= m_tabname;
+
+  while (*ptr != '\0') {
+    *ptr= tolower(*ptr);
+    ptr++;
+  }
+#endif
+}
+
+/**
+ * Set a given location from full pathname to table file
+ */
+void
+ha_ndbcluster::set_tabname(const char *path_name, char *tabname)
+{
+  char *end, *ptr;
+
+  /* Scan name from the end */
+  end= strend(path_name)-1;
+  ptr= end;
+  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+    ptr--;
+  }
+  uint name_len= end - ptr;
+  memcpy(tabname, ptr + 1, end - ptr);
+  tabname[name_len]= '\0';
+#ifdef __WIN__
+  /* Put to lower case */
+  ptr= tabname;
+
+  while (*ptr != '\0') {
+    *ptr= tolower(*ptr);
+    ptr++;
+  }
+#endif
+}
+
+
+/*
+  Set m_dbname from full pathname to table file
+*/
+
+void ha_ndbcluster::set_dbname(const char *path_name)
+{
+  char *end, *ptr;
+
+  /* Scan name from the end */
+  ptr= strend(path_name)-1;
+  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+    ptr--;
+  }
+  ptr--;
+  end= ptr;
+  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+    ptr--;
+  }
+  uint name_len= end - ptr;
+  memcpy(m_dbname, ptr + 1, name_len);
+  m_dbname[name_len]= '\0';
+#ifdef __WIN__
+  /* Put to lower case */
+  ptr= m_dbname;
+
+  while (*ptr != '\0') {
+    *ptr= tolower(*ptr);
+    ptr++;
+  }
+#endif
+}
+
+
+ha_rows
+ha_ndbcluster::records_in_range(int inx,
+                                const byte *start_key,uint start_key_len,
+                                enum ha_rkey_function start_search_flag,
+                                const byte *end_key,uint end_key_len,
+                                enum ha_rkey_function end_search_flag)
+{
+  ha_rows records= 10;
+  KEY* key_info= table->key_info + inx;
+  uint key_length= key_info->key_length;
+
+  DBUG_ENTER("records_in_range");
+  DBUG_PRINT("enter", ("inx: %d", inx));
+  DBUG_PRINT("enter", ("start_key: %x, start_key_len:
%d", start_key, start_key_len)); + DBUG_PRINT("enter", ("start_search_flag: %d", start_search_flag)); + DBUG_PRINT("enter", ("end_key: %x, end_key_len: %d", end_key, end_key_len)); + DBUG_PRINT("enter", ("end_search_flag: %d", end_search_flag)); + + /* + Check that start_key_len is equal to + the length of the used index and + prevent partial scan/read of hash indexes by returning HA_POS_ERROR + */ + NDB_INDEX_TYPE idx_type= get_index_type(inx); + if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) && + start_key_len < key_length) + { + DBUG_PRINT("warning", ("Tried to use index which required" + "full key length: %d, HA_POS_ERROR", + key_length)); + records= HA_POS_ERROR; + } + DBUG_RETURN(records); +} + + +/* + Handling the shared NDB_SHARE structure that is needed to + provide table locking. + It's also used for sharing data with other NDB handlers + in the same MySQL Server. There is currently not much + data we want to or can share. + */ + +static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length, + my_bool not_used __attribute__((unused))) +{ + *length=share->table_name_length; + return (byte*) share->table_name; +} + +static NDB_SHARE* get_share(const char *table_name) +{ + NDB_SHARE *share; + pthread_mutex_lock(&ndbcluster_mutex); + uint length=(uint) strlen(table_name); + if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables, + (byte*) table_name, + length))) + { + if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1, + MYF(MY_WME | MY_ZEROFILL)))) + { + share->table_name_length=length; + share->table_name=(char*) (share+1); + strmov(share->table_name,table_name); + if (my_hash_insert(&ndbcluster_open_tables, (byte*) share)) + { + pthread_mutex_unlock(&ndbcluster_mutex); + my_free((gptr) share,0); + return 0; + } + thr_lock_init(&share->lock); + pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST); + } + } + share->use_count++; + pthread_mutex_unlock(&ndbcluster_mutex); + return share; +} + + +static void free_share(NDB_SHARE *share) +{ + pthread_mutex_lock(&ndbcluster_mutex); + if (!--share->use_count) + { + hash_delete(&ndbcluster_open_tables, (byte*) share); + thr_lock_delete(&share->lock); + pthread_mutex_destroy(&share->mutex); + my_free((gptr) share, MYF(0)); + } + pthread_mutex_unlock(&ndbcluster_mutex); +} + + + +/* + Internal representation of the frm blob + +*/ + +struct frm_blob_struct +{ + struct frm_blob_header + { + uint ver; // Version of header + uint orglen; // Original length of compressed data + uint complen; // Compressed length of data, 0=uncompressed + } head; + char data[1]; +}; + + + +static int packfrm(const void *data, uint len, + const void **pack_data, uint *pack_len) +{ + int error; + ulong org_len, comp_len; + uint blob_len; + frm_blob_struct* blob; + DBUG_ENTER("packfrm"); + DBUG_PRINT("enter", ("data: %x, len: %d", data, len)); + + error= 1; + org_len = len; + if (my_compress((byte*)data, &org_len, &comp_len)) + goto err; + + DBUG_PRINT("info", ("org_len: %d, comp_len: %d", org_len, comp_len)); + DBUG_DUMP("compressed", (char*)data, org_len); + + error= 2; + blob_len= sizeof(frm_blob_struct::frm_blob_header)+org_len; + if (!(blob= (frm_blob_struct*) my_malloc(blob_len,MYF(MY_WME)))) + goto err; + + // Store compressed blob in machine independent format + int4store((char*)(&blob->head.ver), 1); + int4store((char*)(&blob->head.orglen), comp_len); + int4store((char*)(&blob->head.complen), org_len); + + // Copy frm data into blob, already in machine independent format + memcpy(blob->data, data, org_len); + + *pack_data = 
blob; + *pack_len = blob_len; + error = 0; + + DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len)); +err: + DBUG_RETURN(error); + +} + + +static int unpackfrm(const void **unpack_data, uint *unpack_len, + const void *pack_data) +{ + const frm_blob_struct *blob = (frm_blob_struct*)pack_data; + byte *data; + ulong complen, orglen, ver; + DBUG_ENTER("unpackfrm"); + DBUG_PRINT("enter", ("pack_data: %x", pack_data)); + + complen= uint4korr((char*)&blob->head.complen); + orglen= uint4korr((char*)&blob->head.orglen); + ver= uint4korr((char*)&blob->head.ver); + + DBUG_PRINT("blob",("ver: %d complen: %d orglen: %d", + ver,complen,orglen)); + DBUG_DUMP("blob->data", (char*) blob->data, complen); + + if (ver != 1) + DBUG_RETURN(1); + if (!(data = my_malloc(max(orglen, complen), MYF(MY_WME)))) + DBUG_RETURN(2); + memcpy(data, blob->data, complen); + + if (my_uncompress(data, &complen, &orglen)) + { + my_free((char*)data, MYF(0)); + DBUG_RETURN(3); + } + + *unpack_data = data; + *unpack_len = complen; + + DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len)); + + DBUG_RETURN(0); +} +#endif /* HAVE_NDBCLUSTER_DB */ diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h new file mode 100644 index 00000000000..ed66d07d79b --- /dev/null +++ b/sql/ha_ndbcluster.h @@ -0,0 +1,218 @@ +/* Copyright (C) 2000-2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +/* + This file defines the NDB Cluster handler: the interface between MySQL and + NDB Cluster +*/ + +/* The class defining a handle to an NDB Cluster table */ + +#ifdef __GNUC__ +#pragma interface /* gcc class implementation */ +#endif + +#include <ndbapi_limits.h> +#include <ndb_types.h> + +class Ndb; // Forward declaration +class NdbOperation; // Forward declaration +class NdbConnection; // Forward declaration +class NdbRecAttr; // Forward declaration +class NdbResultSet; // Forward declaration + +typedef enum ndb_index_type { + UNDEFINED_INDEX = 0, + PRIMARY_KEY_INDEX = 1, + UNIQUE_INDEX = 2, + ORDERED_INDEX = 3 +} NDB_INDEX_TYPE; + + +typedef struct st_ndbcluster_share { + THR_LOCK lock; + pthread_mutex_t mutex; + char *table_name; + uint table_name_length,use_count; +} NDB_SHARE; + +class ha_ndbcluster: public handler +{ + public: + ha_ndbcluster(TABLE *table); + ~ha_ndbcluster(); + + int open(const char *name, int mode, uint test_if_locked); + int close(void); + + int write_row(byte *buf); + int update_row(const byte *old_data, byte *new_data); + int delete_row(const byte *buf); + int index_init(uint index); + int index_end(); + int index_read(byte *buf, const byte *key, uint key_len, + enum ha_rkey_function find_flag); + int index_read_idx(byte *buf, uint index, const byte *key, uint key_len, + enum ha_rkey_function find_flag); + int index_next(byte *buf); + int index_prev(byte *buf); + int index_first(byte *buf); + int index_last(byte *buf); + int rnd_init(bool scan=1); + int rnd_end(); + int rnd_next(byte *buf); + int rnd_pos(byte *buf, byte *pos); + void position(const byte *record); + + void info(uint); + int extra(enum ha_extra_function operation); + int extra_opt(enum ha_extra_function operation, ulong cache_size); + int reset(); + int external_lock(THD *thd, int lock_type); + int start_stmt(THD *thd); + const char * table_type() const { return("ndbcluster");} + const char ** bas_ext() const; + ulong table_flags(void) const { return m_table_flags; } + ulong index_flags(uint idx) const; + uint max_record_length() const { return NDB_MAX_TUPLE_SIZE; }; + uint max_keys() const { return MAX_KEY; } + uint max_key_parts() const { return NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY; }; + uint max_key_length() const { return NDB_MAX_KEY_SIZE;}; + + int rename_table(const char *from, const char *to); + int delete_table(const char *name); + int create(const char *name, TABLE *form, HA_CREATE_INFO *info); + THR_LOCK_DATA **store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type); + + bool low_byte_first() const + { +#ifdef WORDS_BIGENDIAN + return false; +#else + return true; +#endif + } + bool has_transactions() { return true;} + + const char* index_type(uint key_number) { + switch (get_index_type(key_number)) { + case ORDERED_INDEX: + return "BTREE"; + case UNIQUE_INDEX: + case PRIMARY_KEY_INDEX: + default: + return "HASH"; + } + } + + double scan_time(); + ha_rows records_in_range(int inx, + const byte *start_key,uint start_key_len, + enum ha_rkey_function start_search_flag, + const byte *end_key,uint end_key_len, + enum ha_rkey_function end_search_flag); + + + static Ndb* seize_ndb(); + static void release_ndb(Ndb* ndb); + + + private: + int alter_table_name(const char *from, const char *to); + int drop_table(); + int create_index(const char *name, KEY *key_info); + int 
initialize_autoincrement(const void* table); + int get_metadata(const char* path); + void release_metadata(); + const char* get_index_name(uint idx_no) const; + NDB_INDEX_TYPE get_index_type(uint idx_no) const; + NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const; + + int pk_read(const byte *key, uint key_len, + byte *buf); + int unique_index_read(const byte *key, uint key_len, + byte *buf); + int ordered_index_scan(const byte *key, uint key_len, + byte *buf, + enum ha_rkey_function find_flag); + int full_table_scan(byte * buf); + int next_result(byte *buf); +#if 0 + int filtered_scan(const byte *key, uint key_len, + byte *buf, + enum ha_rkey_function find_flag); +#endif + + void unpack_record(byte *buf); + + void set_dbname(const char *pathname); + void set_tabname(const char *pathname); + void set_tabname(const char *pathname, char *tabname); + + bool set_hidden_key(NdbOperation*, + uint fieldnr, const byte* field_ptr); + int set_ndb_key(NdbOperation*, Field *field, + uint fieldnr, const byte* field_ptr); + int set_ndb_value(NdbOperation*, Field *field, uint fieldnr); + int get_ndb_value(NdbOperation*, uint fieldnr, byte *field_ptr); + int set_primary_key(NdbOperation *op, const byte *key); + int set_primary_key(NdbOperation *op); + int key_cmp(uint keynr, const byte * old_row, const byte * new_row); + void print_results(); + + longlong get_auto_increment(); + + int ndb_err(NdbConnection*); + + private: + int check_ndb_connection(); + + NdbConnection *m_active_trans; + NdbResultSet *m_active_cursor; + Ndb *m_ndb; + void *m_table; + char m_dbname[FN_HEADLEN]; + //char m_schemaname[FN_HEADLEN]; + char m_tabname[FN_HEADLEN]; + ulong m_table_flags; + THR_LOCK_DATA m_lock; + NDB_SHARE *m_share; + NDB_INDEX_TYPE m_indextype[MAX_KEY]; + NdbRecAttr *m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; + bool m_use_write; +}; + +bool ndbcluster_init(void); +bool ndbcluster_end(void); + +int ndbcluster_commit(THD *thd, void* ndb_transaction); +int ndbcluster_rollback(THD *thd, void* ndb_transaction); + +void ndbcluster_close_connection(THD *thd); + +int ndbcluster_discover(const char* dbname, const char* name, + const void** frmblob, uint* frmlen); +int ndbcluster_drop_database(const char* path); + + + + + + + + diff --git a/sql/handler.cc b/sql/handler.cc index 97abc11abe3..670a2b401be 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -40,6 +40,9 @@ #else #define innobase_query_caching_of_table_permitted(X,Y,Z) 1 #endif +#ifdef HAVE_NDBCLUSTER_DB +#include "ha_ndbcluster.h" +#endif #include <myisampack.h> #include <errno.h> @@ -51,7 +54,7 @@ ulong ha_read_count, ha_write_count, ha_delete_count, ha_update_count, ha_read_key_count, ha_read_next_count, ha_read_prev_count, ha_read_first_count, ha_read_last_count, ha_commit_count, ha_rollback_count, - ha_read_rnd_count, ha_read_rnd_next_count; + ha_read_rnd_count, ha_read_rnd_next_count, ha_discover_count; static SHOW_COMP_OPTION have_yes= SHOW_OPTION_YES; @@ -79,6 +82,10 @@ struct show_table_type_st sys_table_types[]= "Supports transactions and page-level locking", DB_TYPE_BERKELEY_DB}, {"BERKELEYDB",&have_berkeley_db, "Alias for BDB", DB_TYPE_BERKELEY_DB}, + {"NDBCLUSTER", &have_ndbcluster, + "Clustered, fault tolerant memory based tables", DB_TYPE_NDBCLUSTER}, + {"NDB", &have_ndbcluster, + "Alias for NDBCLUSTER", DB_TYPE_NDBCLUSTER}, {"EXAMPLE",&have_example_db, "Example storage engine", DB_TYPE_EXAMPLE_DB}, {NullS, NULL, NullS, DB_TYPE_UNKNOWN} @@ -181,6 +188,10 @@ handler *get_new_handler(TABLE *table, enum db_type db_type) case DB_TYPE_EXAMPLE_DB: 
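/*
  Editorial sketch (not part of the patch): once the hunk below adds the
  DB_TYPE_NDBCLUSTER case, engine selection for CREATE TABLE ... ENGINE=NDB
  reduces to this dispatch, roughly:

      handler *h= get_new_handler(table, DB_TYPE_NDBCLUSTER);
      // -> new ha_ndbcluster(table) when built with HAVE_NDBCLUSTER_DB

  with the lex.h hunk further down mapping both the NDB and NDBCLUSTER
  keywords to the same NDBCLUSTER_SYM symbol.
*/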
return new ha_example(table); #endif +#ifdef HAVE_NDBCLUSTER_DB + case DB_TYPE_NDBCLUSTER: + return new ha_ndbcluster(table); +#endif case DB_TYPE_HEAP: return new ha_heap(table); default: // should never happen @@ -225,6 +236,18 @@ int ha_init() opt_using_transactions=1; } #endif +#ifdef HAVE_NDBCLUSTER_DB + if (have_ndbcluster == SHOW_OPTION_YES) + { + if (ndbcluster_init()) + { + have_ndbcluster= SHOW_OPTION_DISABLED; + error= 1; + } + else + opt_using_transactions=1; + } +#endif return error; } @@ -252,6 +275,10 @@ int ha_panic(enum ha_panic_function flag) if (have_innodb == SHOW_OPTION_YES) error|=innobase_end(); #endif +#ifdef HAVE_NDBCLUSTER_DB + if (have_ndbcluster == SHOW_OPTION_YES) + error|=ndbcluster_end(); +#endif return error; } /* ha_panic */ @@ -261,6 +288,10 @@ void ha_drop_database(char* path) if (have_innodb == SHOW_OPTION_YES) innobase_drop_database(path); #endif +#ifdef HAVE_NDBCLUSTER_DB + if (have_ndbcluster == SHOW_OPTION_YES) + ndbcluster_drop_database(path); +#endif } void ha_close_connection(THD* thd) @@ -269,6 +300,10 @@ void ha_close_connection(THD* thd) if (have_innodb == SHOW_OPTION_YES) innobase_close_connection(thd); #endif +#ifdef HAVE_NDBCLUSTER_DB + if (have_ndbcluster == SHOW_OPTION_YES) + ndbcluster_close_connection(thd); +#endif } /* @@ -428,6 +463,19 @@ int ha_commit_trans(THD *thd, THD_TRANS* trans) WRITE_CACHE, (my_off_t) 0, 0, 1); thd->transaction.trans_log.end_of_file= max_binlog_cache_size; } +#ifdef HAVE_NDBCLUSTER_DB + if (trans->ndb_tid) + { + if ((error=ndbcluster_commit(thd,trans->ndb_tid))) + { + my_error(ER_ERROR_DURING_COMMIT, MYF(0), error); + error=1; + } + if (trans == &thd->transaction.all) + operation_done= transaction_commited= 1; + trans->ndb_tid=0; + } +#endif #ifdef HAVE_BERKELEY_DB if (trans->bdb_tid) { @@ -481,6 +529,18 @@ int ha_rollback_trans(THD *thd, THD_TRANS *trans) if (opt_using_transactions) { bool operation_done=0; +#ifdef HAVE_NDBCLUSTER_DB + if (trans->ndb_tid) + { + if ((error=ndbcluster_rollback(thd, trans->ndb_tid))) + { + my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), error); + error=1; + } + trans->ndb_tid = 0; + operation_done=1; + } +#endif #ifdef HAVE_BERKELEY_DB if (trans->bdb_tid) { @@ -1160,8 +1220,10 @@ bool handler::caching_allowed(THD* thd, char* table_key, ** Some general functions that isn't in the handler class ****************************************************************************/ - /* Initiates table-file and calls apropriate database-creator */ - /* Returns 1 if something got wrong */ +/* + Initiates table-file and calls apropriate database-creator + Returns 1 if something got wrong +*/ int ha_create_table(const char *name, HA_CREATE_INFO *create_info, bool update_create_info) @@ -1177,7 +1239,7 @@ int ha_create_table(const char *name, HA_CREATE_INFO *create_info, { update_create_info_from_table(create_info, &table); if (table.file->table_flags() & HA_DROP_BEFORE_CREATE) - table.file->delete_table(name); // Needed for BDB tables + table.file->delete_table(name); } if (lower_case_table_names == 2 && !(table.file->table_flags() & HA_FILE_BASED)) @@ -1299,6 +1361,26 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache, /* + Try to discover one table from handler(s) +*/ + +int ha_discover(const char* dbname, const char* name, + const void** frmblob, uint* frmlen) +{ + int error= 1; // Table does not exist in any handler + DBUG_ENTER("ha_discover"); + DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name)); +#ifdef HAVE_NDBCLUSTER_DB + if (have_ndbcluster == SHOW_OPTION_YES) + error= 
ndbcluster_discover(dbname, name, frmblob, frmlen); +#endif + if (!error) + statistic_increment(ha_discover_count,&LOCK_status); + DBUG_RETURN(error); +} + + +/* Read first row between two ranges. Store ranges for future calls to read_range_next @@ -1434,3 +1516,5 @@ int handler::compare_key(key_range *range) } return key_compare_result_on_equal; } + + diff --git a/sql/handler.h b/sql/handler.h index 9d39eff1301..62ff74c436a 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -28,7 +28,8 @@ #define NO_HASH /* Not yet implemented */ #endif -#if defined(HAVE_BERKELEY_DB) || defined(HAVE_INNOBASE_DB) +#if defined(HAVE_BERKELEY_DB) || defined(HAVE_INNOBASE_DB) || \ + defined(HAVE_NDBCLUSTER_DB) #define USING_TRANSACTIONS #endif @@ -80,7 +81,6 @@ #define HA_FILE_BASED (1 << 26) - /* bits in index_flags(index_number) for what you can do with index */ #define HA_WRONG_ASCII_ORDER 1 /* Can't use sorting through key */ #define HA_READ_NEXT 2 /* Read next record with same key */ @@ -141,12 +141,18 @@ #define HA_CACHE_TBL_ASKTRANSACT 1 #define HA_CACHE_TBL_TRANSACT 2 -enum db_type { DB_TYPE_UNKNOWN=0,DB_TYPE_DIAB_ISAM=1, - DB_TYPE_HASH,DB_TYPE_MISAM,DB_TYPE_PISAM, - DB_TYPE_RMS_ISAM, DB_TYPE_HEAP, DB_TYPE_ISAM, - DB_TYPE_MRG_ISAM, DB_TYPE_MYISAM, DB_TYPE_MRG_MYISAM, - DB_TYPE_BERKELEY_DB, DB_TYPE_INNODB, DB_TYPE_GEMINI, - DB_TYPE_EXAMPLE_DB, DB_TYPE_DEFAULT }; +enum db_type +{ + DB_TYPE_UNKNOWN=0,DB_TYPE_DIAB_ISAM=1, + DB_TYPE_HASH,DB_TYPE_MISAM,DB_TYPE_PISAM, + DB_TYPE_RMS_ISAM, DB_TYPE_HEAP, DB_TYPE_ISAM, + DB_TYPE_MRG_ISAM, DB_TYPE_MYISAM, DB_TYPE_MRG_MYISAM, + DB_TYPE_BERKELEY_DB, DB_TYPE_INNODB, + DB_TYPE_GEMINI, DB_TYPE_NDBCLUSTER, + DB_TYPE_EXAMPLE_DB, + + DB_TYPE_DEFAULT // Must be last +}; struct show_table_type_st { const char *type; @@ -176,6 +182,7 @@ typedef struct st_thd_trans { void *bdb_tid; void *innobase_tid; bool innodb_active_trans; + void *ndb_tid; } THD_TRANS; enum enum_tx_isolation { ISO_READ_UNCOMMITTED, ISO_READ_COMMITTED, @@ -479,3 +486,5 @@ bool ha_flush_logs(void); int ha_recovery_logging(THD *thd, bool on); int ha_change_key_cache(KEY_CACHE *old_key_cache, KEY_CACHE *new_key_cache); +int ha_discover(const char* dbname, const char* name, + const void** frmblob, uint* frmlen); diff --git a/sql/item_subselect.cc b/sql/item_subselect.cc index 196b54141d1..2d10be62d53 100644 --- a/sql/item_subselect.cc +++ b/sql/item_subselect.cc @@ -1356,7 +1356,7 @@ void subselect_uniquesubquery_engine::exclude() table_map subselect_engine::calc_const_tables(TABLE_LIST *table) { table_map map= 0; - for(; table; table= table->next) + for (; table; table= table->next) { TABLE *tbl= table->table; if (tbl && tbl->const_table) diff --git a/sql/item_sum.h b/sql/item_sum.h index 107c19b7d85..9593c8ddbba 100644 --- a/sql/item_sum.h +++ b/sql/item_sum.h @@ -298,6 +298,7 @@ class Item_sum_avg :public Item_sum_num void update_field(); Item *result_item(Field *field) { return new Item_avg_field(this); } + void no_rows_in_result() {} const char *func_name() const { return "avg"; } Item *copy_or_same(THD* thd); }; diff --git a/sql/lex.h b/sql/lex.h index 3274d544744..64cc6b57464 100644 --- a/sql/lex.h +++ b/sql/lex.h @@ -48,7 +48,7 @@ SYM_GROUP sym_group_rtree= {"RTree keys", "HAVE_RTREE_KEYS"}; */ static SYMBOL symbols[] = { - { "&&", SYM(AND)}, + { "&&", SYM(AND_SYM)}, { "<", SYM(LT)}, { "<=", SYM(LE)}, { "<>", SYM(NE)}, @@ -67,7 +67,7 @@ static SYMBOL symbols[] = { { "ALL", SYM(ALL)}, { "ALTER", SYM(ALTER)}, { "ANALYZE", SYM(ANALYZE_SYM)}, - { "AND", SYM(AND)}, + { "AND", SYM(AND_SYM)}, { "ANY", 
SYM(ANY_SYM)}, { "AS", SYM(AS)}, { "ASC", SYM(ASC)}, @@ -296,6 +296,8 @@ static SYMBOL symbols[] = { { "NAMES", SYM(NAMES_SYM)}, { "NATIONAL", SYM(NATIONAL_SYM)}, { "NATURAL", SYM(NATURAL)}, + { "NDB", SYM(NDBCLUSTER_SYM)}, + { "NDBCLUSTER", SYM(NDBCLUSTER_SYM)}, { "NCHAR", SYM(NCHAR_SYM)}, { "NEW", SYM(NEW_SYM)}, { "NEXT", SYM(NEXT_SYM)}, @@ -313,7 +315,7 @@ static SYMBOL symbols[] = { { "OPTIMIZE", SYM(OPTIMIZE)}, { "OPTION", SYM(OPTION)}, { "OPTIONALLY", SYM(OPTIONALLY)}, - { "OR", SYM(OR)}, + { "OR", SYM(OR_SYM)}, { "ORDER", SYM(ORDER_SYM)}, { "OUTER", SYM(OUTER)}, { "OUTFILE", SYM(OUTFILE)}, diff --git a/sql/log_event.cc b/sql/log_event.cc index 3b92e0956ba..207f4a51ff4 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -923,15 +923,15 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db) (ulong) thread_id, (ulong) exec_time, error_code); } - bool same_db = 0; + bool different_db= 1; if (db && last_db) { - if (!(same_db = !memcmp(last_db, db, db_len + 1))) + if (different_db= memcmp(last_db, db, db_len + 1)) memcpy(last_db, db, db_len + 1); } - if (db && db[0] && !same_db) + if (db && db[0] && different_db) fprintf(file, "use %s;\n", db); end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10); *end++=';'; @@ -960,8 +960,10 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) position to store is of the END of the current log event. */ #if MYSQL_VERSION_ID < 50000 - rli->future_group_master_log_pos= log_pos + get_event_len(); + rli->future_group_master_log_pos= log_pos + get_event_len() - + (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0); #else + /* In 5.0 we store the end_log_pos in the relay log so no problem */ rli->future_group_master_log_pos= log_pos; #endif clear_all_errors(thd, rli); @@ -1165,6 +1167,11 @@ int Start_log_event::exec_event(struct st_relay_log_info* rli) { DBUG_ENTER("Start_log_event::exec_event"); + /* + If the I/O thread has not started, mi->old_format is BINLOG_FORMAT_CURRENT + (that's what the MASTER_INFO constructor does), so the test below is not + perfect at all. + */ switch (rli->mi->old_format) { case BINLOG_FORMAT_CURRENT: /* @@ -1542,14 +1549,21 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db, thread_id, exec_time); } - bool same_db = 0; + bool different_db= 1; if (db && last_db) { - if (!(same_db = !memcmp(last_db, db, db_len + 1))) + /* + If the database is different from the one of the previous statement, we + need to print the "use" command, and we update the last_db. + But if commented, the "use" is going to be commented so we should not + update the last_db. + */ + if ((different_db= memcmp(last_db, db, db_len + 1)) && + !commented) memcpy(last_db, db, db_len + 1); } - if (db && db[0] && !same_db) + if (db && db[0] && different_db) fprintf(file, "%suse %s;\n", commented ? "# " : "", db); @@ -1667,7 +1681,8 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, if (!use_rli_only_for_errors) { #if MYSQL_VERSION_ID < 50000 - rli->future_group_master_log_pos= log_pos + get_event_len(); + rli->future_group_master_log_pos= log_pos + get_event_len() - + (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0); #else rli->future_group_master_log_pos= log_pos; #endif @@ -3138,9 +3153,11 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) */ #if MYSQL_VERSION_ID < 40100 - rli->future_master_log_pos= log_pos + get_event_len(); + rli->future_master_log_pos= log_pos + get_event_len() - + (rli->mi->old_format ? 
(LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0); #elif MYSQL_VERSION_ID < 50000 - rli->future_group_master_log_pos= log_pos + get_event_len(); + rli->future_group_master_log_pos= log_pos + get_event_len() - + (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0); #else rli->future_group_master_log_pos= log_pos; #endif diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index 00af3c67c7e..4381ea1f93e 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -839,7 +839,7 @@ extern ulong server_id, concurrency; extern ulong ha_read_count, ha_write_count, ha_delete_count, ha_update_count; extern ulong ha_read_key_count, ha_read_next_count, ha_read_prev_count; extern ulong ha_read_first_count, ha_read_last_count; -extern ulong ha_read_rnd_count, ha_read_rnd_next_count; +extern ulong ha_read_rnd_count, ha_read_rnd_next_count, ha_discover_count; extern ulong ha_commit_count, ha_rollback_count,table_cache_size; extern ulong max_connections,max_connect_errors, connect_timeout; extern ulong slave_net_timeout; @@ -893,6 +893,7 @@ extern SHOW_VAR init_vars[],status_vars[], internal_vars[]; extern SHOW_COMP_OPTION have_isam; extern SHOW_COMP_OPTION have_innodb; extern SHOW_COMP_OPTION have_berkeley_db; +extern SHOW_COMP_OPTION have_ndbcluster; extern struct system_variables global_system_variables; extern struct system_variables max_system_variables; extern struct rand_struct sql_rand; @@ -962,6 +963,10 @@ int format_number(uint inputflag,uint max_length,my_string pos,uint length, my_string *errpos); int openfrm(const char *name,const char *alias,uint filestat,uint prgflag, uint ha_open_flags, TABLE *outparam); +int readfrm(const char *name, const void** data, uint* length); +int writefrm(const char* name, const void* data, uint len); +int create_table_from_handler(const char *db, const char *name, + bool create_if_found); int closefrm(TABLE *table); db_type get_table_type(const char *name); int read_string(File file, gptr *to, uint length); diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 2468deb2cc7..2878654b759 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -32,6 +32,9 @@ #ifdef HAVE_ISAM #include "ha_isam.h" #endif +#ifdef HAVE_NDBCLUSTER_DB +#include "ha_ndbcluster.h" +#endif #include <nisam.h> #include <thr_alarm.h> #include <ft_global.h> @@ -261,7 +264,7 @@ my_bool opt_local_infile, opt_external_locking, opt_slave_compressed_protocol; my_bool opt_safe_user_create = 0, opt_no_mix_types = 0; my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0; my_bool opt_log_slave_updates= 0; -my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam; +my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam, opt_ndbcluster; my_bool opt_readonly, use_temp_pool, relay_log_purge; my_bool opt_sync_bdb_logs, opt_sync_frm; my_bool opt_secure_auth= 0; @@ -370,7 +373,8 @@ KEY_CACHE *sql_key_cache; CHARSET_INFO *system_charset_info, *files_charset_info ; CHARSET_INFO *national_charset_info, *table_alias_charset; -SHOW_COMP_OPTION have_berkeley_db, have_innodb, have_isam, have_example_db; +SHOW_COMP_OPTION have_berkeley_db, have_innodb, have_isam, + have_ndbcluster, have_example_db; SHOW_COMP_OPTION have_raid, have_openssl, have_symlink, have_query_cache; SHOW_COMP_OPTION have_crypt, have_compress; @@ -3625,7 +3629,7 @@ enum options_mysqld OPT_INNODB_FAST_SHUTDOWN, OPT_INNODB_FILE_PER_TABLE, OPT_SAFE_SHOW_DB, - OPT_INNODB, OPT_ISAM, OPT_SKIP_SAFEMALLOC, + OPT_INNODB, OPT_ISAM, OPT_NDBCLUSTER, OPT_SKIP_SAFEMALLOC, OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS, OPT_MAX_BINLOG_DUMP_EVENTS, 
OPT_SPORADIC_BINLOG_DUMP_FAIL, @@ -4158,6 +4162,10 @@ Disable with --skip-innodb (will save memory).", Disable with --skip-isam.", (gptr*) &opt_isam, (gptr*) &opt_isam, 0, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0}, + {"ndbcluster", OPT_NDBCLUSTER, "Enable NDB Cluster (if this version of MySQL supports it). \ +Disable with --skip-ndbcluster (will save memory).", + (gptr*) &opt_ndbcluster, (gptr*) &opt_ndbcluster, 0, GET_BOOL, NO_ARG, 1, 0, 0, + 0, 0, 0}, {"skip-locking", OPT_SKIP_LOCK, "Deprecated option, use --skip-external-locking instead.", 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, @@ -4831,6 +4839,7 @@ struct show_var_st status_vars[]= { {"Handler_rollback", (char*) &ha_rollback_count, SHOW_LONG}, {"Handler_update", (char*) &ha_update_count, SHOW_LONG}, {"Handler_write", (char*) &ha_write_count, SHOW_LONG}, + {"Handler_discover", (char*) &ha_discover_count, SHOW_LONG}, {"Key_blocks_not_flushed", (char*) &dflt_key_cache_var.global_blocks_changed, SHOW_KEY_CACHE_LONG}, {"Key_blocks_used", (char*) &dflt_key_cache_var.global_blocks_used, @@ -5129,6 +5138,11 @@ static void mysql_init_variables(void) #else have_example_db= SHOW_OPTION_NO; #endif +#ifdef HAVE_NDBCLUSTER_DB + have_ndbcluster=SHOW_OPTION_DISABLED; +#else + have_ndbcluster=SHOW_OPTION_NO; +#endif #ifdef USE_RAID have_raid=SHOW_OPTION_YES; #else @@ -5599,6 +5613,14 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), have_isam= SHOW_OPTION_DISABLED; #endif break; + case OPT_NDBCLUSTER: +#ifdef HAVE_NDBCLUSTER_DB + if (opt_ndbcluster) + have_ndbcluster=SHOW_OPTION_YES; + else + have_ndbcluster=SHOW_OPTION_DISABLED; +#endif + break; case OPT_INNODB: #ifdef HAVE_INNOBASE_DB if (opt_innodb) diff --git a/sql/set_var.cc b/sql/set_var.cc index 5ccda5c7052..1339bdc887f 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -59,6 +59,9 @@ #ifdef HAVE_INNOBASE_DB #include "ha_innodb.h" #endif +#ifdef HAVE_NDBCLUSTER_DB +#include "ha_ndbcluster.h" +#endif static HASH system_variable_hash; const char *bool_type_names[]= { "OFF", "ON", NullS }; @@ -638,6 +641,7 @@ struct show_var_st init_vars[]= { {"have_crypt", (char*) &have_crypt, SHOW_HAVE}, {"have_innodb", (char*) &have_innodb, SHOW_HAVE}, {"have_isam", (char*) &have_isam, SHOW_HAVE}, + {"have_ndbcluster", (char*) &have_ndbcluster, SHOW_HAVE}, {"have_openssl", (char*) &have_openssl, SHOW_HAVE}, {"have_query_cache", (char*) &have_query_cache, SHOW_HAVE}, {"have_raid", (char*) &have_raid, SHOW_HAVE}, @@ -1090,9 +1094,9 @@ static void fix_max_relay_log_size(THD *thd, enum_var_type type) static int check_max_delayed_threads(THD *thd, set_var *var) { - int val= var->value->val_int(); + longlong val= var->value->val_int(); if (var->type != OPT_GLOBAL && val != 0 && - val != global_system_variables.max_insert_delayed_threads) + val != (longlong) global_system_variables.max_insert_delayed_threads) { char buf[64]; my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name, llstr(val, buf)); @@ -1101,6 +1105,7 @@ static int check_max_delayed_threads(THD *thd, set_var *var) return 0; } + static void fix_max_connections(THD *thd, enum_var_type type) { resize_thr_alarm(max_connections + diff --git a/sql/slave.cc b/sql/slave.cc index 67b9c7515e1..061c737b538 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -361,6 +361,55 @@ void init_slave_skip_errors(const char* arg) } } + +void st_relay_log_info::inc_group_relay_log_pos(ulonglong val, + ulonglong log_pos, + bool skip_lock) +{ + if (!skip_lock) + pthread_mutex_lock(&data_lock); + inc_event_relay_log_pos(val); + 
group_relay_log_pos= event_relay_log_pos; + strmake(group_relay_log_name,event_relay_log_name, + sizeof(group_relay_log_name)-1); + + notify_group_relay_log_name_update(); + + /* + If the slave does not support transactions and replicates a transaction, + users should not trust group_master_log_pos (which they can display with + SHOW SLAVE STATUS or read from relay-log.info), because to compute + group_master_log_pos the slave relies on log_pos stored in the master's + binlog, but if we are in a master's transaction these positions are always + the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does + not advance as it should on the non-transactional slave (it advances by + big leaps, whereas it should advance by small leaps). + */ + if (log_pos) // 3.23 binlogs don't have log_posx + { +#if MYSQL_VERSION_ID < 50000 + /* + If the event was converted from a 3.23 format, get_event_len() has + grown by 6 bytes (at least for most events, except LOAD DATA INFILE + which is already a big problem for 3.23->4.0 replication); 6 bytes is + the difference between the header's size in 4.0 (LOG_EVENT_HEADER_LEN) + and the header's size in 3.23 (OLD_HEADER_LEN). Note that using + mi->old_format will not help if the I/O thread has not started yet. + Yes this is a hack but it's just to make 3.23->4.x replication work; + 3.23->5.0 replication is working much better. + */ + group_master_log_pos= log_pos + val - + (mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0); +#else + group_master_log_pos= log_pos+ val; +#endif /* MYSQL_VERSION_ID < 5000 */ + } + pthread_cond_broadcast(&data_cond); + if (!skip_lock) + pthread_mutex_unlock(&data_lock); +} + + void st_relay_log_info::close_temporary_tables() { TABLE *table,*next; @@ -3473,8 +3522,16 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, DBUG_RETURN(1); } memcpy(tmp_buf,buf,event_len); - tmp_buf[event_len]=0; // Create_file constructor wants null-term buffer + /* + Create_file constructor wants a 0 as last char of buffer, this 0 will + serve as the string-termination char for the file's name (which is at the + end of the buffer) + We must increment event_len, otherwise the event constructor will not see + this end 0, which leads to segfault. + */ + tmp_buf[event_len++]=0; buf = (const char*)tmp_buf; + int4store(buf+EVENT_LEN_OFFSET, event_len); } /* This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to @@ -3520,7 +3577,11 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, DBUG_ASSERT(tmp_buf); int error = process_io_create_file(mi,(Create_file_log_event*)ev); delete ev; - mi->master_log_pos += event_len; + /* + We had incremented event_len, but now when it is used to calculate the + position in the master's log, we must use the original value. 
+ */ + mi->master_log_pos += --event_len; DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos)); pthread_mutex_unlock(&mi->data_lock); my_free((char*)tmp_buf, MYF(0)); diff --git a/sql/slave.h b/sql/slave.h index 549ceb5d42c..c925831d1a5 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -291,34 +291,7 @@ typedef struct st_relay_log_info event_relay_log_pos+= val; } - void inc_group_relay_log_pos(ulonglong val, ulonglong log_pos, bool skip_lock=0) - { - if (!skip_lock) - pthread_mutex_lock(&data_lock); - inc_event_relay_log_pos(val); - group_relay_log_pos= event_relay_log_pos; - strmake(group_relay_log_name,event_relay_log_name, - sizeof(group_relay_log_name)-1); - - notify_group_relay_log_name_update(); - - /* - If the slave does not support transactions and replicates a transaction, - users should not trust group_master_log_pos (which they can display with - SHOW SLAVE STATUS or read from relay-log.info), because to compute - group_master_log_pos the slave relies on log_pos stored in the master's - binlog, but if we are in a master's transaction these positions are always - the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does - not advance as it should on the non-transactional slave (it advances by - big leaps, whereas it should advance by small leaps). - */ - if (log_pos) // 3.23 binlogs don't have log_posx - group_master_log_pos= log_pos+ val; - pthread_cond_broadcast(&data_cond); - if (!skip_lock) - pthread_mutex_unlock(&data_lock); - } - + void inc_group_relay_log_pos(ulonglong val, ulonglong log_pos, bool skip_lock=0); int wait_for_pos(THD* thd, String* log_name, longlong log_pos, longlong timeout); void close_temporary_tables(); diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 8c4571dd540..d24b13ff96f 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -1317,18 +1317,34 @@ static int open_unireg_entry(THD *thd, TABLE *entry, const char *db, { char path[FN_REFLEN]; int error; + uint discover_retry_count= 0; DBUG_ENTER("open_unireg_entry"); strxmov(path, mysql_data_home, "/", db, "/", name, NullS); - if (openfrm(path,alias, + while (openfrm(path,alias, (uint) (HA_OPEN_KEYFILE | HA_OPEN_RNDFILE | HA_GET_INDEX | HA_TRY_READ_ONLY), READ_KEYINFO | COMPUTE_TYPES | EXTRA_RECORD, thd->open_options, entry)) { if (!entry->crashed) - goto err; // Can't repair the table + { + /* + Frm file could not be found on disk + Since it does not exist, no one can be using it + LOCK_open has been locked to protect from someone else + trying to discover the table at the same time. + */ + if (discover_retry_count++ != 0) + goto err; + if (create_table_from_handler(db, name, true) != 0) + goto err; + + thd->clear_error(); // Clear error message + continue; + } + // Code below is for repairing a crashed file TABLE_LIST table_list; bzero((char*) &table_list, sizeof(table_list)); // just for safe table_list.db=(char*) db; @@ -1374,6 +1390,7 @@ static int open_unireg_entry(THD *thd, TABLE *entry, const char *db, if (error) goto err; + break; } /* If we are here, there was no fatal error (but error may be still diff --git a/sql/sql_class.h b/sql/sql_class.h index 8ccfe3cddd5..97baf47a8cb 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -703,7 +703,10 @@ public: THD_TRANS all; // Trans since BEGIN WORK THD_TRANS stmt; // Trans for current statement uint bdb_lock_count; - + uint ndb_lock_count; +#ifdef HAVE_NDBCLUSTER_DB + void* ndb; +#endif /* Tables changed in transaction (that must be invalidated in query cache). 
List contain only transactional tables, that not invalidated in query @@ -893,7 +896,8 @@ public: { #ifdef USING_TRANSACTIONS return (transaction.all.bdb_tid != 0 || - transaction.all.innodb_active_trans != 0); + transaction.all.innodb_active_trans != 0 || + transaction.all.ndb_tid != 0); #else return 0; #endif diff --git a/sql/sql_db.cc b/sql/sql_db.cc index 51b875385f1..a6bd1955fa5 100644 --- a/sql/sql_db.cc +++ b/sql/sql_db.cc @@ -551,7 +551,12 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *db, If the directory is a symbolic link, remove the link first, then remove the directory the symbolic link pointed at */ - if (!found_other_files) + if (found_other_files) + { + my_error(ER_DB_DROP_RMDIR, MYF(0), org_path, EEXIST); + DBUG_RETURN(-1); + } + else { char tmp_path[FN_REFLEN], *pos; char *path= tmp_path; diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc index e0e8fed29c8..19a6941fe32 100644 --- a/sql/sql_lex.cc +++ b/sql/sql_lex.cc @@ -879,13 +879,15 @@ int yylex(void *arg, void *yythd) } yySkip(); return (SET_VAR); - case MY_LEX_COLON: // optional line terminator + case MY_LEX_SEMICOLON: // optional line terminator if (yyPeek()) { - if (((THD *)yythd)->client_capabilities & CLIENT_MULTI_STATEMENTS) + THD* thd= (THD*)yythd; + if ((thd->client_capabilities & CLIENT_MULTI_STATEMENTS) && + (thd->command != COM_PREPARE)) { lex->found_colon=(char*)lex->ptr; - ((THD *)yythd)->server_status |= SERVER_MORE_RESULTS_EXISTS; + thd->server_status |= SERVER_MORE_RESULTS_EXISTS; lex->next_state=MY_LEX_END; return(END_OF_INPUT); } @@ -1529,7 +1531,7 @@ bool st_select_lex::setup_ref_array(THD *thd, uint order_group_num) */ bool st_select_lex_unit::check_updateable(char *db, char *table) { - for(SELECT_LEX *sl= first_select(); sl; sl= sl->next_select()) + for (SELECT_LEX *sl= first_select(); sl; sl= sl->next_select()) if (sl->check_updateable(db, table)) return 1; return 0; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 64fa398e5f4..34d0d2b31f9 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -5260,5 +5260,5 @@ int create_table_precheck(THD *thd, TABLE_LIST *tables, DBUG_RETURN(1); DBUG_RETURN((grant_option && want_priv != CREATE_TMP_ACL && check_grant(thd, want_priv, create_table, 0, UINT_MAX, 0)) ? 
- 1 : 0) + 1 : 0); } diff --git a/sql/sql_show.cc b/sql/sql_show.cc index e87a37150ea..448dc825a26 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -181,7 +181,7 @@ int mysqld_show_storage_engines(THD *thd) Protocol *protocol= thd->protocol; DBUG_ENTER("mysqld_show_storage_engines"); - field_list.push_back(new Item_empty_string("Type",10)); + field_list.push_back(new Item_empty_string("Engine",10)); field_list.push_back(new Item_empty_string("Support",10)); field_list.push_back(new Item_empty_string("Comment",80)); @@ -471,7 +471,7 @@ int mysqld_extend_show_tables(THD *thd,const char *db,const char *wild) (void) sprintf(path,"%s/%s",mysql_data_home,db); (void) unpack_dirname(path,path); field_list.push_back(item=new Item_empty_string("Name",NAME_LEN)); - field_list.push_back(item=new Item_empty_string("Type",10)); + field_list.push_back(item=new Item_empty_string("Engine",10)); item->maybe_null=1; field_list.push_back(item=new Item_empty_string("Row_format",10)); item->maybe_null=1; diff --git a/sql/sql_string.cc b/sql/sql_string.cc index a4eae8a6346..8a093738e2b 100644 --- a/sql/sql_string.cc +++ b/sql/sql_string.cc @@ -830,7 +830,7 @@ outp: void String::print(String *str) { char *st= (char*)Ptr, *end= st+str_length; - for(; st < end; st++) + for (; st < end; st++) { uchar c= *st; switch (c) diff --git a/sql/sql_table.cc b/sql/sql_table.cc index d5f77bf545d..c46a9823a52 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -1148,6 +1148,35 @@ int mysql_create_table(THD *thd,const char *db, const char *table_name, } } + /* + Check that table with given name does not already + exist in any storage engine. In such a case it should + be discovered and the error ER_TABLE_EXISTS_ERROR be returned + unless user specified CREATE TABLE IF EXISTS + The LOCK_open mutex has been locked to make sure no + one else is attempting to discover the table. Since + it's not on disk as a frm file, no one could be using it! 
+ */ + if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE)) + { + bool create_if_not_exists = + create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS; + if (!create_table_from_handler(db, table_name, + create_if_not_exists)) + { + DBUG_PRINT("info", ("Table already existed in handler")); + + if (create_if_not_exists) + { + create_info->table_existed= 1; // Mark that table existed + error= 0; + } + else + my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name); + goto end; + } + } + thd->proc_info="creating table"; create_info->table_existed= 0; // Mark that table is created diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index a25bd3f17bc..247bec84e8e 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -181,7 +181,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize); %token ACTION %token AGGREGATE_SYM %token ALL -%token AND +%token AND_SYM %token AS %token ASC %token AUTO_INC @@ -305,6 +305,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize); %token NAMES_SYM %token NATIONAL_SYM %token NATURAL +%token NDBCLUSTER_SYM %token NEW_SYM %token NCHAR_SYM %token NCHAR_STRING @@ -318,7 +319,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize); %token OPEN_SYM %token OPTION %token OPTIONALLY -%token OR +%token OR_SYM %token OR_OR_CONCAT %token ORDER_SYM %token OUTER @@ -576,8 +577,8 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize); %token BEFORE_SYM %left SET_VAR -%left OR_OR_CONCAT OR XOR -%left AND +%left OR_OR_CONCAT OR_SYM XOR +%left AND_SYM %left BETWEEN_SYM CASE_SYM WHEN_SYM THEN_SYM ELSE %left EQ EQUAL_SYM GE GT_SYM LE LT NE IS LIKE REGEXP IN_SYM %left '|' @@ -730,7 +731,7 @@ END_OF_INPUT %type <NONE> '-' '+' '*' '/' '%' '(' ')' - ',' '!' '{' '}' '&' '|' AND OR OR_OR_CONCAT BETWEEN_SYM CASE_SYM + ',' '!' '{' '}' '&' '|' AND_SYM OR_SYM OR_OR_CONCAT BETWEEN_SYM CASE_SYM THEN_SYM WHEN_SYM DIV_SYM MOD_SYM %% @@ -2493,14 +2494,14 @@ expr_expr: { $$= new Item_func_not(new Item_in_subselect($1, $4)); } - | expr BETWEEN_SYM no_and_expr AND expr + | expr BETWEEN_SYM no_and_expr AND_SYM expr { $$= new Item_func_between($1,$3,$5); } - | expr NOT BETWEEN_SYM no_and_expr AND expr + | expr NOT BETWEEN_SYM no_and_expr AND_SYM expr { $$= new Item_func_not(new Item_func_between($1,$4,$6)); } | expr OR_OR_CONCAT expr { $$= or_or_concat(YYTHD, $1,$3); } - | expr OR expr { $$= new Item_cond_or($1,$3); } + | expr OR_SYM expr { $$= new Item_cond_or($1,$3); } | expr XOR expr { $$= new Item_cond_xor($1,$3); } - | expr AND expr { $$= new Item_cond_and($1,$3); } + | expr AND_SYM expr { $$= new Item_cond_and($1,$3); } | expr SOUNDS_SYM LIKE expr { $$= new Item_func_eq(new Item_func_soundex($1), @@ -2541,14 +2542,14 @@ expr_expr: /* expressions that begin with 'expr' that do NOT follow IN_SYM */ no_in_expr: - no_in_expr BETWEEN_SYM no_and_expr AND expr + no_in_expr BETWEEN_SYM no_and_expr AND_SYM expr { $$= new Item_func_between($1,$3,$5); } - | no_in_expr NOT BETWEEN_SYM no_and_expr AND expr + | no_in_expr NOT BETWEEN_SYM no_and_expr AND_SYM expr { $$= new Item_func_not(new Item_func_between($1,$4,$6)); } | no_in_expr OR_OR_CONCAT expr { $$= or_or_concat(YYTHD, $1,$3); } - | no_in_expr OR expr { $$= new Item_cond_or($1,$3); } + | no_in_expr OR_SYM expr { $$= new Item_cond_or($1,$3); } | no_in_expr XOR expr { $$= new Item_cond_xor($1,$3); } - | no_in_expr AND expr { $$= new Item_cond_and($1,$3); } + | no_in_expr AND_SYM expr { $$= new Item_cond_and($1,$3); } | no_in_expr SOUNDS_SYM LIKE expr { $$= new Item_func_eq(new Item_func_soundex($1), @@ -2599,12 +2600,12 @@ 
no_and_expr: { $$= new Item_func_not(new Item_in_subselect($1, $4)); } - | no_and_expr BETWEEN_SYM no_and_expr AND expr + | no_and_expr BETWEEN_SYM no_and_expr AND_SYM expr { $$= new Item_func_between($1,$3,$5); } - | no_and_expr NOT BETWEEN_SYM no_and_expr AND expr + | no_and_expr NOT BETWEEN_SYM no_and_expr AND_SYM expr { $$= new Item_func_not(new Item_func_between($1,$4,$6)); } | no_and_expr OR_OR_CONCAT expr { $$= or_or_concat(YYTHD, $1,$3); } - | no_and_expr OR expr { $$= new Item_cond_or($1,$3); } + | no_and_expr OR_SYM expr { $$= new Item_cond_or($1,$3); } | no_and_expr XOR expr { $$= new Item_cond_xor($1,$3); } | no_and_expr SOUNDS_SYM LIKE expr { @@ -4219,8 +4220,8 @@ show_param: YYABORT; } | NEW_SYM MASTER_SYM FOR_SYM SLAVE WITH MASTER_LOG_FILE_SYM EQ - TEXT_STRING_sys AND MASTER_LOG_POS_SYM EQ ulonglong_num - AND MASTER_SERVER_ID_SYM EQ + TEXT_STRING_sys AND_SYM MASTER_LOG_POS_SYM EQ ulonglong_num + AND_SYM MASTER_SERVER_ID_SYM EQ ULONG_NUM { Lex->sql_command = SQLCOM_SHOW_NEW_MASTER; @@ -5056,6 +5057,7 @@ keyword: | NAMES_SYM {} | NATIONAL_SYM {} | NCHAR_SYM {} + | NDBCLUSTER_SYM {} | NEXT_SYM {} | NEW_SYM {} | NO_SYM {} @@ -5508,7 +5510,7 @@ grant_privilege: opt_and: /* empty */ {} - | AND {} + | AND_SYM {} ; require_list: diff --git a/sql/table.cc b/sql/table.cc index 1b7d30560ef..281a8c10409 100644 --- a/sql/table.cc +++ b/sql/table.cc @@ -944,7 +944,8 @@ static void frm_error(int error, TABLE *form, const char *name, myf errortype) break; case 2: { - datext=form->file ? *form->file->bas_ext() : ""; + datext= form->file ? *form->file->bas_ext() : ""; + datext= datext==NullS ? "" : datext; err_no= (my_errno == ENOENT) ? ER_FILE_NOT_FOUND : (my_errno == EAGAIN) ? ER_FILE_USED : ER_CANT_OPEN_FILE; my_error(err_no,errortype, diff --git a/sql/unireg.h b/sql/unireg.h index 442809a9e08..4ab2ba26b15 100644 --- a/sql/unireg.h +++ b/sql/unireg.h @@ -48,7 +48,7 @@ #define MAX_ALIAS_NAME 256 #define MAX_FIELD_NAME 34 /* Max colum name length +2 */ #define MAX_SYS_VAR_LENGTH 32 -#define MAX_KEY 32 /* Max used keys */ +#define MAX_KEY 64 /* Max used keys */ #define MAX_REF_PARTS 16 /* Max parts used as ref */ #define MAX_KEY_LENGTH 1024 /* max possible key */ #if SIZEOF_OFF_T > 4
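
Two of the hunks above, the log_event.cc one at the top of this section and the inc_group_relay_log_pos() body moved into slave.cc, apply the same correction: an event written by a 3.23 master grows by LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN bytes (6, per the patch's own comment) when the 4.x slave converts it, so the converted length must not be used unmodified to advance the master-side position. A minimal sketch of that arithmetic, assuming the 19-byte 4.0 common header and the 13-byte 3.23 header; the helper name is illustrative, not the server's:

  #include <cassert>
  #include <cstdint>

  // Header sizes behind the "6 bytes" in the patch comment: the 4.0 common
  // event header vs. the 3.23 one (assumed 19 and 13 bytes here).
  const uint64_t HEADER_LEN_40  = 19;   // LOG_EVENT_HEADER_LEN
  const uint64_t HEADER_LEN_323 = 13;   // OLD_HEADER_LEN

  // Master-side position after an event, given the event length as the
  // slave sees it after 3.23 -> 4.0 conversion.
  uint64_t master_pos_after(uint64_t log_pos, uint64_t seen_len,
                            bool old_format)
  {
    return log_pos + seen_len -
           (old_format ? HEADER_LEN_40 - HEADER_LEN_323 : 0);
  }

  int main()
  {
    // A 100-byte 3.23 event converts to 106 bytes on the slave, but the
    // master's binlog only advanced by 100.
    assert(master_pos_after(500, 106, true)  == 600);
    assert(master_pos_after(500, 106, false) == 606);
    return 0;
  }

The #else branch of the same hunk adds nothing to log_pos because in 5.0 the recorded position already points past the event.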
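
The discovery plumbing is the heart of this merge: readfrm()/writefrm() and create_table_from_handler() are declared in mysql_priv.h, a Handler_discover status counter is added, and open_unireg_entry() in sql_base.cc learns to retry a failed open once after asking the storage engines for the definition (under LOCK_open, so no one else can discover the same table concurrently). The retry shape as a self-contained toy; open_frm_file(), discover_from_engines() and the two sets standing in for the disk and the engine are inventions for the example, not server API:

  #include <cstdio>
  #include <set>
  #include <string>

  // Toy "disk" of .frm paths and an engine that knows one extra table.
  std::set<std::string> frm_on_disk     = { "./db/t1.frm" };
  std::set<std::string> known_to_engine = { "./db/t2.frm" };

  // Both return 0 on success, non-zero on failure, like the originals.
  int open_frm_file(const std::string &path)
  { return frm_on_disk.count(path) ? 0 : -1; }

  int discover_from_engines(const std::string &path)
  {
    if (!known_to_engine.count(path))
      return -1;
    frm_on_disk.insert(path);          // discovery materializes the .frm
    return 0;
  }

  // Shape of the open_unireg_entry() loop in the patch: retry the open
  // exactly once after a successful discovery.
  int open_with_discovery(const std::string &path)
  {
    int attempts = 0;
    while (open_frm_file(path))
    {
      if (attempts++ != 0 || discover_from_engines(path))
        return -1;                     // unknown everywhere: hard failure
      // else: discovery rewrote the .frm; loop and open it for real
    }
    return 0;
  }

  int main()
  {
    std::printf("t1: %d t2: %d t3: %d\n",
                open_with_discovery("./db/t1.frm"),   // on disk: 0
                open_with_discovery("./db/t2.frm"),   // discovered: 0
                open_with_discovery("./db/t3.frm"));  // nowhere: -1
    return 0;
  }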
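
On the option side, mysqld.cc wires NDB into the usual three-state availability flag: have_ndbcluster is NO when the server is built without HAVE_NDBCLUSTER_DB, DISABLED when built in but not switched on, and YES once --ndbcluster is given. A compact restatement of that mapping (have_engine() is a stand-in name):

  #include <cstdio>

  enum comp_option { SHOW_NO, SHOW_DISABLED, SHOW_YES };

  comp_option have_engine(bool compiled_in, bool enabled_by_option)
  {
    if (!compiled_in)
      return SHOW_NO;        // HAVE_NDBCLUSTER_DB not defined at build time
    return enabled_by_option ? SHOW_YES        // --ndbcluster
                             : SHOW_DISABLED;  // built in, but switched off
  }

  int main()
  {
    static const char *names[] = { "NO", "DISABLED", "YES" };
    std::printf("built in, off: %s\n", names[have_engine(true, false)]);
    std::printf("--ndbcluster:  %s\n", names[have_engine(true, true)]);
    std::printf("not compiled:  %s\n", names[have_engine(false, true)]);
    return 0;
  }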
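
The small set_var.cc change to check_max_delayed_threads() is a truncation fix rather than a feature: Item::val_int() returns a longlong, and the old int temporary silently dropped the high 32 bits, so an out-of-range SET value could masquerade as 0 or as the current setting. An illustration, assuming the usual 32-bit int / 64-bit long long ABI:

  #include <cstdio>

  int main()
  {
    long long val64 = 0x100000000LL;  // 2^32: representable as longlong
    int val32 = (int) val64;          // the old "int val= val_int();"
                                      // truncates to 0 on the usual
                                      // two's-complement platforms

    std::printf("val32=%d val64=%lld\n", val32, val64);
    std::printf("old check sees zero: %s\n", val32 == 0 ? "yes" : "no");
    std::printf("new check sees zero: %s\n", val64 == 0 ? "yes" : "no");
    return 0;
  }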
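
In queue_old_event(), the Create_file constructor expects a terminating 0 after the file name at the end of the buffer; the fix appends that byte, bumps event_len so the constructor actually sees it, patches the length stored in the event header to stay consistent, and later compensates with --event_len when advancing master_log_pos. A sketch of the buffer handling; store4() mimics int4store(), add_terminator() is an invented name, and the offset 9 assumed for EVENT_LEN_OFFSET follows the 4.0 header layout:

  #include <cassert>
  #include <cstdint>
  #include <cstring>
  #include <vector>

  // Little-endian 4-byte store, in the spirit of the server's int4store().
  void store4(unsigned char *p, uint32_t v)
  {
    p[0] = (unsigned char) v;
    p[1] = (unsigned char) (v >> 8);
    p[2] = (unsigned char) (v >> 16);
    p[3] = (unsigned char) (v >> 24);
  }

  const size_t EVENT_LEN_OFF = 9;  // event_len field in the 4.0 header

  // Append the 0 that terminates the file name, grow event_len so the
  // constructor sees it, and keep the header's own length in sync.
  std::vector<unsigned char> add_terminator(const unsigned char *buf,
                                            uint32_t &event_len)
  {
    std::vector<unsigned char> tmp(event_len + 1);
    std::memcpy(tmp.data(), buf, event_len);
    tmp[event_len++] = 0;                    // tmp_buf[event_len++]= 0
    store4(&tmp[EVENT_LEN_OFF], event_len);  // int4store(buf+EVENT_LEN_OFFSET,..)
    return tmp;
  }

  int main()
  {
    unsigned char ev[32] = {0};
    uint32_t len = sizeof(ev);
    std::vector<unsigned char> t = add_terminator(ev, len);
    assert(len == 33 && t.size() == 33 && t.back() == 0);

    // The master's position must advance by the *original* length, hence
    // "mi->master_log_pos += --event_len" in the patch:
    --len;
    assert(len == 32);
    return 0;
  }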
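
The sql_lex.cc hunk tightens multi-statement splitting: a ';' followed by more text only ends the current statement for a CLIENT_MULTI_STATEMENTS client, and now never while servicing COM_PREPARE, since a prepared statement must be exactly one statement. The predicate, extracted into a toy function:

  #include <cstdio>

  enum command { COM_QUERY, COM_PREPARE };

  // Decision from the MY_LEX_SEMICOLON hunk: split the buffer here only
  // for an ordinary query from a multi-statement client.
  bool split_here(bool more_text, bool client_multi_statements, command cmd)
  {
    return more_text && client_multi_statements && cmd != COM_PREPARE;
  }

  int main()
  {
    std::printf("query:   %d\n", split_here(true, true, COM_QUERY));   // 1
    std::printf("prepare: %d\n", split_here(true, true, COM_PREPARE)); // 0
    return 0;
  }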
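
mysql_create_table() gets the mirror-image discovery check: a table may exist inside an engine without having a .frm on disk, and CREATE TABLE must then fail with ER_TABLE_EXISTS_ERROR unless IF NOT EXISTS was given, in which case the discovered definition is accepted as the existing table. The decision logic only, with an assumed engine_has_table() probe standing in for create_table_from_handler():

  #include <cstdio>

  enum create_result { CREATED, EXISTED_OK, EXISTS_ERROR };

  // Assumed probe: true when some engine already has the table, even
  // though no .frm file exists for it.
  bool engine_has_table(const char *db, const char *name)
  { return false; }  // toy default for the example

  create_result create_table(const char *db, const char *name,
                             bool if_not_exists)
  {
    if (engine_has_table(db, name))
      return if_not_exists ? EXISTED_OK     // mark table_existed, no error
                           : EXISTS_ERROR;  // ER_TABLE_EXISTS_ERROR
    return CREATED;  // proceed with the normal .frm + engine create
  }

  int main()
  {
    std::printf("%d\n", create_table("test", "t1", false));  // 0: CREATED
    return 0;
  }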
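
Finally, frm_error() in table.cc guards against a handler whose bas_ext() list is empty: ha_ndbcluster has no data files of its own, so the first extension can be NullS and must be mapped to "" before it reaches the error message. A small stand-alone version of that guard (safe_ext() and the extension arrays are illustrative):

  #include <cstdio>

  // Mirror of the two new lines in frm_error(): never hand a null
  // extension string to the error formatter.
  const char *safe_ext(const char *const *bas_ext)
  {
    const char *datext = bas_ext ? bas_ext[0] : "";
    return datext ? datext : "";       // datext==NullS ? "" : datext
  }

  int main()
  {
    static const char *myisam_ext[] = { ".MYI", ".MYD", 0 };
    static const char *ndb_ext[]    = { 0 };
    std::printf("myisam: '%s' ndb: '%s'\n",
                safe_ext(myisam_ext), safe_ext(ndb_ext));
    return 0;
  }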