summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <sergefp@mysql.com>2004-04-28 19:27:56 +0400
committerunknown <sergefp@mysql.com>2004-04-28 19:27:56 +0400
commit2902f9e8a57ca76cc52863d70d02dcf7693ce8cd (patch)
tree4864372894fc29ff2e4d9de93f2c438b29184acb /sql
parent15e207b0e7d1cd010b9e0eea4663c98e1eb209de (diff)
parent5605374b668068ef5ede826b3d04e2fe7ead4f9f (diff)
downloadmariadb-git-2902f9e8a57ca76cc52863d70d02dcf7693ce8cd.tar.gz
Merge spetrunia@bk-internal.mysql.com:/home/bk/mysql-4.1
into mysql.com:/dbdata/psergey/mysql-4.1-ps-merge sql/lex.h: Auto merged sql/mysql_priv.h: Auto merged sql/mysqld.cc: Auto merged sql/sql_class.h: Auto merged sql/sql_parse.cc: Auto merged sql/sql_yacc.yy: Auto merged
Diffstat (limited to 'sql')
-rw-r--r--sql/Makefile.am10
-rw-r--r--sql/discover.cc172
-rw-r--r--sql/gen_lex_hash.cc4
-rw-r--r--sql/ha_innodb.cc294
-rw-r--r--sql/ha_innodb.h2
-rw-r--r--sql/ha_ndbcluster.cc2943
-rw-r--r--sql/ha_ndbcluster.h218
-rw-r--r--sql/handler.cc92
-rw-r--r--sql/handler.h25
-rw-r--r--sql/item_subselect.cc2
-rw-r--r--sql/item_sum.h1
-rw-r--r--sql/lex.h8
-rw-r--r--sql/log_event.cc37
-rw-r--r--sql/mysql_priv.h7
-rw-r--r--sql/mysqld.cc28
-rw-r--r--sql/set_var.cc9
-rw-r--r--sql/slave.cc65
-rw-r--r--sql/slave.h29
-rw-r--r--sql/sql_base.cc21
-rw-r--r--sql/sql_class.h8
-rw-r--r--sql/sql_db.cc7
-rw-r--r--sql/sql_lex.cc10
-rw-r--r--sql/sql_parse.cc2
-rw-r--r--sql/sql_show.cc4
-rw-r--r--sql/sql_string.cc2
-rw-r--r--sql/sql_table.cc29
-rw-r--r--sql/sql_yacc.yy40
-rw-r--r--sql/table.cc3
-rw-r--r--sql/unireg.h2
29 files changed, 3847 insertions, 227 deletions
diff --git a/sql/Makefile.am b/sql/Makefile.am
index bacf3bc58d1..1044673f8ec 100644
--- a/sql/Makefile.am
+++ b/sql/Makefile.am
@@ -16,12 +16,11 @@
#called from the top level Makefile
-
MYSQLDATAdir = $(localstatedir)
MYSQLSHAREdir = $(pkgdatadir)
MYSQLBASEdir= $(prefix)
INCLUDES = @MT_INCLUDES@ \
- @bdb_includes@ @innodb_includes@ \
+ @bdb_includes@ @innodb_includes@ @ndbcluster_includes@ \
-I$(top_srcdir)/include -I$(top_srcdir)/regex \
-I$(srcdir) $(openssl_includes)
WRAPLIBS= @WRAPLIBS@
@@ -42,6 +41,7 @@ LDADD = @isam_libs@ \
mysqld_LDADD = @MYSQLD_EXTRA_LDFLAGS@ \
@bdb_libs@ @innodb_libs@ @pstack_libs@ \
@innodb_system_libs@ \
+ @ndbcluster_libs@ @ndbcluster_system_libs@ \
$(LDADD) $(CXXLDFLAGS) $(WRAPLIBS) @LIBDL@ @openssl_libs@
noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
item_strfunc.h item_timefunc.h item_uniq.h \
@@ -52,7 +52,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
field.h handler.h \
ha_isammrg.h ha_isam.h ha_myisammrg.h\
ha_heap.h ha_myisam.h ha_berkeley.h ha_innodb.h \
- opt_range.h protocol.h \
+ ha_ndbcluster.h opt_range.h protocol.h \
sql_select.h structs.h table.h sql_udf.h hash_filo.h\
lex.h lex_symbol.h sql_acl.h sql_crypt.h \
log_event.h sql_repl.h slave.h \
@@ -76,11 +76,11 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc \
procedure.cc item_uniq.cc sql_test.cc \
log.cc log_event.cc init.cc derror.cc sql_acl.cc \
unireg.cc des_key_file.cc \
- time.cc opt_range.cc opt_sum.cc \
+ discover.cc time.cc opt_range.cc opt_sum.cc \
records.cc filesort.cc handler.cc \
ha_heap.cc ha_myisam.cc ha_myisammrg.cc \
ha_berkeley.cc ha_innodb.cc \
- ha_isam.cc ha_isammrg.cc \
+ ha_isam.cc ha_isammrg.cc ha_ndbcluster.cc \
sql_db.cc sql_table.cc sql_rename.cc sql_crypt.cc \
sql_load.cc mf_iocache.cc field_conv.cc sql_show.cc \
sql_udf.cc sql_analyse.cc sql_analyse.h sql_cache.cc \
diff --git a/sql/discover.cc b/sql/discover.cc
new file mode 100644
index 00000000000..e260f44a8db
--- /dev/null
+++ b/sql/discover.cc
@@ -0,0 +1,172 @@
+/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+/* Functions for discover of frm file from handler */
+
+#include "mysql_priv.h"
+#include <my_dir.h>
+
+/*
+ Read the contents of a .frm file
+
+ SYNOPSIS
+ readfrm()
+
+ name path to table-file "db/name"
+ frmdata frm data
+ len length of the read frmdata
+
+ RETURN VALUES
+ 0 ok
+ 1 Could not open file
+ 2 Could not stat file
+ 3 Could not allocate data for read
+ Could not read file
+
+ frmdata and len are set to 0 on error
+*/
+
+int readfrm(const char *name,
+ const void **frmdata, uint *len)
+{
+ int error;
+ char index_file[FN_REFLEN];
+ File file;
+ ulong read_len;
+ char *read_data;
+ MY_STAT state;
+ DBUG_ENTER("readfrm");
+ DBUG_PRINT("enter",("name: '%s'",name));
+
+ *frmdata= NULL; // In case of errors
+ *len= 0;
+ error= 1;
+ if ((file=my_open(fn_format(index_file,name,"",reg_ext,4),
+ O_RDONLY | O_SHARE,
+ MYF(0))) < 0)
+ goto err_end;
+
+ // Get length of file
+ error= 2;
+ if (my_fstat(file, &state, MYF(0)))
+ goto err;
+ read_len= state.st_size;
+
+ // Read whole frm file
+ error= 3;
+ read_data= 0;
+ if (read_string(file, &read_data, read_len))
+ goto err;
+
+ // Setup return data
+ *frmdata= (void*) read_data;
+ *len= read_len;
+ error= 0;
+
+ err:
+ if (file > 0)
+ VOID(my_close(file,MYF(MY_WME)));
+
+ err_end: /* Here when no file */
+ DBUG_RETURN (error);
+} /* readfrm */
+
+
+/*
+ Write the content of a frm data pointer
+ to a frm file
+
+ SYNOPSIS
+ writefrm()
+
+ name path to table-file "db/name"
+ frmdata frm data
+ len length of the frmdata
+
+ RETURN VALUES
+ 0 ok
+ 2 Could not write file
+*/
+
+int writefrm(const char *name, const void *frmdata, uint len)
+{
+ File file;
+ char index_file[FN_REFLEN];
+ int error;
+ DBUG_ENTER("writefrm");
+ DBUG_PRINT("enter",("name: '%s' len: %d ",name,len));
+ //DBUG_DUMP("frmdata", (char*)frmdata, len);
+
+ error= 0;
+ if ((file=my_create(fn_format(index_file,name,"",reg_ext,4),
+ CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0)
+ {
+ if (my_write(file,(byte*)frmdata,len,MYF(MY_WME | MY_NABP)))
+ error= 2;
+ }
+ VOID(my_close(file,MYF(0)));
+ DBUG_RETURN(error);
+} /* writefrm */
+
+
+
+
+/*
+ Try to discover table from handler and
+ if found, write the frm file to disk.
+
+ RETURN VALUES:
+ 0 : Table existed in handler and created
+ on disk if so requested
+ 1 : Table does not exist
+ >1 : error
+
+*/
+
+int create_table_from_handler(const char *db,
+ const char *name,
+ bool create_if_found)
+{
+ int error= 0;
+ const void* frmblob = NULL;
+ char path[FN_REFLEN];
+ uint frmlen = 0;
+ DBUG_ENTER("create_table_from_handler");
+ DBUG_PRINT("enter", ("create_if_found: %d", create_if_found));
+
+ if (ha_discover(db, name, &frmblob, &frmlen))
+ DBUG_RETURN(1); // Table does not exist
+
+ // Table exists in handler
+ if (create_if_found)
+ {
+ (void)strxnmov(path,FN_REFLEN,mysql_data_home,"/",db,"/",name,NullS);
+ // Save the frm file
+ error = writefrm(path, frmblob, frmlen);
+ }
+
+ err:
+ if (frmblob)
+ my_free((char*) frmblob,MYF(0));
+ DBUG_RETURN(error);
+}
+
+int table_exists_in_handler(const char *db,
+ const char *name)
+{
+ return (create_table_from_handler(db, name, false) == 0);
+}
diff --git a/sql/gen_lex_hash.cc b/sql/gen_lex_hash.cc
index 5dc7c50e04c..7a445ed8c4d 100644
--- a/sql/gen_lex_hash.cc
+++ b/sql/gen_lex_hash.cc
@@ -461,7 +461,7 @@ int main(int argc,char **argv)
hash_map= sql_functions_map;\n\
register uint32 cur_struct= uint4korr(hash_map+((len-1)*4));\n\
\n\
- for(;;){\n\
+ for (;;){\n\
register uchar first_char= (uchar)cur_struct;\n\
\n\
if (first_char == 0)\n\
@@ -492,7 +492,7 @@ int main(int argc,char **argv)
hash_map= symbols_map;\n\
register uint32 cur_struct= uint4korr(hash_map+((len-1)*4));\n\
\n\
- for(;;){\n\
+ for (;;){\n\
register uchar first_char= (uchar)cur_struct;\n\
\n\
if (first_char==0){\n\
diff --git a/sql/ha_innodb.cc b/sql/ha_innodb.cc
index a5dc6719182..4192df22e5c 100644
--- a/sql/ha_innodb.cc
+++ b/sql/ha_innodb.cc
@@ -15,7 +15,9 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/* This file defines the InnoDB handler: the interface between MySQL and
-InnoDB */
+InnoDB
+NOTE: You can only use noninlined InnoDB functions in this file, because we
+have disables the InnoDB inlining in this file. */
/* TODO list for the InnoDB handler in 4.1:
- Check if the query_id is now right also in prepared and executed stats
@@ -74,6 +76,7 @@ extern "C" {
#include "../innobase/include/btr0cur.h"
#include "../innobase/include/btr0btr.h"
#include "../innobase/include/fsp0fsp.h"
+#include "../innobase/include/sync0sync.h"
#include "../innobase/include/fil0fil.h"
}
@@ -321,70 +324,49 @@ convert_error_code_to_mysql(
}
}
-extern "C" {
/*****************************************************************
Prints info of a THD object (== user session thread) to the
standard output. NOTE that /mysql/innobase/trx/trx0trx.c must contain
the prototype for this function! */
-
+extern "C"
void
innobase_mysql_print_thd(
/*=====================*/
- char* buf, /* in/out: buffer where to print, must be at least
- 400 bytes */
+ FILE* f, /* in: output stream */
void* input_thd)/* in: pointer to a MySQL THD object */
{
THD* thd;
- char* old_buf = buf;
thd = (THD*) input_thd;
- /* We cannot use the return value of normal sprintf() as this is
- not portable to some old non-Posix Unixes, e.g., some old SCO
- Unixes */
-
- buf += my_sprintf(buf,
- (buf, "MySQL thread id %lu, query id %lu",
- thd->thread_id, thd->query_id));
- if (thd->host) {
- *buf = ' ';
- buf++;
- buf = strnmov(buf, thd->host, 30);
- }
+ fprintf(f, "MySQL thread id %lu, query id %lu",
+ thd->thread_id, thd->query_id);
+ if (thd->host) {
+ putc(' ', f);
+ fputs(thd->host, f);
+ }
- if (thd->ip) {
- *buf = ' ';
- buf++;
- buf=strnmov(buf, thd->ip, 20);
- }
+ if (thd->ip) {
+ putc(' ', f);
+ fputs(thd->ip, f);
+ }
if (thd->user) {
- *buf = ' ';
- buf++;
- buf=strnmov(buf, thd->user, 20);
- }
-
- if (thd->proc_info) {
- *buf = ' ';
- buf++;
- buf=strnmov(buf, thd->proc_info, 50);
+ putc(' ', f);
+ fputs(thd->user, f);
}
- if (thd->query) {
- *buf = '\n';
- buf++;
- buf=strnmov(buf, thd->query, 150);
- }
-
- buf[0] = '\n';
- buf[1] = '\0'; /* Note that we must put a null character here to end
- the printed string */
+ if (thd->proc_info) {
+ putc(' ', f);
+ fputs(thd->proc_info, f);
+ }
- /* We test the printed length did not overrun the buffer length of
- 400 bytes */
+ if (thd->query) {
+ putc(' ', f);
+ fputs(thd->query, f);
+ }
- ut_a(strlen(old_buf) < 400);
-}
+ putc('\n', f);
}
/*************************************************************************
@@ -627,12 +609,11 @@ innobase_query_caching_of_table_permitted(
return((my_bool)FALSE);
}
-extern "C" {
/*********************************************************************
Invalidates the MySQL query cache for the table.
NOTE that the exact prototype of this function has to be in
/innobase/row/row0ins.c! */
-
+extern "C"
void
innobase_invalidate_query_cache(
/*============================*/
@@ -652,6 +633,17 @@ innobase_invalidate_query_cache(
TRUE);
#endif
}
+
+/*********************************************************************
+Get the quote character to be used in SQL identifiers. */
+extern "C"
+char
+mysql_get_identifier_quote_char(void)
+/*=================================*/
+ /* out: quote character to be
+ used in SQL identifiers */
+{
+ return '`';
}
/*********************************************************************
@@ -2102,24 +2094,20 @@ ha_innobase::write_row(
if (prebuilt->trx !=
(trx_t*) current_thd->transaction.all.innobase_tid) {
- char err_buf[2000];
-
fprintf(stderr,
"InnoDB: Error: the transaction object for the table handle is at\n"
-"InnoDB: %lx, but for the current thread it is at %lx\n",
- (ulong)prebuilt->trx,
- (ulong)current_thd->transaction.all.innobase_tid);
-
- ut_sprintf_buf(err_buf, ((byte*)prebuilt) - 100, 200);
- fprintf(stderr,
-"InnoDB: Dump of 200 bytes around prebuilt: %.1000s\n", err_buf);
-
- ut_sprintf_buf(err_buf,
+"InnoDB: %p, but for the current thread it is at %p\n",
+ prebuilt->trx,
+ current_thd->transaction.all.innobase_tid);
+ fputs("InnoDB: Dump of 200 bytes around prebuilt: ", stderr);
+ ut_print_buf(stderr, ((const byte*)prebuilt) - 100, 200);
+ fputs("\n"
+ "InnoDB: Dump of 200 bytes around transaction.all: ",
+ stderr);
+ ut_print_buf(stderr,
((byte*)(&(current_thd->transaction.all))) - 100, 200);
- fprintf(stderr,
-"InnoDB: Dump of 200 bytes around transaction.all: %.1000s\n", err_buf);
-
- ut_a(0);
+ putc('\n', stderr);
+ ut_error;
}
statistic_increment(ha_write_count, &LOCK_status);
@@ -3390,12 +3378,9 @@ create_index(
field = form->field[j];
- if (strlen(field->field_name)
- == strlen(key_part->field->field_name)
- && 0 == ut_cmp_in_lower_case(
+ if (0 == ut_cmp_in_lower_case(
(char*)field->field_name,
- (char*)key_part->field->field_name,
- strlen(field->field_name))) {
+ (char*)key_part->field->field_name)) {
/* Found the corresponding column */
break;
@@ -3762,7 +3747,8 @@ ha_innobase::delete_table(
/* Drop the table in InnoDB */
- error = row_drop_table_for_mysql(norm_name, trx);
+ error = row_drop_table_for_mysql(norm_name, trx,
+ thd->lex->sql_command == SQLCOM_DROP_DB);
/* Flush the log to reduce probability that the .frm files and
the InnoDB data dictionary get out-of-sync if the user runs
@@ -3801,7 +3787,7 @@ innobase_drop_database(
trx_t* trx;
char* ptr;
int error;
- char namebuf[10000];
+ char* namebuf;
/* Get the transaction associated with the current thd, or create one
if not yet created */
@@ -3821,6 +3807,7 @@ innobase_drop_database(
}
ptr++;
+ namebuf = my_malloc(len + 2, MYF(0));
memcpy(namebuf, ptr, len);
namebuf[len] = '/';
@@ -3837,6 +3824,7 @@ innobase_drop_database(
}
error = row_drop_database_for_mysql(namebuf, trx);
+ my_free(namebuf, MYF(0));
/* Flush the log to reduce probability that the .frm files and
the InnoDB data dictionary get out-of-sync if the user runs
@@ -4361,15 +4349,18 @@ ha_innobase::update_table_comment(
info on foreign keys */
const char* comment)/* in: table comment defined by user */
{
- row_prebuilt_t* prebuilt = (row_prebuilt_t*)innobase_prebuilt;
- uint length = strlen(comment);
- char* str = my_malloc(length + 16500, MYF(0));
- char* pos;
+ uint length = strlen(comment);
+ char* str;
+ row_prebuilt_t* prebuilt = (row_prebuilt_t*)innobase_prebuilt;
/* We do not know if MySQL can call this function before calling
external_lock(). To be safe, update the thd of the current table
handle. */
+ if(length > 64000 - 3) {
+ return((char*)comment); /* string too long */
+ }
+
update_thd(current_thd);
prebuilt->trx->op_info = (char*)"returning table comment";
@@ -4378,37 +4369,47 @@ ha_innobase::update_table_comment(
possible adaptive hash latch to avoid deadlocks of threads */
trx_search_latch_release_if_reserved(prebuilt->trx);
-
- if (!str) {
- prebuilt->trx->op_info = (char*)"";
+ str = NULL;
- return((char*)comment);
- }
+ if (FILE* file = tmpfile()) {
+ long flen;
- pos = str;
- if (length) {
- pos=strmov(str, comment);
- *pos++=';';
- *pos++=' ';
- }
+ /* output the data to a temporary file */
+ fprintf(file, "InnoDB free: %lu kB",
+ (ulong) fsp_get_available_space_in_free_extents(
+ prebuilt->table->space));
+
+ dict_print_info_on_foreign_keys(FALSE, file, prebuilt->table);
+ flen = ftell(file);
+ if(length + flen + 3 > 64000) {
+ flen = 64000 - 3 - length;
+ }
- pos += my_sprintf(pos,
- (pos,"InnoDB free: %lu kB",
- (ulong) fsp_get_available_space_in_free_extents(
- prebuilt->table->space)));
+ ut_ad(flen > 0);
- /* We assume 16000 - length bytes of space to print info; the limit
- 16000 bytes is arbitrary, and MySQL could handle at least 64000
- bytes */
-
- if (length < 16000) {
- dict_print_info_on_foreign_keys(FALSE, pos, 16000 - length,
- prebuilt->table);
+ /* allocate buffer for the full string, and
+ read the contents of the temporary file */
+
+ str = my_malloc(length + flen + 3, MYF(0));
+
+ if (str) {
+ char* pos = str + length;
+ if(length) {
+ memcpy(str, comment, length);
+ *pos++ = ';';
+ *pos++ = ' ';
+ }
+ rewind(file);
+ flen = fread(pos, 1, flen, file);
+ pos[flen] = 0;
+ }
+
+ fclose(file);
}
prebuilt->trx->op_info = (char*)"";
- return(str);
+ return(str ? str : (char*) comment);
}
/***********************************************************************
@@ -4422,7 +4423,7 @@ ha_innobase::get_foreign_key_create_info(void)
MUST be freed with ::free_foreign_key_create_info */
{
row_prebuilt_t* prebuilt = (row_prebuilt_t*)innobase_prebuilt;
- char* str;
+ char* str = 0;
ut_a(prebuilt != NULL);
@@ -4432,23 +4433,47 @@ ha_innobase::get_foreign_key_create_info(void)
update_thd(current_thd);
- prebuilt->trx->op_info = (char*)"getting info on foreign keys";
+ if (FILE* file = tmpfile()) {
+ long flen;
- /* In case MySQL calls this in the middle of a SELECT query, release
- possible adaptive hash latch to avoid deadlocks of threads */
+ prebuilt->trx->op_info = (char*)"getting info on foreign keys";
- trx_search_latch_release_if_reserved(prebuilt->trx);
-
- str = (char*)ut_malloc(10000);
+ /* In case MySQL calls this in the middle of a SELECT query,
+ release possible adaptive hash latch to avoid
+ deadlocks of threads */
- str[0] = '\0';
-
- dict_print_info_on_foreign_keys(TRUE, str, 9000, prebuilt->table);
+ trx_search_latch_release_if_reserved(prebuilt->trx);
- prebuilt->trx->op_info = (char*)"";
+ /* output the data to a temporary file */
+ dict_print_info_on_foreign_keys(TRUE, file, prebuilt->table);
+ prebuilt->trx->op_info = (char*)"";
+
+ flen = ftell(file);
+ if(flen > 64000 - 1) {
+ flen = 64000 - 1;
+ }
+
+ ut_ad(flen >= 0);
+
+ /* allocate buffer for the string, and
+ read the contents of the temporary file */
+
+ str = my_malloc(flen + 1, MYF(0));
+
+ if (str) {
+ rewind(file);
+ flen = fread(str, 1, flen, file);
+ str[flen] = 0;
+ }
+
+ fclose(file);
+ } else {
+ /* unable to create temporary file */
+ str = my_malloc(1, MYF(MY_ZEROFILL));
+ }
return(str);
-}
+}
/***********************************************************************
Checks if a table is referenced by a foreign key. The MySQL manual states that
@@ -4481,7 +4506,7 @@ ha_innobase::free_foreign_key_create_info(
char* str) /* in, own: create info string to free */
{
if (str) {
- ut_free(str);
+ my_free(str, MYF(0));
}
}
@@ -4772,34 +4797,55 @@ innodb_show_status(
innobase_release_stat_resources(trx);
- /* We let the InnoDB Monitor to output at most 60 kB of text, add
- a safety margin of 100 kB for buffer overruns */
+ /* We let the InnoDB Monitor to output at most 64000 bytes of text. */
- buf = (char*)ut_malloc(160 * 1024);
+ long flen;
+ char* str;
- srv_sprintf_innodb_monitor(buf, 60 * 1024);
+ mutex_enter_noninline(&srv_monitor_file_mutex);
+ rewind(srv_monitor_file);
+ srv_printf_innodb_monitor(srv_monitor_file);
+ flen = ftell(srv_monitor_file);
+ if(flen > 64000 - 1) {
+ flen = 64000 - 1;
+ }
- List<Item> field_list;
+ ut_ad(flen > 0);
- field_list.push_back(new Item_empty_string("Status", strlen(buf)));
+ /* allocate buffer for the string, and
+ read the contents of the temporary file */
- if (protocol->send_fields(&field_list, 1))
+ if (!(str = my_malloc(flen + 1, MYF(0))))
{
- ut_free(buf);
- DBUG_RETURN(-1);
+ mutex_exit_noninline(&srv_monitor_file_mutex);
+ DBUG_RETURN(-1);
}
- protocol->prepare_for_resend();
- protocol->store(buf, strlen(buf), system_charset_info);
+ rewind(srv_monitor_file);
+ flen = fread(str, 1, flen, srv_monitor_file);
+
+ mutex_exit_noninline(&srv_monitor_file_mutex);
+
+ List<Item> field_list;
- ut_free(buf);
+ field_list.push_back(new Item_empty_string("Status", flen));
+
+ if (protocol->send_fields(&field_list, 1)) {
+
+ my_free(str, MYF(0));
+
+ DBUG_RETURN(-1);
+ }
+
+ protocol->prepare_for_resend();
+ protocol->store(str, flen, system_charset_info);
+ my_free(str, MYF(0));
if (protocol->write())
DBUG_RETURN(-1);
- send_eof(thd);
-
- DBUG_RETURN(0);
+ send_eof(thd);
+ DBUG_RETURN(0);
}
/****************************************************************************
diff --git a/sql/ha_innodb.h b/sql/ha_innodb.h
index e5f1c66e23a..83a3edc4126 100644
--- a/sql/ha_innodb.h
+++ b/sql/ha_innodb.h
@@ -123,6 +123,8 @@ class ha_innobase: public handler
whose size is > MAX_KEY_LENGTH */
uint max_key_length() const { return((MAX_KEY_LENGTH <= 3500) ?
MAX_KEY_LENGTH : 3500);}
+ uint max_key_part_length() { return((MAX_KEY_LENGTH <= 3500) ?
+ MAX_KEY_LENGTH : 3500);}
const key_map *keys_to_use_for_scanning() { return &key_map_full; }
bool has_transactions() { return 1;}
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
new file mode 100644
index 00000000000..3bc322878d1
--- /dev/null
+++ b/sql/ha_ndbcluster.cc
@@ -0,0 +1,2943 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+/*
+ This file defines the NDB Cluster handler: the interface between MySQL and
+ NDB Cluster
+*/
+
+/*
+ TODO
+ After CREATE DATABASE gör discover på alla tabeller i den databasen
+
+*/
+
+
+#ifdef __GNUC__
+#pragma implementation // gcc: Class implementation
+#endif
+
+#include "mysql_priv.h"
+
+#ifdef HAVE_NDBCLUSTER_DB
+#include <my_dir.h>
+#include "ha_ndbcluster.h"
+#include <ndbapi/NdbApi.hpp>
+#include <ndbapi/NdbScanFilter.hpp>
+
+#define USE_DISCOVER_ON_STARTUP
+//#define USE_NDB_POOL
+
+// Default value for parallelism
+static const int parallelism= 240;
+
+#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
+
+#define ERR_PRINT(err) \
+ DBUG_PRINT("error", ("Error: %d message: %s", err.code, err.message))
+
+#define ERR_RETURN(err) \
+{ \
+ ERR_PRINT(err); \
+ DBUG_RETURN(ndb_to_mysql_error(&err)); \
+}
+
+// Typedefs for long names
+typedef NdbDictionary::Column NDBCOL;
+typedef NdbDictionary::Table NDBTAB;
+typedef NdbDictionary::Index NDBINDEX;
+typedef NdbDictionary::Dictionary NDBDICT;
+
+bool ndbcluster_inited= false;
+
+// Handler synchronization
+pthread_mutex_t ndbcluster_mutex;
+
+// Table lock handling
+static HASH ndbcluster_open_tables;
+
+static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
+ my_bool not_used __attribute__((unused)));
+static NDB_SHARE *get_share(const char *table_name);
+static void free_share(NDB_SHARE *share);
+
+static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
+static int unpackfrm(const void **data, uint *len,
+ const void* pack_data);
+
+/*
+ Error handling functions
+*/
+
+struct err_code_mapping
+{
+ int ndb_err;
+ int my_err;
+};
+
+static const err_code_mapping err_map[]=
+{
+ { 626, HA_ERR_KEY_NOT_FOUND },
+ { 630, HA_ERR_FOUND_DUPP_KEY },
+ { 893, HA_ERR_FOUND_DUPP_UNIQUE },
+ { 721, HA_ERR_TABLE_EXIST },
+ { 241, HA_ERR_OLD_METADATA },
+ { -1, -1 }
+};
+
+
+static int ndb_to_mysql_error(const NdbError *err)
+{
+ uint i;
+ for (i=0 ; err_map[i].ndb_err != err->code ; i++)
+ {
+ if (err_map[i].my_err == -1)
+ return err->code;
+ }
+ return err_map[i].my_err;
+}
+
+
+/*
+ Take care of the error that occured in NDB
+
+ RETURN
+ 0 No error
+ # The mapped error code
+*/
+
+int ha_ndbcluster::ndb_err(NdbConnection *trans)
+{
+ const NdbError err= trans->getNdbError();
+ if (!err.code)
+ return 0; // Don't log things to DBUG log if no error
+ DBUG_ENTER("ndb_err");
+
+ ERR_PRINT(err);
+ switch (err.classification) {
+ case NdbError::SchemaError:
+ {
+ NDBDICT *dict= m_ndb->getDictionary();
+ DBUG_PRINT("info", ("invalidateTable %s", m_tabname));
+ dict->invalidateTable(m_tabname);
+ break;
+ }
+ default:
+ break;
+ }
+ DBUG_RETURN(ndb_to_mysql_error(&err));
+}
+
+
+/*
+ Instruct NDB to set the value of the hidden primary key
+*/
+
+bool ha_ndbcluster::set_hidden_key(NdbOperation *ndb_op,
+ uint fieldnr, const byte *field_ptr)
+{
+ DBUG_ENTER("set_hidden_key");
+ DBUG_RETURN(ndb_op->equal(fieldnr, (char*)field_ptr,
+ NDB_HIDDEN_PRIMARY_KEY_LENGTH) != 0);
+}
+
+
+/*
+ Instruct NDB to set the value of one primary key attribute
+*/
+
+int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
+ uint fieldnr, const byte *field_ptr)
+{
+ uint32 pack_len= field->pack_length();
+ DBUG_ENTER("set_ndb_key");
+ DBUG_PRINT("enter", ("%d: %s, ndb_type: %u, len=%d",
+ fieldnr, field->field_name, field->type(),
+ pack_len));
+ DBUG_DUMP("key", (char*)field_ptr, pack_len);
+
+ switch (field->type()) {
+ case MYSQL_TYPE_DECIMAL:
+ case MYSQL_TYPE_TINY:
+ case MYSQL_TYPE_SHORT:
+ case MYSQL_TYPE_LONG:
+ case MYSQL_TYPE_FLOAT:
+ case MYSQL_TYPE_DOUBLE:
+ case MYSQL_TYPE_TIMESTAMP:
+ case MYSQL_TYPE_LONGLONG:
+ case MYSQL_TYPE_INT24:
+ case MYSQL_TYPE_DATE:
+ case MYSQL_TYPE_TIME:
+ case MYSQL_TYPE_DATETIME:
+ case MYSQL_TYPE_YEAR:
+ case MYSQL_TYPE_NEWDATE:
+ case MYSQL_TYPE_ENUM:
+ case MYSQL_TYPE_SET:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_STRING:
+ // Common implementation for most field types
+ DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
+
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_NULL:
+ case MYSQL_TYPE_GEOMETRY:
+ default:
+ // Unhandled field types
+ DBUG_PRINT("error", ("Field type %d not supported", field->type()));
+ DBUG_RETURN(2);
+ }
+ DBUG_RETURN(3);
+}
+
+
+/*
+ Instruct NDB to set the value of one attribute
+*/
+
+int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
+ uint fieldnr)
+{
+ const byte* field_ptr= field->ptr;
+ uint32 pack_len= field->pack_length();
+ DBUG_ENTER("set_ndb_value");
+ DBUG_PRINT("enter", ("%d: %s, type: %u, len=%d, is_null=%s",
+ fieldnr, field->field_name, field->type(),
+ pack_len, field->is_null()?"Y":"N"));
+ DBUG_DUMP("value", (char*) field_ptr, pack_len);
+
+ if (field->is_null())
+ {
+ // Set value to NULL
+ DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
+ }
+
+ switch (field->type()) {
+ case MYSQL_TYPE_DECIMAL:
+ case MYSQL_TYPE_TINY:
+ case MYSQL_TYPE_SHORT:
+ case MYSQL_TYPE_LONG:
+ case MYSQL_TYPE_FLOAT:
+ case MYSQL_TYPE_DOUBLE:
+ case MYSQL_TYPE_TIMESTAMP:
+ case MYSQL_TYPE_LONGLONG:
+ case MYSQL_TYPE_INT24:
+ case MYSQL_TYPE_DATE:
+ case MYSQL_TYPE_TIME:
+ case MYSQL_TYPE_DATETIME:
+ case MYSQL_TYPE_YEAR:
+ case MYSQL_TYPE_NEWDATE:
+ case MYSQL_TYPE_ENUM:
+ case MYSQL_TYPE_SET:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_STRING:
+ // Common implementation for most field types
+ DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
+
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_NULL:
+ case MYSQL_TYPE_GEOMETRY:
+ default:
+ // Unhandled field types
+ DBUG_PRINT("error", ("Field type %d not supported", field->type()));
+ DBUG_RETURN(2);
+ }
+ DBUG_RETURN(3);
+}
+
+
+/*
+ Instruct NDB to fetch one field
+ - data is read directly into buffer provided by field_ptr
+ if it's NULL, data is read into memory provided by NDBAPI
+*/
+
+int ha_ndbcluster::get_ndb_value(NdbOperation *op,
+ uint field_no, byte *field_ptr)
+{
+ DBUG_ENTER("get_ndb_value");
+ DBUG_PRINT("enter", ("field_no: %d", field_no));
+ m_value[field_no]= op->getValue(field_no, field_ptr);
+ DBUG_RETURN(m_value == NULL);
+}
+
+
+/*
+ Get metadata for this table from NDB
+
+ IMPLEMENTATION
+ - save the NdbDictionary::Table for easy access
+ - check that frm-file on disk is equal to frm-file
+ of table accessed in NDB
+ - build a list of the indexes for the table
+*/
+
+int ha_ndbcluster::get_metadata(const char *path)
+{
+ NDBDICT *dict= m_ndb->getDictionary();
+ const NDBTAB *tab;
+ const void *data, *pack_data;
+ const char **key_name;
+ uint ndb_columns, mysql_columns, length, pack_length, i;
+ int error;
+ DBUG_ENTER("get_metadata");
+ DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));
+
+ if (!(tab= dict->getTable(m_tabname)))
+ ERR_RETURN(dict->getNdbError());
+ DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
+
+ /*
+ This is the place to check that the table we got from NDB
+ is equal to the one on local disk
+ */
+ ndb_columns= (uint) tab->getNoOfColumns();
+ mysql_columns= table->fields;
+ if (table->primary_key == MAX_KEY)
+ ndb_columns--;
+ if (ndb_columns != mysql_columns)
+ {
+ DBUG_PRINT("error",
+ ("Wrong number of columns, ndb: %d mysql: %d",
+ ndb_columns, mysql_columns));
+ DBUG_RETURN(HA_ERR_OLD_METADATA);
+ }
+
+ /*
+ Compare FrmData in NDB with frm file from disk.
+ */
+ error= 0;
+ if (readfrm(path, &data, &length) ||
+ packfrm(data, length, &pack_data, &pack_length))
+ {
+ my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
+ my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
+ DBUG_RETURN(1);
+ }
+
+ if ((pack_length != tab->getFrmLength()) ||
+ (memcmp(pack_data, tab->getFrmData(), pack_length)))
+ {
+ DBUG_PRINT("error",
+ ("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
+ pack_length, tab->getFrmLength(),
+ memcmp(pack_data, tab->getFrmData(), pack_length)));
+ DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
+ DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
+ error= HA_ERR_OLD_METADATA;
+ }
+ my_free((char*)data, MYF(0));
+ my_free((char*)pack_data, MYF(0));
+ if (error)
+ DBUG_RETURN(error);
+
+ // All checks OK, lets use the table
+ m_table= (void*)tab;
+
+ for (i= 0; i < MAX_KEY; i++)
+ m_indextype[i]= UNDEFINED_INDEX;
+
+ // Save information about all known indexes
+ for (i= 0; i < table->keys; i++)
+ m_indextype[i] = get_index_type_from_table(i);
+
+ DBUG_RETURN(0);
+}
+
+/*
+ Decode the type of an index from information
+ provided in table object
+*/
+NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint index_no) const
+{
+ if (index_no == table->primary_key)
+ return PRIMARY_KEY_INDEX;
+ else
+ return ((table->key_info[index_no].flags & HA_NOSAME) ?
+ UNIQUE_INDEX :
+ ORDERED_INDEX);
+}
+
+
+void ha_ndbcluster::release_metadata()
+{
+ DBUG_ENTER("release_metadata");
+ DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
+
+ m_table= NULL;
+
+ DBUG_VOID_RETURN;
+}
+
+static const ulong index_type_flags[]=
+{
+ /* UNDEFINED_INDEX */
+ 0,
+
+ /* PRIMARY_KEY_INDEX */
+ HA_ONLY_WHOLE_INDEX |
+ HA_WRONG_ASCII_ORDER |
+ HA_NOT_READ_PREFIX_LAST,
+
+ /* UNIQUE_INDEX */
+ HA_ONLY_WHOLE_INDEX |
+ HA_WRONG_ASCII_ORDER |
+ HA_NOT_READ_PREFIX_LAST,
+
+ /* ORDERED_INDEX */
+ HA_READ_NEXT |
+ HA_READ_PREV |
+ HA_NOT_READ_AFTER_KEY
+};
+
+static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong);
+
+inline const char* ha_ndbcluster::get_index_name(uint idx_no) const
+{
+ return table->keynames.type_names[idx_no];
+}
+
+inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
+{
+ DBUG_ASSERT(idx_no < MAX_KEY);
+ return m_indextype[idx_no];
+}
+
+
+/*
+ Get the flags for an index
+
+ RETURN
+ flags depending on the type of the index.
+*/
+
+inline ulong ha_ndbcluster::index_flags(uint idx_no) const
+{
+ DBUG_ENTER("index_flags");
+ DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size);
+ DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]);
+}
+
+
+int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key)
+{
+ KEY* key_info= table->key_info + table->primary_key;
+ KEY_PART_INFO* key_part= key_info->key_part;
+ KEY_PART_INFO* end= key_part+key_info->key_parts;
+ DBUG_ENTER("set_primary_key");
+
+ for (; key_part != end; key_part++)
+ {
+ Field* field= key_part->field;
+ if (set_ndb_key(op, field,
+ key_part->fieldnr-1, key))
+ ERR_RETURN(op->getNdbError());
+ key += key_part->length;
+ }
+ DBUG_RETURN(0);
+}
+
+
+int ha_ndbcluster::set_primary_key(NdbOperation *op)
+{
+ DBUG_ENTER("set_primary_key");
+ KEY* key_info= table->key_info + table->primary_key;
+ KEY_PART_INFO* key_part= key_info->key_part;
+ KEY_PART_INFO* end= key_part+key_info->key_parts;
+
+ for (; key_part != end; key_part++)
+ {
+ Field* field= key_part->field;
+ if (set_ndb_key(op, field,
+ key_part->fieldnr-1, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Read one record from NDB using primary key
+*/
+
+int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
+{
+ uint no_fields= table->fields, i;
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ THD *thd= current_thd;
+ DBUG_ENTER("pk_read");
+ DBUG_PRINT("enter", ("key_len: %u", key_len));
+ DBUG_DUMP("key", (char*)key, key_len);
+
+ if (!(op= trans->getNdbOperation(m_tabname)) || op->readTuple() != 0)
+ goto err;
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // This table has no primary key, use "hidden" primary key
+ DBUG_PRINT("info", ("Using hidden key"));
+ DBUG_DUMP("key", (char*)key, 8);
+ if (set_hidden_key(op, no_fields, key))
+ goto err;
+ // Read key at the same time, for future reference
+ if (get_ndb_value(op, no_fields, NULL))
+ goto err;
+ }
+ else
+ {
+ int res;
+ if ((res= set_primary_key(op, key)))
+ return res;
+ }
+
+ // Read non-key field(s)
+ for (i= 0; i < no_fields; i++)
+ {
+ Field *field= table->field[i];
+ if (thd->query_id == field->query_id)
+ {
+ if (get_ndb_value(op, i, field->ptr))
+ goto err;
+ }
+ else
+ {
+ // Attribute was not to be read
+ m_value[i]= NULL;
+ }
+ }
+
+ if (trans->execute(NoCommit, IgnoreError) != 0)
+ {
+ table->status= STATUS_NOT_FOUND;
+ DBUG_RETURN(ndb_err(trans));
+ }
+
+ // The value have now been fetched from NDB
+ unpack_record(buf);
+ table->status= 0;
+ DBUG_RETURN(0);
+
+ err:
+ ERR_RETURN(trans->getNdbError());
+}
+
+
+/*
+ Read one record from NDB using unique secondary index
+*/
+
+int ha_ndbcluster::unique_index_read(const byte *key,
+ uint key_len, byte *buf)
+{
+ NdbConnection *trans= m_active_trans;
+ const char *index_name;
+ NdbIndexOperation *op;
+ THD *thd= current_thd;
+ byte *key_ptr;
+ KEY* key_info;
+ KEY_PART_INFO *key_part, *end;
+ uint i;
+ DBUG_ENTER("unique_index_read");
+ DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
+ DBUG_DUMP("key", (char*)key, key_len);
+
+ index_name= get_index_name(active_index);
+ if (!(op= trans->getNdbIndexOperation(index_name, m_tabname)) ||
+ op->readTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ // Set secondary index key(s)
+ key_ptr= (byte *) key;
+ key_info= table->key_info + active_index;
+ DBUG_ASSERT(key_info->key_length == key_len);
+ end= (key_part= key_info->key_part) + key_info->key_parts;
+
+ for (i= 0; key_part != end; key_part++, i++)
+ {
+ if (set_ndb_key(op, key_part->field, i, key_ptr))
+ ERR_RETURN(trans->getNdbError());
+ key_ptr+= key_part->length;
+ }
+
+ // Get non-index attribute(s)
+ for (i= 0; i < table->fields; i++)
+ {
+ Field *field= table->field[i];
+ if ((thd->query_id == field->query_id) ||
+ (field->flags & PRI_KEY_FLAG))
+ {
+ if (get_ndb_value(op, i, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ // Attribute was not to be read
+ m_value[i]= NULL;
+ }
+ }
+
+ if (trans->execute(NoCommit, IgnoreError) != 0)
+ {
+ table->status= STATUS_NOT_FOUND;
+ DBUG_RETURN(ndb_err(trans));
+ }
+ // The value have now been fetched from NDB
+ unpack_record(buf);
+ table->status= 0;
+ DBUG_RETURN(0);
+}
+
+/*
+ Get the next record of a started scan
+*/
+
+inline int ha_ndbcluster::next_result(byte *buf)
+{
+ NdbConnection *trans= m_active_trans;
+ NdbResultSet *cursor= m_active_cursor;
+ DBUG_ENTER("next_result");
+
+ if (cursor->nextResult() == 0)
+ {
+ // One more record found
+ unpack_record(buf);
+ table->status= 0;
+ DBUG_RETURN(0);
+ }
+ table->status= STATUS_NOT_FOUND;
+ if (ndb_err(trans))
+ ERR_RETURN(trans->getNdbError());
+
+ // No more records
+ DBUG_PRINT("info", ("No more records"));
+ DBUG_RETURN(HA_ERR_END_OF_FILE);
+}
+
+
+/*
+ Read record(s) from NDB using ordered index scan
+*/
+
+int ha_ndbcluster::ordered_index_scan(const byte *key, uint key_len,
+ byte *buf,
+ enum ha_rkey_function find_flag)
+{
+ uint no_fields= table->fields;
+ uint tot_len, i;
+ NdbConnection *trans= m_active_trans;
+ NdbResultSet *cursor= m_active_cursor;
+ NdbScanOperation *op;
+ const char *bound_str= NULL;
+ const char *index_name;
+ NdbOperation::BoundType bound_type = NdbOperation::BoundEQ;
+ bool can_be_handled_by_ndb= FALSE;
+ byte *key_ptr;
+ KEY *key_info;
+ THD* thd = current_thd;
+ DBUG_ENTER("ordered_index_scan");
+ DBUG_PRINT("enter", ("index: %u", active_index));
+ DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
+
+ index_name= get_index_name(active_index);
+ if (!(op= trans->getNdbScanOperation(index_name, m_tabname)))
+ ERR_RETURN(trans->getNdbError());
+ if (!(cursor= op->readTuples(parallelism)))
+ ERR_RETURN(trans->getNdbError());
+ m_active_cursor= cursor;
+
+ switch (find_flag) {
+ case HA_READ_KEY_EXACT: /* Find first record else error */
+ bound_str= "HA_READ_KEY_EXACT";
+ bound_type= NdbOperation::BoundEQ;
+ can_be_handled_by_ndb= TRUE;
+ break;
+ case HA_READ_KEY_OR_NEXT: /* Record or next record */
+ bound_str= "HA_READ_KEY_OR_NEXT";
+ bound_type= NdbOperation::BoundLE;
+ can_be_handled_by_ndb= TRUE;
+ break;
+ case HA_READ_KEY_OR_PREV: /* Record or previous */
+ bound_str= "HA_READ_KEY_OR_PREV";
+ bound_type= NdbOperation::BoundGE;
+ can_be_handled_by_ndb= TRUE;
+ break;
+ case HA_READ_AFTER_KEY: /* Find next rec. after key-record */
+ bound_str= "HA_READ_AFTER_KEY";
+ bound_type= NdbOperation::BoundLT;
+ can_be_handled_by_ndb= TRUE;
+ break;
+ case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */
+ bound_str= "HA_READ_BEFORE_KEY";
+ bound_type= NdbOperation::BoundGT;
+ can_be_handled_by_ndb= TRUE;
+ break;
+ case HA_READ_PREFIX: /* Key which as same prefix */
+ bound_str= "HA_READ_PREFIX";
+ break;
+ case HA_READ_PREFIX_LAST: /* Last key with the same prefix */
+ bound_str= "HA_READ_PREFIX_LAST";
+ break;
+ case HA_READ_PREFIX_LAST_OR_PREV:
+ /* Last or prev key with the same prefix */
+ bound_str= "HA_READ_PREFIX_LAST_OR_PREV";
+ break;
+ default:
+ bound_str= "UNKNOWN";
+ break;
+ }
+ DBUG_PRINT("info", ("find_flag: %s, bound_type: %d,"
+ "can_be_handled_by_ndb: %d",
+ bound_str, bound_type, can_be_handled_by_ndb));
+ if (!can_be_handled_by_ndb)
+ DBUG_RETURN(1);
+
+ // Set bounds using key data
+ tot_len= 0;
+ key_ptr= (byte *) key;
+ key_info= table->key_info + active_index;
+ for (i= 0; i < key_info->key_parts; i++)
+ {
+ Field* field= key_info->key_part[i].field;
+ uint32 field_len= field->pack_length();
+ DBUG_PRINT("info", ("Set index bound on %s",
+ field->field_name));
+ DBUG_DUMP("key", (char*)key_ptr, field_len);
+
+ if (op->setBound(field->field_name,
+ bound_type,
+ key_ptr,
+ field_len) != 0)
+ ERR_RETURN(op->getNdbError());
+
+ key_ptr+= field_len;
+ tot_len+= field_len;
+ if (tot_len >= key_len)
+ break;
+ }
+
+ // Define attributes to read
+ for (i= 0; i < no_fields; i++)
+ {
+ Field *field= table->field[i];
+ if ((thd->query_id == field->query_id) ||
+ (field->flags & PRI_KEY_FLAG))
+ {
+ if (get_ndb_value(op, i, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ m_value[i]= NULL;
+ }
+ }
+
+ if (table->primary_key == MAX_KEY)
+ {
+ DBUG_PRINT("info", ("Getting hidden key"));
+ // Scanning table with no primary key
+ int hidden_no= no_fields;
+#ifndef DBUG_OFF
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ if (!tab->getColumn(hidden_no))
+ DBUG_RETURN(1);
+#endif
+ if (get_ndb_value(op, hidden_no, NULL))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_PRINT("exit", ("Scan started successfully"));
+ DBUG_RETURN(next_result(buf));
+}
+
+
+#if 0
+/*
+ Read record(s) from NDB using full table scan with filter
+ */
+
+int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
+ byte *buf,
+ enum ha_rkey_function find_flag)
+{
+ uint no_fields= table->fields;
+ NdbConnection *trans= m_active_trans;
+ NdbResultSet *cursor= m_active_cursor;
+
+ DBUG_ENTER("filtered_scan");
+ DBUG_PRINT("enter", ("key_len: %u, index: %u",
+ key_len, active_index));
+ DBUG_DUMP("key", (char*)key, key_len);
+ DBUG_PRINT("info", ("Starting a new filtered scan on %s",
+ m_tabname));
+ NdbScanOperation *op= trans->getNdbScanOperation(m_tabname);
+ if (!op)
+ ERR_RETURN(trans->getNdbError());
+
+ cursor= op->readTuples(parallelism);
+ if (!cursor)
+ ERR_RETURN(trans->getNdbError());
+ m_active_cursor= cursor;
+
+ {
+ // Start scan filter
+ NdbScanFilter sf(op);
+ sf.begin();
+
+ // Set filter using the supplied key data
+ byte *key_ptr= (byte *) key;
+ uint tot_len= 0;
+ KEY* key_info= table->key_info + active_index;
+ for (uint k= 0; k < key_info->key_parts; k++)
+ {
+ KEY_PART_INFO* key_part= key_info->key_part+k;
+ Field* field= key_part->field;
+ uint ndb_fieldnr= key_part->fieldnr-1;
+ DBUG_PRINT("key_part", ("fieldnr: %d", ndb_fieldnr));
+ // const NDBCOL *col= tab->getColumn(ndb_fieldnr);
+ uint32 field_len= field->pack_length();
+ DBUG_DUMP("key", (char*)key, field_len);
+
+ DBUG_PRINT("info", ("Column %s, type: %d, len: %d",
+ field->field_name, field->real_type(), field_len));
+
+ // Define scan filter
+ if (field->real_type() == MYSQL_TYPE_STRING)
+ sf.eq(ndb_fieldnr, key_ptr, field_len);
+ else
+ {
+ if (field_len == 8)
+ sf.eq(ndb_fieldnr, (Uint64)*key_ptr);
+ else if (field_len <= 4)
+ sf.eq(ndb_fieldnr, (Uint32)*key_ptr);
+ else
+ DBUG_RETURN(1);
+ }
+
+ key_ptr += field_len;
+ tot_len += field_len;
+
+ if (tot_len >= key_len)
+ break;
+ }
+ // End scan filter
+ sf.end();
+ }
+
+ // Define attributes to read
+ for (uint field_no= 0; field_no < no_fields; field_no++)
+ {
+ Field *field= table->field[field_no];
+
+ // Read attribute
+ DBUG_PRINT("get", ("%d: %s", field_no, field->field_name));
+ if (get_ndb_value(op, field_no, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (table->primary_key == MAX_KEY)
+ {
+ DBUG_PRINT("info", ("Getting hidden key"));
+ // Scanning table with no primary key
+ int hidden_no= no_fields;
+#ifndef DBUG_OFF
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ if (!tab->getColumn(hidden_no))
+ DBUG_RETURN(1);
+#endif
+ if (get_ndb_value(op, hidden_no, NULL))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_PRINT("exit", ("Scan started successfully"));
+ DBUG_RETURN(next_result(buf));
+}
+#endif
+
+
+/*
+ Read records from NDB using full table scan
+ */
+
+int ha_ndbcluster::full_table_scan(byte *buf)
+{
+ uint i;
+ THD *thd= current_thd;
+ NdbConnection *trans= m_active_trans;
+ NdbResultSet *cursor;
+ NdbScanOperation *op;
+
+ DBUG_ENTER("full_table_scan");
+ DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
+
+ if (!(op=trans->getNdbScanOperation(m_tabname)))
+ ERR_RETURN(trans->getNdbError());
+ if (!(cursor= op->readTuples(parallelism)))
+ ERR_RETURN(trans->getNdbError());
+ m_active_cursor= cursor;
+
+ // Define attributes to read
+ for (i= 0; i < table->fields; i++)
+ {
+ Field *field= table->field[i];
+ if ((thd->query_id == field->query_id) ||
+ (field->flags & PRI_KEY_FLAG))
+ {
+ if (get_ndb_value(op, i, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ m_value[i]= NULL;
+ }
+ }
+
+ if (table->primary_key == MAX_KEY)
+ {
+ DBUG_PRINT("info", ("Getting hidden key"));
+ // Scanning table with no primary key
+ int hidden_no= table->fields;
+#ifndef DBUG_OFF
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ if (!tab->getColumn(hidden_no))
+ DBUG_RETURN(1);
+#endif
+ if (get_ndb_value(op, hidden_no, NULL))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_PRINT("exit", ("Scan started successfully"));
+ DBUG_RETURN(next_result(buf));
+}
+
+
+/*
+ Insert one record into NDB
+*/
+
+int ha_ndbcluster::write_row(byte *record)
+{
+ uint i;
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ int res;
+ DBUG_ENTER("write_row");
+
+ statistic_increment(ha_write_count,&LOCK_status);
+ if (table->timestamp_default_now)
+ update_timestamp(record+table->timestamp_default_now-1);
+ if (table->next_number_field && record == table->record[0])
+ update_auto_increment();
+
+ if (!(op= trans->getNdbOperation(m_tabname)))
+ ERR_RETURN(trans->getNdbError());
+
+ res= (m_use_write) ? op->writeTuple() :op->insertTuple();
+ if (res != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // Table has hidden primary key
+ Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname);
+ if (set_hidden_key(op, table->fields, (const byte*)&auto_value))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ int res;
+ if ((res= set_primary_key(op)))
+ return res;
+ }
+
+ // Set non-key attribute(s)
+ for (i= 0; i < table->fields; i++)
+ {
+ Field *field= table->field[i];
+ if (!(field->flags & PRI_KEY_FLAG) &&
+ set_ndb_value(op, field, i))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ /*
+ Execute write operation
+ NOTE When doing inserts with many values in
+ each INSERT statement it should not be necessary
+ to NoCommit the transaction between each row.
+ Find out how this is detected!
+ */
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_RETURN(0);
+}
+
+
+/* Compare if a key in a row has changed */
+
+int ha_ndbcluster::key_cmp(uint keynr, const byte * old_row,
+ const byte * new_row)
+{
+ KEY_PART_INFO *key_part=table->key_info[keynr].key_part;
+ KEY_PART_INFO *end=key_part+table->key_info[keynr].key_parts;
+
+ for (; key_part != end ; key_part++)
+ {
+ if (key_part->null_bit)
+ {
+ if ((old_row[key_part->null_offset] & key_part->null_bit) !=
+ (new_row[key_part->null_offset] & key_part->null_bit))
+ return 1;
+ }
+ if (key_part->key_part_flag & (HA_BLOB_PART | HA_VAR_LENGTH))
+ {
+
+ if (key_part->field->cmp_binary((char*) (old_row + key_part->offset),
+ (char*) (new_row + key_part->offset),
+ (ulong) key_part->length))
+ return 1;
+ }
+ else
+ {
+ if (memcmp(old_row+key_part->offset, new_row+key_part->offset,
+ key_part->length))
+ return 1;
+ }
+ }
+ return 0;
+}
+
+/*
+ Update one record in NDB using primary key
+*/
+
+int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
+{
+ THD *thd= current_thd;
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ uint i;
+ DBUG_ENTER("update_row");
+
+ statistic_increment(ha_update_count,&LOCK_status);
+ if (table->timestamp_on_update_now)
+ update_timestamp(new_data+table->timestamp_on_update_now-1);
+
+ if (!(op= trans->getNdbOperation(m_tabname)) ||
+ op->updateTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // This table has no primary key, use "hidden" primary key
+ DBUG_PRINT("info", ("Using hidden key"));
+
+ // Require that the PK for this record has previously been
+ // read into m_value
+ uint no_fields= table->fields;
+ NdbRecAttr* rec= m_value[no_fields];
+ DBUG_ASSERT(rec);
+ DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
+
+ if (set_hidden_key(op, no_fields, rec->aRef()))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ /* Check for update of primary key and return error */
+ if (key_cmp(table->primary_key, old_data, new_data))
+ DBUG_RETURN(HA_ERR_UNSUPPORTED);
+
+ int res;
+ if ((res= set_primary_key(op, old_data + table->null_bytes)))
+ DBUG_RETURN(res);
+ }
+
+ // Set non-key attribute(s)
+ for (i= 0; i < table->fields; i++)
+ {
+
+ Field *field= table->field[i];
+ if ((thd->query_id == field->query_id) &&
+ (!(field->flags & PRI_KEY_FLAG)) &&
+ set_ndb_value(op, field, i))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ // Execute update operation
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Delete one record from NDB, using primary key
+*/
+
+int ha_ndbcluster::delete_row(const byte *record)
+{
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ DBUG_ENTER("delete_row");
+
+ statistic_increment(ha_delete_count,&LOCK_status);
+
+ if (!(op=trans->getNdbOperation(m_tabname)) ||
+ op->deleteTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // This table has no primary key, use "hidden" primary key
+ DBUG_PRINT("info", ("Using hidden key"));
+ uint no_fields= table->fields;
+ NdbRecAttr* rec= m_value[no_fields];
+ DBUG_ASSERT(rec != NULL);
+
+ if (set_hidden_key(op, no_fields, rec->aRef()))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ int res;
+ if ((res= set_primary_key(op)))
+ return res;
+ }
+
+ // Execute delete operation
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_RETURN(0);
+}
+
+/*
+ Unpack a record read from NDB
+
+ SYNOPSIS
+ unpack_record()
+ buf Buffer to store read row
+
+ NOTE
+ The data for each row is read directly into the
+ destination buffer. This function is primarily
+ called in order to check if any fields should be
+ set to null.
+*/
+
+void ha_ndbcluster::unpack_record(byte* buf)
+{
+ uint row_offset= (uint) (buf - table->record[0]);
+ Field **field, **end;
+ NdbRecAttr **value= m_value;
+ DBUG_ENTER("unpack_record");
+
+ // Set null flag(s)
+ bzero(buf, table->null_bytes);
+ for (field= table->field, end= field+table->fields;
+ field < end;
+ field++, value++)
+ {
+ if (*value && (*value)->isNULL())
+ (*field)->set_null(row_offset);
+ }
+
+#ifndef DBUG_OFF
+ // Read and print all values that was fetched
+ if (table->primary_key == MAX_KEY)
+ {
+ // Table with hidden primary key
+ int hidden_no= table->fields;
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ const NDBCOL *hidden_col= tab->getColumn(hidden_no);
+ NdbRecAttr* rec= m_value[hidden_no];
+ DBUG_ASSERT(rec);
+ DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no,
+ hidden_col->getName(), rec->u_64_value()));
+ }
+ print_results();
+#endif
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Utility function to print/dump the fetched field
+ */
+
+void ha_ndbcluster::print_results()
+{
+ const NDBTAB *tab= (NDBTAB*) m_table;
+ DBUG_ENTER("print_results");
+
+#ifndef DBUG_OFF
+ if (!_db_on_)
+ DBUG_VOID_RETURN;
+
+ for (uint f=0; f<table->fields;f++)
+ {
+ Field *field;
+ const NDBCOL *col;
+ NdbRecAttr *value;
+
+ if (!(value= m_value[f]))
+ {
+ fprintf(DBUG_FILE, "Field %d was not read\n", f);
+ continue;
+ }
+ field= table->field[f];
+ DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length());
+ col= tab->getColumn(f);
+ fprintf(DBUG_FILE, "%d: %s\t", f, col->getName());
+
+ if (value->isNULL())
+ {
+ fprintf(DBUG_FILE, "NULL\n");
+ continue;
+ }
+
+ switch (col->getType()) {
+ case NdbDictionary::Column::Blob:
+ case NdbDictionary::Column::Undefined:
+ fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
+ break;
+ case NdbDictionary::Column::Tinyint: {
+ char value= *field->ptr;
+ fprintf(DBUG_FILE, "Tinyint\t%d", value);
+ break;
+ }
+ case NdbDictionary::Column::Tinyunsigned: {
+ unsigned char value= *field->ptr;
+ fprintf(DBUG_FILE, "Tinyunsigned\t%u", value);
+ break;
+ }
+ case NdbDictionary::Column::Smallint: {
+ short value= *field->ptr;
+ fprintf(DBUG_FILE, "Smallint\t%d", value);
+ break;
+ }
+ case NdbDictionary::Column::Smallunsigned: {
+ unsigned short value= *field->ptr;
+ fprintf(DBUG_FILE, "Smallunsigned\t%u", value);
+ break;
+ }
+ case NdbDictionary::Column::Mediumint: {
+ byte value[3];
+ memcpy(value, field->ptr, 3);
+ fprintf(DBUG_FILE, "Mediumint\t%d,%d,%d", value[0], value[1], value[2]);
+ break;
+ }
+ case NdbDictionary::Column::Mediumunsigned: {
+ byte value[3];
+ memcpy(value, field->ptr, 3);
+ fprintf(DBUG_FILE, "Mediumunsigned\t%u,%u,%u", value[0], value[1], value[2]);
+ break;
+ }
+ case NdbDictionary::Column::Int: {
+ fprintf(DBUG_FILE, "Int\t%lld", field->val_int());
+ break;
+ }
+ case NdbDictionary::Column::Unsigned: {
+ Uint32 value= (Uint32) *field->ptr;
+ fprintf(DBUG_FILE, "Unsigned\t%u", value);
+ break;
+ }
+ case NdbDictionary::Column::Bigint: {
+ Int64 value= (Int64) *field->ptr;
+ fprintf(DBUG_FILE, "Bigint\t%lld", value);
+ break;
+ }
+ case NdbDictionary::Column::Bigunsigned: {
+ Uint64 value= (Uint64) *field->ptr;
+ fprintf(DBUG_FILE, "Bigunsigned\t%llu", value);
+ break;
+ }
+ case NdbDictionary::Column::Float: {
+ float value= (float) *field->ptr;
+ fprintf(DBUG_FILE, "Float\t%f", value);
+ break;
+ }
+ case NdbDictionary::Column::Double: {
+ double value= (double) *field->ptr;
+ fprintf(DBUG_FILE, "Double\t%f", value);
+ break;
+ }
+ case NdbDictionary::Column::Decimal: {
+ char *value= field->ptr;
+
+ fprintf(DBUG_FILE, "Decimal\t'%-*s'", field->pack_length(), value);
+ break;
+ }
+ case NdbDictionary::Column::Char:{
+ char buf[field->pack_length()+1];
+ char *value= (char *) field->ptr;
+ snprintf(buf, field->pack_length(), "%s", value);
+ fprintf(DBUG_FILE, "Char\t'%s'", buf);
+ break;
+ }
+ case NdbDictionary::Column::Varchar:
+ case NdbDictionary::Column::Binary:
+ case NdbDictionary::Column::Varbinary: {
+ char *value= (char *) field->ptr;
+ fprintf(DBUG_FILE, "'%s'", value);
+ break;
+ }
+ case NdbDictionary::Column::Datetime: {
+ Uint64 value= (Uint64) *field->ptr;
+ fprintf(DBUG_FILE, "Datetime\t%llu", value);
+ break;
+ }
+ case NdbDictionary::Column::Timespec: {
+ Uint64 value= (Uint64) *field->ptr;
+ fprintf(DBUG_FILE, "Timespec\t%llu", value);
+ break;
+ }
+ }
+ fprintf(DBUG_FILE, "\n");
+
+ }
+#endif
+ DBUG_VOID_RETURN;
+}
+
+
+int ha_ndbcluster::index_init(uint index)
+{
+ DBUG_ENTER("index_init");
+ DBUG_PRINT("enter", ("index: %u", index));
+ DBUG_RETURN(handler::index_init(index));
+}
+
+
+int ha_ndbcluster::index_end()
+{
+ DBUG_ENTER("index_end");
+ DBUG_RETURN(rnd_end());
+}
+
+
+int ha_ndbcluster::index_read(byte *buf,
+ const byte *key, uint key_len,
+ enum ha_rkey_function find_flag)
+{
+ DBUG_ENTER("index_read");
+ DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d",
+ active_index, key_len, find_flag));
+
+ int error= 1;
+ statistic_increment(ha_read_key_count, &LOCK_status);
+
+ switch (get_index_type(active_index)){
+ case PRIMARY_KEY_INDEX:
+ error= pk_read(key, key_len, buf);
+ break;
+
+ case UNIQUE_INDEX:
+ error= unique_index_read(key, key_len, buf);
+ break;
+
+ case ORDERED_INDEX:
+ error= ordered_index_scan(key, key_len, buf, find_flag);
+ break;
+
+ default:
+ case UNDEFINED_INDEX:
+ break;
+ }
+ DBUG_RETURN(error);
+}
+
+
+int ha_ndbcluster::index_read_idx(byte *buf, uint index_no,
+ const byte *key, uint key_len,
+ enum ha_rkey_function find_flag)
+{
+ statistic_increment(ha_read_key_count,&LOCK_status);
+ DBUG_ENTER("index_read_idx");
+ DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len));
+ index_init(index_no);
+ DBUG_RETURN(index_read(buf, key, key_len, find_flag));
+}
+
+
+int ha_ndbcluster::index_next(byte *buf)
+{
+ DBUG_ENTER("index_next");
+
+ int error = 1;
+ statistic_increment(ha_read_next_count,&LOCK_status);
+ if (get_index_type(active_index) == PRIMARY_KEY_INDEX)
+ error= HA_ERR_END_OF_FILE;
+ else
+ error = next_result(buf);
+ DBUG_RETURN(error);
+}
+
+
+int ha_ndbcluster::index_prev(byte *buf)
+{
+ DBUG_ENTER("index_prev");
+ statistic_increment(ha_read_prev_count,&LOCK_status);
+ DBUG_RETURN(1);
+}
+
+
+int ha_ndbcluster::index_first(byte *buf)
+{
+ DBUG_ENTER("index_first");
+ statistic_increment(ha_read_first_count,&LOCK_status);
+ DBUG_RETURN(1);
+}
+
+
+int ha_ndbcluster::index_last(byte *buf)
+{
+ DBUG_ENTER("index_last");
+ statistic_increment(ha_read_last_count,&LOCK_status);
+ DBUG_RETURN(1);
+}
+
+
+int ha_ndbcluster::rnd_init(bool scan)
+{
+ NdbResultSet *cursor= m_active_cursor;
+ DBUG_ENTER("rnd_init");
+ DBUG_PRINT("enter", ("scan: %d", scan));
+ // Check that cursor is not defined
+ if (cursor)
+ DBUG_RETURN(1);
+ index_init(table->primary_key);
+ DBUG_RETURN(0);
+}
+
+
+int ha_ndbcluster::rnd_end()
+{
+ NdbResultSet *cursor= m_active_cursor;
+ DBUG_ENTER("rnd_end");
+
+ if (cursor)
+ {
+ DBUG_PRINT("info", ("Closing the cursor"));
+ cursor->close();
+ m_active_cursor= NULL;
+ }
+ DBUG_RETURN(0);
+}
+
+
+int ha_ndbcluster::rnd_next(byte *buf)
+{
+ DBUG_ENTER("rnd_next");
+ statistic_increment(ha_read_rnd_next_count, &LOCK_status);
+ int error = 1;
+ if (!m_active_cursor)
+ error = full_table_scan(buf);
+ else
+ error = next_result(buf);
+ DBUG_RETURN(error);
+}
+
+
+/*
+ An "interesting" record has been found and it's pk
+ retrieved by calling position
+ Now it's time to read the record from db once
+ again
+*/
+
+int ha_ndbcluster::rnd_pos(byte *buf, byte *pos)
+{
+ DBUG_ENTER("rnd_pos");
+ statistic_increment(ha_read_rnd_count,&LOCK_status);
+ // The primary key for the record is stored in pos
+ // Perform a pk_read using primary key "index"
+ DBUG_RETURN(pk_read(pos, ref_length, buf));
+}
+
+
+/*
+ Store the primary key of this record in ref
+ variable, so that the row can be retrieved again later
+ using "reference" in rnd_pos
+*/
+
+void ha_ndbcluster::position(const byte *record)
+{
+ KEY *key_info;
+ KEY_PART_INFO *key_part;
+ KEY_PART_INFO *end;
+ byte *buff;
+ DBUG_ENTER("position");
+
+ if (table->primary_key != MAX_KEY)
+ {
+ key_info= table->key_info + table->primary_key;
+ key_part= key_info->key_part;
+ end= key_part + key_info->key_parts;
+ buff= ref;
+
+ for (; key_part != end; key_part++)
+ {
+ if (key_part->null_bit) {
+ /* Store 0 if the key part is a NULL part */
+ if (record[key_part->null_offset]
+ & key_part->null_bit) {
+ *buff++= 1;
+ continue;
+ }
+ *buff++= 0;
+ }
+ memcpy(buff, record + key_part->offset, key_part->length);
+ buff += key_part->length;
+ }
+ }
+ else
+ {
+ // No primary key, get hidden key
+ DBUG_PRINT("info", ("Getting hidden key"));
+ int hidden_no= table->fields;
+ NdbRecAttr* rec= m_value[hidden_no];
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ const NDBCOL *hidden_col= tab->getColumn(hidden_no);
+ DBUG_ASSERT(hidden_col->getPrimaryKey() &&
+ hidden_col->getAutoIncrement() &&
+ rec != NULL &&
+ ref_length == NDB_HIDDEN_PRIMARY_KEY_LENGTH);
+ memcpy(ref, (const void*)rec->aRef(), ref_length);
+ }
+
+ DBUG_DUMP("ref", (char*)ref, ref_length);
+ DBUG_VOID_RETURN;
+}
+
+
+void ha_ndbcluster::info(uint flag)
+{
+ DBUG_ENTER("info");
+ DBUG_PRINT("enter", ("flag: %d", flag));
+
+ if (flag & HA_STATUS_POS)
+ DBUG_PRINT("info", ("HA_STATUS_POS"));
+ if (flag & HA_STATUS_NO_LOCK)
+ DBUG_PRINT("info", ("HA_STATUS_NO_LOCK"));
+ if (flag & HA_STATUS_TIME)
+ DBUG_PRINT("info", ("HA_STATUS_TIME"));
+ if (flag & HA_STATUS_CONST)
+ DBUG_PRINT("info", ("HA_STATUS_CONST"));
+ if (flag & HA_STATUS_VARIABLE)
+ DBUG_PRINT("info", ("HA_STATUS_VARIABLE"));
+ if (flag & HA_STATUS_ERRKEY)
+ DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
+ if (flag & HA_STATUS_AUTO)
+ DBUG_PRINT("info", ("HA_STATUS_AUTO"));
+ DBUG_VOID_RETURN;
+}
+
+
+int ha_ndbcluster::extra(enum ha_extra_function operation)
+{
+ DBUG_ENTER("extra");
+ switch (operation) {
+ case HA_EXTRA_NORMAL: /* Optimize for space (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_NORMAL"));
+ break;
+ case HA_EXTRA_QUICK: /* Optimize for speed */
+ DBUG_PRINT("info", ("HA_EXTRA_QUICK"));
+ break;
+ case HA_EXTRA_RESET: /* Reset database to after open */
+ DBUG_PRINT("info", ("HA_EXTRA_RESET"));
+ break;
+ case HA_EXTRA_CACHE: /* Cash record in HA_rrnd() */
+ DBUG_PRINT("info", ("HA_EXTRA_CACHE"));
+ break;
+ case HA_EXTRA_NO_CACHE: /* End cacheing of records (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_CACHE"));
+ break;
+ case HA_EXTRA_NO_READCHECK: /* No readcheck on update */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_READCHECK"));
+ break;
+ case HA_EXTRA_READCHECK: /* Use readcheck (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_READCHECK"));
+ break;
+ case HA_EXTRA_KEYREAD: /* Read only key to database */
+ DBUG_PRINT("info", ("HA_EXTRA_KEYREAD"));
+ break;
+ case HA_EXTRA_NO_KEYREAD: /* Normal read of records (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_KEYREAD"));
+ break;
+ case HA_EXTRA_NO_USER_CHANGE: /* No user is allowed to write */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_USER_CHANGE"));
+ break;
+ case HA_EXTRA_KEY_CACHE:
+ DBUG_PRINT("info", ("HA_EXTRA_KEY_CACHE"));
+ break;
+ case HA_EXTRA_NO_KEY_CACHE:
+ DBUG_PRINT("info", ("HA_EXTRA_NO_KEY_CACHE"));
+ break;
+ case HA_EXTRA_WAIT_LOCK: /* Wait until file is avalably (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_WAIT_LOCK"));
+ break;
+ case HA_EXTRA_NO_WAIT_LOCK: /* If file is locked, return quickly */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_WAIT_LOCK"));
+ break;
+ case HA_EXTRA_WRITE_CACHE: /* Use write cache in ha_write() */
+ DBUG_PRINT("info", ("HA_EXTRA_WRITE_CACHE"));
+ break;
+ case HA_EXTRA_FLUSH_CACHE: /* flush write_record_cache */
+ DBUG_PRINT("info", ("HA_EXTRA_FLUSH_CACHE"));
+ break;
+ case HA_EXTRA_NO_KEYS: /* Remove all update of keys */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_KEYS"));
+ break;
+ case HA_EXTRA_KEYREAD_CHANGE_POS: /* Keyread, but change pos */
+ DBUG_PRINT("info", ("HA_EXTRA_KEYREAD_CHANGE_POS")); /* xxxxchk -r must be used */
+ break;
+ case HA_EXTRA_REMEMBER_POS: /* Remember pos for next/prev */
+ DBUG_PRINT("info", ("HA_EXTRA_REMEMBER_POS"));
+ break;
+ case HA_EXTRA_RESTORE_POS:
+ DBUG_PRINT("info", ("HA_EXTRA_RESTORE_POS"));
+ break;
+ case HA_EXTRA_REINIT_CACHE: /* init cache from current record */
+ DBUG_PRINT("info", ("HA_EXTRA_REINIT_CACHE"));
+ break;
+ case HA_EXTRA_FORCE_REOPEN: /* Datafile have changed on disk */
+ DBUG_PRINT("info", ("HA_EXTRA_FORCE_REOPEN"));
+ break;
+ case HA_EXTRA_FLUSH: /* Flush tables to disk */
+ DBUG_PRINT("info", ("HA_EXTRA_FLUSH"));
+ break;
+ case HA_EXTRA_NO_ROWS: /* Don't write rows */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_ROWS"));
+ break;
+ case HA_EXTRA_RESET_STATE: /* Reset positions */
+ DBUG_PRINT("info", ("HA_EXTRA_RESET_STATE"));
+ break;
+ case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/
+ DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY"));
+
+ DBUG_PRINT("info", ("Turning ON use of write instead of insert"));
+ m_use_write= TRUE;
+ break;
+ case HA_EXTRA_NO_IGNORE_DUP_KEY:
+ DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_DUP_KEY"));
+ DBUG_PRINT("info", ("Turning OFF use of write instead of insert"));
+ m_use_write= false;
+ break;
+ case HA_EXTRA_RETRIEVE_ALL_COLS: /* Retrieve all columns, not just those
+ where field->query_id is the same as
+ the current query id */
+ DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_ALL_COLS"));
+ break;
+ case HA_EXTRA_PREPARE_FOR_DELETE:
+ DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_DELETE"));
+ break;
+ case HA_EXTRA_PREPARE_FOR_UPDATE: /* Remove read cache if problems */
+ DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_UPDATE"));
+ break;
+ case HA_EXTRA_PRELOAD_BUFFER_SIZE:
+ DBUG_PRINT("info", ("HA_EXTRA_PRELOAD_BUFFER_SIZE"));
+ break;
+ case HA_EXTRA_RETRIEVE_PRIMARY_KEY:
+ DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_PRIMARY_KEY"));
+ break;
+ case HA_EXTRA_CHANGE_KEY_TO_UNIQUE:
+ DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_UNIQUE"));
+ break;
+ case HA_EXTRA_CHANGE_KEY_TO_DUP:
+ DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_DUP"));
+ break;
+
+ }
+
+ DBUG_RETURN(0);
+}
+
+
+int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
+{
+ DBUG_ENTER("extra_opt");
+ DBUG_PRINT("enter", ("cache_size: %d", cache_size));
+ DBUG_RETURN(extra(operation));
+}
+
+
+int ha_ndbcluster::reset()
+{
+ DBUG_ENTER("reset");
+ // Reset what?
+ DBUG_RETURN(1);
+}
+
+
+const char **ha_ndbcluster::bas_ext() const
+{ static const char *ext[1] = { NullS }; return ext; }
+
+
+/*
+ How many seeks it will take to read through the table
+ This is to be comparable to the number returned by records_in_range so
+ that we can decide if we should scan the table or use keys.
+*/
+
+double ha_ndbcluster::scan_time()
+{
+ return rows2double(records/3);
+}
+
+
+THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
+ THR_LOCK_DATA **to,
+ enum thr_lock_type lock_type)
+{
+ DBUG_ENTER("store_lock");
+
+ if (lock_type != TL_IGNORE && m_lock.type == TL_UNLOCK)
+ {
+
+ /* If we are not doing a LOCK TABLE, then allow multiple
+ writers */
+
+ if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
+ lock_type <= TL_WRITE) && !thd->in_lock_tables)
+ lock_type= TL_WRITE_ALLOW_WRITE;
+
+ /* In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
+ MySQL would use the lock TL_READ_NO_INSERT on t2, and that
+ would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
+ to t2. Convert the lock to a normal read lock to allow
+ concurrent inserts to t2. */
+
+ if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
+ lock_type= TL_READ;
+
+ m_lock.type=lock_type;
+ }
+ *to++= &m_lock;
+
+ DBUG_RETURN(to);
+}
+
+#ifndef DBUG_OFF
+#define PRINT_OPTION_FLAGS(t) { \
+ if (t->options & OPTION_NOT_AUTOCOMMIT) \
+ DBUG_PRINT("thd->options", ("OPTION_NOT_AUTOCOMMIT")); \
+ if (t->options & OPTION_BEGIN) \
+ DBUG_PRINT("thd->options", ("OPTION_BEGIN")); \
+ if (t->options & OPTION_TABLE_LOCK) \
+ DBUG_PRINT("thd->options", ("OPTION_TABLE_LOCK")); \
+}
+#else
+#define PRINT_OPTION_FLAGS(t)
+#endif
+
+
+/*
+ As MySQL will execute an external lock for every new table it uses
+ we can use this to start the transactions.
+ If we are in auto_commit mode we just need to start a transaction
+ for the statement, this will be stored in transaction.stmt.
+ If not, we have to start a master transaction if there doesn't exist
+ one from before, this will be stored in transaction.all
+
+ When a table lock is held one transaction will be started which holds
+ the table lock and for each statement a hupp transaction will be started
+ */
+
+int ha_ndbcluster::external_lock(THD *thd, int lock_type)
+{
+ int error=0;
+ NdbConnection* trans= NULL;
+
+ DBUG_ENTER("external_lock");
+ DBUG_PRINT("enter", ("transaction.ndb_lock_count: %d",
+ thd->transaction.ndb_lock_count));
+
+ /*
+ Check that this handler instance has a connection
+ set up to the Ndb object of thd
+ */
+ if (check_ndb_connection())
+ DBUG_RETURN(1);
+
+ if (lock_type != F_UNLCK)
+ {
+ if (!thd->transaction.ndb_lock_count++)
+ {
+ PRINT_OPTION_FLAGS(thd);
+
+ if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN | OPTION_TABLE_LOCK)))
+ {
+ // Autocommit transaction
+ DBUG_ASSERT(!thd->transaction.stmt.ndb_tid);
+ DBUG_PRINT("trans",("Starting transaction stmt"));
+
+ trans= m_ndb->startTransaction();
+ if (trans == NULL)
+ {
+ thd->transaction.ndb_lock_count--; // We didn't get the lock
+ ERR_RETURN(m_ndb->getNdbError());
+ }
+ thd->transaction.stmt.ndb_tid= trans;
+ }
+ else
+ {
+ if (!thd->transaction.all.ndb_tid)
+ {
+ // Not autocommit transaction
+ // A "master" transaction ha not been started yet
+ DBUG_PRINT("trans",("starting transaction, all"));
+
+ trans= m_ndb->startTransaction();
+ if (trans == NULL)
+ {
+ thd->transaction.ndb_lock_count--; // We didn't get the lock
+ ERR_RETURN(m_ndb->getNdbError());
+ }
+
+ /*
+ If this is the start of a LOCK TABLE, a table look
+ should be taken on the table in NDB
+
+ Check if it should be read or write lock
+ */
+ if (thd->options & (OPTION_TABLE_LOCK))
+ {
+ //lockThisTable();
+ DBUG_PRINT("info", ("Locking the table..." ));
+ }
+
+ thd->transaction.all.ndb_tid= trans;
+ }
+ }
+ }
+ /*
+ This is the place to make sure this handler instance
+ has a started transaction.
+
+ The transaction is started by the first handler on which
+ MySQL Server calls external lock
+
+ Other handlers in the same stmt or transaction should use
+ the same NDB transaction. This is done by setting up the m_active_trans
+ pointer to point to the NDB transaction.
+ */
+
+ m_active_trans= thd->transaction.all.ndb_tid ?
+ (NdbConnection*)thd->transaction.all.ndb_tid:
+ (NdbConnection*)thd->transaction.stmt.ndb_tid;
+ DBUG_ASSERT(m_active_trans);
+
+ }
+ else
+ {
+ if (!--thd->transaction.ndb_lock_count)
+ {
+ DBUG_PRINT("trans", ("Last external_lock"));
+ PRINT_OPTION_FLAGS(thd);
+
+ if (thd->transaction.stmt.ndb_tid)
+ {
+ /*
+ Unlock is done without a transaction commit / rollback.
+ This happens if the thread didn't update any rows
+ We must in this case close the transaction to release resources
+ */
+ DBUG_PRINT("trans",("ending non-updating transaction"));
+ m_ndb->closeTransaction(m_active_trans);
+ thd->transaction.stmt.ndb_tid= 0;
+ }
+ }
+ m_active_trans= NULL;
+ }
+ DBUG_RETURN(error);
+}
+
+/*
+ When using LOCK TABLE's external_lock is only called when the actual
+ TABLE LOCK is done.
+ Under LOCK TABLES, each used tables will force a call to start_stmt.
+*/
+
+int ha_ndbcluster::start_stmt(THD *thd)
+{
+ int error=0;
+ DBUG_ENTER("start_stmt");
+ PRINT_OPTION_FLAGS(thd);
+
+ NdbConnection *trans= (NdbConnection*)thd->transaction.stmt.ndb_tid;
+ if (!trans){
+ DBUG_PRINT("trans",("Starting transaction stmt"));
+
+ NdbConnection *tablock_trans=
+ (NdbConnection*)thd->transaction.all.ndb_tid;
+ DBUG_PRINT("info", ("tablock_trans: %x", tablock_trans));
+ DBUG_ASSERT(tablock_trans); trans= m_ndb->hupp(tablock_trans);
+ if (trans == NULL)
+ ERR_RETURN(m_ndb->getNdbError());
+ thd->transaction.stmt.ndb_tid= trans;
+ }
+ m_active_trans= trans;
+
+ DBUG_RETURN(error);
+}
+
+
+/*
+ Commit a transaction started in NDB
+ */
+
+int ndbcluster_commit(THD *thd, void *ndb_transaction)
+{
+ int res= 0;
+ Ndb *ndb= (Ndb*)thd->transaction.ndb;
+ NdbConnection *trans= (NdbConnection*)ndb_transaction;
+
+ DBUG_ENTER("ndbcluster_commit");
+ DBUG_PRINT("transaction",("%s",
+ trans == thd->transaction.stmt.ndb_tid ?
+ "stmt" : "all"));
+ DBUG_ASSERT(ndb && trans);
+
+ if (trans->execute(Commit) != 0)
+ {
+ const NdbError err= trans->getNdbError();
+ ERR_PRINT(err);
+ res= ndb_to_mysql_error(&err);
+ }
+ ndb->closeTransaction(trans);
+ DBUG_RETURN(res);
+}
+
+
+/*
+ Rollback a transaction started in NDB
+ */
+
+int ndbcluster_rollback(THD *thd, void *ndb_transaction)
+{
+ int res= 0;
+ Ndb *ndb= (Ndb*)thd->transaction.ndb;
+ NdbConnection *trans= (NdbConnection*)ndb_transaction;
+
+ DBUG_ENTER("ndbcluster_rollback");
+ DBUG_PRINT("transaction",("%s",
+ trans == thd->transaction.stmt.ndb_tid ?
+ "stmt" : "all"));
+ DBUG_ASSERT(ndb && trans);
+
+ if (trans->execute(Rollback) != 0)
+ {
+ const NdbError err= trans->getNdbError();
+ ERR_PRINT(err);
+ res= ndb_to_mysql_error(&err);
+ }
+ ndb->closeTransaction(trans);
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Map MySQL type to the corresponding NDB type
+ */
+
+inline NdbDictionary::Column::Type
+mysql_to_ndb_type(enum enum_field_types mysql_type, bool unsigned_flg)
+{
+ switch(mysql_type) {
+ case MYSQL_TYPE_DECIMAL:
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_TINY:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Tinyunsigned :
+ NdbDictionary::Column::Tinyint;
+ case MYSQL_TYPE_SHORT:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Smallunsigned :
+ NdbDictionary::Column::Smallint;
+ case MYSQL_TYPE_LONG:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Unsigned :
+ NdbDictionary::Column::Int;
+ case MYSQL_TYPE_TIMESTAMP:
+ return NdbDictionary::Column::Unsigned;
+ case MYSQL_TYPE_LONGLONG:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Bigunsigned :
+ NdbDictionary::Column::Bigint;
+ case MYSQL_TYPE_INT24:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Mediumunsigned :
+ NdbDictionary::Column::Mediumint;
+ break;
+ case MYSQL_TYPE_FLOAT:
+ return NdbDictionary::Column::Float;
+ case MYSQL_TYPE_DOUBLE:
+ return NdbDictionary::Column::Double;
+ case MYSQL_TYPE_DATETIME :
+ return NdbDictionary::Column::Datetime;
+ case MYSQL_TYPE_DATE :
+ case MYSQL_TYPE_NEWDATE :
+ case MYSQL_TYPE_TIME :
+ case MYSQL_TYPE_YEAR :
+ // Missing NDB data types, mapped to char
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_ENUM :
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_SET :
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_TINY_BLOB :
+ case MYSQL_TYPE_MEDIUM_BLOB :
+ case MYSQL_TYPE_LONG_BLOB :
+ case MYSQL_TYPE_BLOB :
+ return NdbDictionary::Column::Blob;
+ case MYSQL_TYPE_VAR_STRING :
+ return NdbDictionary::Column::Varchar;
+ case MYSQL_TYPE_STRING :
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_NULL :
+ case MYSQL_TYPE_GEOMETRY :
+ return NdbDictionary::Column::Undefined;
+ }
+ return NdbDictionary::Column::Undefined;
+}
+
+
+/*
+ Create a table in NDB Cluster
+ */
+
+int ha_ndbcluster::create(const char *name,
+ TABLE *form,
+ HA_CREATE_INFO *info)
+{
+ NDBTAB tab;
+ NdbDictionary::Column::Type ndb_type;
+ NDBCOL col;
+ uint pack_length, length, i;
+ int res;
+ const void *data, *pack_data;
+ const char **key_name= form->keynames.type_names;
+ char name2[FN_HEADLEN];
+
+ DBUG_ENTER("create");
+ DBUG_PRINT("enter", ("name: %s", name));
+ fn_format(name2, name, "", "",2); // Remove the .frm extension
+ set_dbname(name2);
+ set_tabname(name2);
+
+ DBUG_PRINT("table", ("name: %s", m_tabname));
+ tab.setName(m_tabname);
+ tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));
+
+ // Save frm data for this table
+ if (readfrm(name, &data, &length))
+ DBUG_RETURN(1);
+ if (packfrm(data, length, &pack_data, &pack_length))
+ DBUG_RETURN(2);
+
+ DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length));
+ tab.setFrm(pack_data, pack_length);
+ my_free((char*)data, MYF(0));
+ my_free((char*)pack_data, MYF(0));
+
+ for (i= 0; i < form->fields; i++)
+ {
+ Field *field= form->field[i];
+ ndb_type= mysql_to_ndb_type(field->real_type(),
+ field->flags & UNSIGNED_FLAG);
+ DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d",
+ field->field_name, field->real_type(),
+ field->pack_length()));
+ col.setName(field->field_name);
+ col.setType(ndb_type);
+ if ((ndb_type == NdbDictionary::Column::Char) ||
+ (ndb_type == NdbDictionary::Column::Varchar))
+ col.setLength(field->pack_length());
+ else
+ col.setLength(1);
+ col.setNullable(field->maybe_null());
+ col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
+ if (field->flags & AUTO_INCREMENT_FLAG)
+ {
+ DBUG_PRINT("info", ("Found auto_increment key"));
+ col.setAutoIncrement(TRUE);
+ ulonglong value = info->auto_increment_value ?
+ info->auto_increment_value -1 :
+ (ulonglong) 0;
+ DBUG_PRINT("info", ("initial value=%ld", value));
+// col.setInitialAutIncValue(value);
+ }
+ else
+ col.setAutoIncrement(false);
+
+ tab.addColumn(col);
+ }
+
+ // No primary key, create shadow key as 64 bit, auto increment
+ if (form->primary_key == MAX_KEY)
+ {
+ DBUG_PRINT("info", ("Generating shadow key"));
+ col.setName("$PK");
+ col.setType(NdbDictionary::Column::Bigunsigned);
+ col.setLength(1);
+ col.setNullable(false);
+ col.setPrimaryKey(TRUE);
+ col.setAutoIncrement(TRUE);
+ tab.addColumn(col);
+ }
+
+ my_errno= 0;
+ if (check_ndb_connection())
+ {
+ my_errno= HA_ERR_NO_CONNECTION;
+ DBUG_RETURN(my_errno);
+ }
+
+ // Create the table in NDB
+ NDBDICT *dict= m_ndb->getDictionary();
+ if (dict->createTable(tab))
+ {
+ const NdbError err= dict->getNdbError();
+ ERR_PRINT(err);
+ my_errno= ndb_to_mysql_error(&err);
+ DBUG_RETURN(my_errno);
+ }
+ DBUG_PRINT("info", ("Table %s/%s created successfully",
+ m_dbname, m_tabname));
+
+ // Fetch table from NDB, check that it exists
+ const NDBTAB *tab2= dict->getTable(m_tabname);
+ if (tab2 == NULL)
+ {
+ const NdbError err= dict->getNdbError();
+ ERR_PRINT(err);
+ my_errno= ndb_to_mysql_error(&err);
+ DBUG_RETURN(my_errno);
+ }
+
+ // Create secondary indexes
+ for (i= 0; i < form->keys; i++)
+ {
+ DBUG_PRINT("info", ("Found index %u: %s", i, key_name[i]));
+ if (i == form->primary_key)
+ {
+ DBUG_PRINT("info", ("Skipping it, PK already created"));
+ continue;
+ }
+
+ DBUG_PRINT("info", ("Creating index %u: %s", i, key_name[i]));
+ res= create_index(key_name[i],
+ form->key_info + i);
+ switch(res){
+ case 0:
+ // OK
+ break;
+ default:
+ DBUG_PRINT("error", ("Failed to create index %u", i));
+ drop_table();
+ my_errno= res;
+ goto err_end;
+ }
+ }
+
+err_end:
+ DBUG_RETURN(my_errno);
+}
+
+
+/*
+ Create an index in NDB Cluster
+ */
+
+int ha_ndbcluster::create_index(const char *name,
+ KEY *key_info){
+ NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
+ KEY_PART_INFO *key_part= key_info->key_part;
+ KEY_PART_INFO *end= key_part + key_info->key_parts;
+
+ DBUG_ENTER("create_index");
+ DBUG_PRINT("enter", ("name: %s ", name));
+
+ // Check that an index with the same name do not already exist
+ if (dict->getIndex(name, m_tabname))
+ ERR_RETURN(dict->getNdbError());
+
+ NdbDictionary::Index ndb_index(name);
+ if (key_info->flags & HA_NOSAME)
+ ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
+ else
+ {
+ ndb_index.setType(NdbDictionary::Index::OrderedIndex);
+ // TODO Only temporary ordered indexes supported
+ ndb_index.setLogging(false);
+ }
+ ndb_index.setTable(m_tabname);
+
+ for (; key_part != end; key_part++)
+ {
+ Field *field= key_part->field;
+ DBUG_PRINT("info", ("attr: %s", field->field_name));
+ ndb_index.addColumnName(field->field_name);
+ }
+
+ if (dict->createIndex(ndb_index))
+ ERR_RETURN(dict->getNdbError());
+
+ // Success
+ DBUG_PRINT("info", ("Created index %s", name));
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Rename a table in NDB Cluster
+*/
+
+int ha_ndbcluster::rename_table(const char *from, const char *to)
+{
+ char new_tabname[FN_HEADLEN];
+
+ DBUG_ENTER("ha_ndbcluster::rename_table");
+ set_dbname(from);
+ set_tabname(from);
+ set_tabname(to, new_tabname);
+
+ if (check_ndb_connection()) {
+ my_errno= HA_ERR_NO_CONNECTION;
+ DBUG_RETURN(my_errno);
+ }
+
+ int result= alter_table_name(m_tabname, new_tabname);
+ if (result == 0)
+ set_tabname(to);
+
+ DBUG_RETURN(result);
+}
+
+
+/*
+ Rename a table in NDB Cluster using alter table
+ */
+
+int ha_ndbcluster::alter_table_name(const char *from, const char *to)
+{
+ NDBDICT *dict= m_ndb->getDictionary();
+ const NDBTAB *orig_tab;
+ DBUG_ENTER("alter_table_name_table");
+ DBUG_PRINT("enter", ("Renaming %s to %s", from, to));
+
+ if (!(orig_tab= dict->getTable(from)))
+ ERR_RETURN(dict->getNdbError());
+
+ NdbDictionary::Table copy_tab= dict->getTableForAlteration(from);
+ copy_tab.setName(to);
+ if (dict->alterTable(copy_tab) != 0)
+ ERR_RETURN(dict->getNdbError());
+
+ m_table= NULL;
+
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Delete a table from NDB Cluster
+ */
+
+int ha_ndbcluster::delete_table(const char *name)
+{
+ DBUG_ENTER("delete_table");
+ DBUG_PRINT("enter", ("name: %s", name));
+ set_dbname(name);
+ set_tabname(name);
+
+ if (check_ndb_connection())
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+ DBUG_RETURN(drop_table());
+}
+
+
+/*
+ Drop a table in NDB Cluster
+ */
+
+int ha_ndbcluster::drop_table()
+{
+ NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
+
+ DBUG_ENTER("drop_table");
+ DBUG_PRINT("enter", ("Deleting %s", m_tabname));
+
+ if (dict->dropTable(m_tabname))
+ {
+ const NdbError err= dict->getNdbError();
+ if (err.code == 709)
+ ; // 709: No such table existed
+ else
+ ERR_RETURN(dict->getNdbError());
+ }
+ release_metadata();
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Drop a database in NDB Cluster
+ */
+
+int ndbcluster_drop_database(const char *path)
+{
+ DBUG_ENTER("ndbcluster_drop_database");
+ // TODO drop all tables for this database
+ DBUG_RETURN(1);
+}
+
+
+longlong ha_ndbcluster::get_auto_increment()
+{
+ // NOTE If number of values to be inserted is known
+ // the autoincrement cache could be used here
+ Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname);
+ return (longlong)auto_value;
+}
+
+
+/*
+ Constructor for the NDB Cluster table handler
+ */
+
+ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
+ handler(table_arg),
+ m_active_trans(NULL),
+ m_active_cursor(NULL),
+ m_ndb(NULL),
+ m_table(NULL),
+ m_table_flags(HA_REC_NOT_IN_SEQ |
+ HA_KEYPOS_TO_RNDPOS |
+ HA_NOT_EXACT_COUNT |
+ HA_NO_WRITE_DELAYED |
+ HA_NO_PREFIX_CHAR_KEYS |
+ HA_NO_BLOBS |
+ HA_DROP_BEFORE_CREATE |
+ HA_NOT_READ_AFTER_KEY),
+ m_use_write(false)
+{
+
+ DBUG_ENTER("ha_ndbcluster");
+
+ m_tabname[0]= '\0';
+ m_dbname[0]= '\0';
+
+ // TODO Adjust number of records and other parameters for proper
+ // selection of scan/pk access
+ records= 100;
+ block_size= 1024;
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Destructor for NDB Cluster table handler
+ */
+
+ha_ndbcluster::~ha_ndbcluster()
+{
+ DBUG_ENTER("~ha_ndbcluster");
+
+ release_metadata();
+
+ // Check for open cursor/transaction
+ DBUG_ASSERT(m_active_cursor == NULL);
+ DBUG_ASSERT(m_active_trans == NULL);
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Open a table for further use
+ - fetch metadata for this table from NDB
+ - check that table exists
+*/
+
+int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
+{
+ KEY *key;
+ DBUG_ENTER("open");
+ DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
+ name, mode, test_if_locked));
+
+ // Setup ref_length to make room for the whole
+ // primary key to be written in the ref variable
+
+ if (table->primary_key != MAX_KEY)
+ {
+ key= table->key_info+table->primary_key;
+ ref_length= key->key_length;
+ DBUG_PRINT("info", (" ref_length: %d", ref_length));
+ }
+ // Init table lock structure
+ if (!(m_share=get_share(name)))
+ DBUG_RETURN(1);
+ thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0);
+
+ set_dbname(name);
+ set_tabname(name);
+
+ if (check_ndb_connection())
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+ DBUG_RETURN(get_metadata(name));
+}
+
+
+/*
+ Close the table
+ - release resources setup by open()
+ */
+
+int ha_ndbcluster::close(void)
+{
+ DBUG_ENTER("close");
+ free_share(m_share);
+ release_metadata();
+ m_ndb= NULL;
+ DBUG_RETURN(0);
+}
+
+
+Ndb* ha_ndbcluster::seize_ndb()
+{
+ Ndb* ndb;
+ DBUG_ENTER("seize_ndb");
+
+#ifdef USE_NDB_POOL
+ // Seize from pool
+ ndb= Ndb::seize();
+#else
+ ndb= new Ndb("");
+#endif
+ if (ndb->init(NDB_MAX_TRANSACTIONS) != 0)
+ {
+ ERR_PRINT(ndb->getNdbError());
+ /*
+ TODO
+ Alt.1 If init fails because to many allocated Ndb
+ wait on condition for a Ndb object to be released.
+ Alt.2 Seize/release from pool, wait until next release
+ */
+ delete ndb;
+ ndb= NULL;
+ }
+ DBUG_RETURN(ndb);
+}
+
+
+void ha_ndbcluster::release_ndb(Ndb* ndb)
+{
+ DBUG_ENTER("release_ndb");
+#ifdef USE_NDB_POOL
+ // Release to pool
+ Ndb::release(ndb);
+#else
+ delete ndb;
+#endif
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ If this thread already has a Ndb object allocated
+ in current THD, reuse it. Otherwise
+ seize a Ndb object, assign it to current THD and use it.
+
+ Having a Ndb object also means that a connection to
+ NDB cluster has been opened. The connection is
+ checked.
+
+*/
+
+int ha_ndbcluster::check_ndb_connection()
+{
+ THD* thd= current_thd;
+ Ndb* ndb;
+ DBUG_ENTER("check_ndb_connection");
+
+ if (!thd->transaction.ndb)
+ {
+ ndb= seize_ndb();
+ if (!ndb)
+ DBUG_RETURN(2);
+ thd->transaction.ndb= ndb;
+ }
+ m_ndb= (Ndb*)thd->transaction.ndb;
+ m_ndb->setDatabaseName(m_dbname);
+ if (m_ndb->waitUntilReady() != 0)
+ {
+ DBUG_PRINT("error", ("Ndb was not ready"));
+ DBUG_RETURN(3);
+ }
+ DBUG_RETURN(0);
+}
+
+void ndbcluster_close_connection(THD *thd)
+{
+ Ndb* ndb;
+ DBUG_ENTER("ndbcluster_close_connection");
+ ndb= (Ndb*)thd->transaction.ndb;
+ ha_ndbcluster::release_ndb(ndb);
+ thd->transaction.ndb= NULL;
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Try to discover one table from NDB
+ */
+
+int ndbcluster_discover(const char *dbname, const char *name,
+ const void** frmblob, uint* frmlen)
+{
+ uint len;
+ const void* data;
+ const NDBTAB* tab;
+ DBUG_ENTER("ndbcluster_discover");
+ DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
+
+ Ndb ndb(dbname);
+ if ((ndb.init() != 0) && (ndb.waitUntilReady() != 0))
+ ERR_RETURN(ndb.getNdbError());
+
+ if (!(tab= ndb.getDictionary()->getTable(name)))
+ {
+ DBUG_PRINT("info", ("Table %s not found", name));
+ DBUG_RETURN(1);
+ }
+
+ DBUG_PRINT("info", ("Found table %s", tab->getName()));
+
+ len= tab->getFrmLength();
+ if (len == 0 || tab->getFrmData() == NULL)
+ {
+ DBUG_PRINT("No frm data found",
+ ("Table is probably created via NdbApi"));
+ DBUG_RETURN(2);
+ }
+
+ if (unpackfrm(&data, &len, tab->getFrmData()))
+ DBUG_RETURN(3);
+
+ *frmlen= len;
+ *frmblob= data;
+
+ DBUG_RETURN(0);
+}
+
+static Ndb* g_ndb= NULL;
+
+#ifdef USE_DISCOVER_ON_STARTUP
+/*
+ Dicover tables from NDB Cluster
+ - fetch a list of tables from NDB
+ - store the frm file for each table on disk
+ - if the table has an attached frm file
+ - if the database of the table exists
+*/
+
+int ndb_discover_tables()
+{
+ uint i;
+ NdbDictionary::Dictionary::List list;
+ NdbDictionary::Dictionary* dict;
+ char path[FN_REFLEN];
+ DBUG_ENTER("ndb_discover_tables");
+
+ /* List tables in NDB Cluster kernel */
+ dict= g_ndb->getDictionary();
+ if (dict->listObjects(list,
+ NdbDictionary::Object::UserTable) != 0)
+ ERR_RETURN(g_ndb->getNdbError());
+
+ for (i= 0 ; i < list.count ; i++)
+ {
+ NdbDictionary::Dictionary::List::Element& t= list.elements[i];
+
+ DBUG_PRINT("discover", ("%d: %s/%s", t.id, t.database, t.name));
+ if (create_table_from_handler(t.database, t.name, true))
+ DBUG_PRINT("info", ("Could not discover %s/%s", t.database, t.name));
+ }
+ DBUG_RETURN(0);
+}
+#endif
+
+
+/*
+ Initialise all gloal variables before creating
+ a NDB Cluster table handler
+ */
+
+bool ndbcluster_init()
+{
+ DBUG_ENTER("ndbcluster_init");
+ // Create a Ndb object to open the connection to NDB
+ g_ndb= new Ndb("sys");
+ if (g_ndb->init() != 0)
+ {
+ ERR_PRINT (g_ndb->getNdbError());
+ DBUG_RETURN(TRUE);
+ }
+ if (g_ndb->waitUntilReady() != 0)
+ {
+ ERR_PRINT (g_ndb->getNdbError());
+ DBUG_RETURN(TRUE);
+ }
+ (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
+ (hash_get_key) ndbcluster_get_key,0,0);
+ pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+ ndbcluster_inited= 1;
+#ifdef USE_DISCOVER_ON_STARTUP
+ if (ndb_discover_tables() != 0)
+ DBUG_RETURN(TRUE);
+#endif
+ DBUG_RETURN(false);
+}
+
+
+/*
+ End use of the NDB Cluster table handler
+ - free all global variables allocated by
+ ndcluster_init()
+*/
+
+bool ndbcluster_end()
+{
+ DBUG_ENTER("ndbcluster_end");
+ delete g_ndb;
+ g_ndb= NULL;
+ if (!ndbcluster_inited)
+ DBUG_RETURN(0);
+ hash_free(&ndbcluster_open_tables);
+#ifdef USE_NDB_POOL
+ ndb_pool_release();
+#endif
+ pthread_mutex_destroy(&ndbcluster_mutex);
+ ndbcluster_inited= 0;
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Set m_tabname from full pathname to table file
+ */
+
+void ha_ndbcluster::set_tabname(const char *path_name)
+{
+ char *end, *ptr;
+
+ /* Scan name from the end */
+ end= strend(path_name)-1;
+ ptr= end;
+ while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+ ptr--;
+ }
+ uint name_len= end - ptr;
+ memcpy(m_tabname, ptr + 1, end - ptr);
+ m_tabname[name_len]= '\0';
+#ifdef __WIN__
+ /* Put to lower case */
+ ptr= m_tabname;
+
+ while (*ptr != '\0') {
+ *ptr = tolower(*ptr);
+ ptr++;
+ }
+#endif
+}
+
+/**
+ * Set a given location from full pathname to table file
+ *
+ */
+void
+ha_ndbcluster::set_tabname(const char *path_name, char * tabname)
+{
+ char *end, *ptr;
+
+ /* Scan name from the end */
+ end = strend(path_name)-1;
+ ptr = end;
+ while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+ ptr--;
+ }
+ uint name_len = end - ptr;
+ memcpy(tabname, ptr + 1, end - ptr);
+ tabname[name_len] = '\0';
+#ifdef __WIN__
+ /* Put to lower case */
+ ptr = tabname;
+
+ while (*ptr != '\0') {
+ *ptr= tolower(*ptr);
+ ptr++;
+ }
+#endif
+}
+
+
+/*
+ Set m_dbname from full pathname to table file
+
+ */
+
+void ha_ndbcluster::set_dbname(const char *path_name)
+{
+ char *end, *ptr;
+
+ /* Scan name from the end */
+ ptr= strend(path_name)-1;
+ while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+ ptr--;
+ }
+ ptr--;
+ end= ptr;
+ while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+ ptr--;
+ }
+ uint name_len= end - ptr;
+ memcpy(m_dbname, ptr + 1, name_len);
+ m_dbname[name_len]= '\0';
+#ifdef __WIN__
+ /* Put to lower case */
+
+ ptr= m_dbname;
+
+ while (*ptr != '\0') {
+ *ptr= tolower(*ptr);
+ ptr++;
+ }
+#endif
+}
+
+
+ha_rows
+ha_ndbcluster::records_in_range(int inx,
+ const byte *start_key,uint start_key_len,
+ enum ha_rkey_function start_search_flag,
+ const byte *end_key,uint end_key_len,
+ enum ha_rkey_function end_search_flag)
+{
+ ha_rows records= 10;
+ KEY* key_info= table->key_info + inx;
+ uint key_length= key_info->key_length;
+
+ DBUG_ENTER("records_in_range");
+ DBUG_PRINT("enter", ("inx: %d", inx));
+ DBUG_PRINT("enter", ("start_key: %x, start_key_len: %d", start_key, start_key_len));
+ DBUG_PRINT("enter", ("start_search_flag: %d", start_search_flag));
+ DBUG_PRINT("enter", ("end_key: %x, end_key_len: %d", end_key, end_key_len));
+ DBUG_PRINT("enter", ("end_search_flag: %d", end_search_flag));
+
+ /*
+ Check that start_key_len is equal to
+ the length of the used index and
+ prevent partial scan/read of hash indexes by returning HA_POS_ERROR
+ */
+ NDB_INDEX_TYPE idx_type= get_index_type(inx);
+ if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) &&
+ start_key_len < key_length)
+ {
+ DBUG_PRINT("warning", ("Tried to use index which required"
+ "full key length: %d, HA_POS_ERROR",
+ key_length));
+ records= HA_POS_ERROR;
+ }
+ DBUG_RETURN(records);
+}
+
+
+/*
+ Handling the shared NDB_SHARE structure that is needed to
+ provide table locking.
+ It's also used for sharing data with other NDB handlers
+ in the same MySQL Server. There is currently not much
+ data we want to or can share.
+ */
+
+static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length,
+ my_bool not_used __attribute__((unused)))
+{
+ *length=share->table_name_length;
+ return (byte*) share->table_name;
+}
+
+static NDB_SHARE* get_share(const char *table_name)
+{
+ NDB_SHARE *share;
+ pthread_mutex_lock(&ndbcluster_mutex);
+ uint length=(uint) strlen(table_name);
+ if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+ (byte*) table_name,
+ length)))
+ {
+ if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1,
+ MYF(MY_WME | MY_ZEROFILL))))
+ {
+ share->table_name_length=length;
+ share->table_name=(char*) (share+1);
+ strmov(share->table_name,table_name);
+ if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
+ {
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ my_free((gptr) share,0);
+ return 0;
+ }
+ thr_lock_init(&share->lock);
+ pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
+ }
+ }
+ share->use_count++;
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ return share;
+}
+
+
+static void free_share(NDB_SHARE *share)
+{
+ pthread_mutex_lock(&ndbcluster_mutex);
+ if (!--share->use_count)
+ {
+ hash_delete(&ndbcluster_open_tables, (byte*) share);
+ thr_lock_delete(&share->lock);
+ pthread_mutex_destroy(&share->mutex);
+ my_free((gptr) share, MYF(0));
+ }
+ pthread_mutex_unlock(&ndbcluster_mutex);
+}
+
+
+
+/*
+ Internal representation of the frm blob
+
+*/
+
+struct frm_blob_struct
+{
+ struct frm_blob_header
+ {
+ uint ver; // Version of header
+ uint orglen; // Original length of compressed data
+ uint complen; // Compressed length of data, 0=uncompressed
+ } head;
+ char data[1];
+};
+
+
+
+static int packfrm(const void *data, uint len,
+ const void **pack_data, uint *pack_len)
+{
+ int error;
+ ulong org_len, comp_len;
+ uint blob_len;
+ frm_blob_struct* blob;
+ DBUG_ENTER("packfrm");
+ DBUG_PRINT("enter", ("data: %x, len: %d", data, len));
+
+ error= 1;
+ org_len = len;
+ if (my_compress((byte*)data, &org_len, &comp_len))
+ goto err;
+
+ DBUG_PRINT("info", ("org_len: %d, comp_len: %d", org_len, comp_len));
+ DBUG_DUMP("compressed", (char*)data, org_len);
+
+ error= 2;
+ blob_len= sizeof(frm_blob_struct::frm_blob_header)+org_len;
+ if (!(blob= (frm_blob_struct*) my_malloc(blob_len,MYF(MY_WME))))
+ goto err;
+
+ // Store compressed blob in machine independent format
+ int4store((char*)(&blob->head.ver), 1);
+ int4store((char*)(&blob->head.orglen), comp_len);
+ int4store((char*)(&blob->head.complen), org_len);
+
+ // Copy frm data into blob, already in machine independent format
+ memcpy(blob->data, data, org_len);
+
+ *pack_data = blob;
+ *pack_len = blob_len;
+ error = 0;
+
+ DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
+err:
+ DBUG_RETURN(error);
+
+}
+
+
+static int unpackfrm(const void **unpack_data, uint *unpack_len,
+ const void *pack_data)
+{
+ const frm_blob_struct *blob = (frm_blob_struct*)pack_data;
+ byte *data;
+ ulong complen, orglen, ver;
+ DBUG_ENTER("unpackfrm");
+ DBUG_PRINT("enter", ("pack_data: %x", pack_data));
+
+ complen= uint4korr((char*)&blob->head.complen);
+ orglen= uint4korr((char*)&blob->head.orglen);
+ ver= uint4korr((char*)&blob->head.ver);
+
+ DBUG_PRINT("blob",("ver: %d complen: %d orglen: %d",
+ ver,complen,orglen));
+ DBUG_DUMP("blob->data", (char*) blob->data, complen);
+
+ if (ver != 1)
+ DBUG_RETURN(1);
+ if (!(data = my_malloc(max(orglen, complen), MYF(MY_WME))))
+ DBUG_RETURN(2);
+ memcpy(data, blob->data, complen);
+
+ if (my_uncompress(data, &complen, &orglen))
+ {
+ my_free((char*)data, MYF(0));
+ DBUG_RETURN(3);
+ }
+
+ *unpack_data = data;
+ *unpack_len = complen;
+
+ DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));
+
+ DBUG_RETURN(0);
+}
+#endif /* HAVE_NDBCLUSTER_DB */
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
new file mode 100644
index 00000000000..ed66d07d79b
--- /dev/null
+++ b/sql/ha_ndbcluster.h
@@ -0,0 +1,218 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/*
+ This file defines the NDB Cluster handler: the interface between MySQL and
+ NDB Cluster
+*/
+
+/* The class defining a handle to an NDB Cluster table */
+
+#ifdef __GNUC__
+#pragma interface /* gcc class implementation */
+#endif
+
+#include <ndbapi_limits.h>
+#include <ndb_types.h>
+
+class Ndb; // Forward declaration
+class NdbOperation; // Forward declaration
+class NdbConnection; // Forward declaration
+class NdbRecAttr; // Forward declaration
+class NdbResultSet; // Forward declaration
+
+typedef enum ndb_index_type {
+ UNDEFINED_INDEX = 0,
+ PRIMARY_KEY_INDEX = 1,
+ UNIQUE_INDEX = 2,
+ ORDERED_INDEX = 3
+} NDB_INDEX_TYPE;
+
+
+typedef struct st_ndbcluster_share {
+ THR_LOCK lock;
+ pthread_mutex_t mutex;
+ char *table_name;
+ uint table_name_length,use_count;
+} NDB_SHARE;
+
+class ha_ndbcluster: public handler
+{
+ public:
+ ha_ndbcluster(TABLE *table);
+ ~ha_ndbcluster();
+
+ int open(const char *name, int mode, uint test_if_locked);
+ int close(void);
+
+ int write_row(byte *buf);
+ int update_row(const byte *old_data, byte *new_data);
+ int delete_row(const byte *buf);
+ int index_init(uint index);
+ int index_end();
+ int index_read(byte *buf, const byte *key, uint key_len,
+ enum ha_rkey_function find_flag);
+ int index_read_idx(byte *buf, uint index, const byte *key, uint key_len,
+ enum ha_rkey_function find_flag);
+ int index_next(byte *buf);
+ int index_prev(byte *buf);
+ int index_first(byte *buf);
+ int index_last(byte *buf);
+ int rnd_init(bool scan=1);
+ int rnd_end();
+ int rnd_next(byte *buf);
+ int rnd_pos(byte *buf, byte *pos);
+ void position(const byte *record);
+
+ void info(uint);
+ int extra(enum ha_extra_function operation);
+ int extra_opt(enum ha_extra_function operation, ulong cache_size);
+ int reset();
+ int external_lock(THD *thd, int lock_type);
+ int start_stmt(THD *thd);
+ const char * table_type() const { return("ndbcluster");}
+ const char ** bas_ext() const;
+ ulong table_flags(void) const { return m_table_flags; }
+ ulong index_flags(uint idx) const;
+ uint max_record_length() const { return NDB_MAX_TUPLE_SIZE; };
+ uint max_keys() const { return MAX_KEY; }
+ uint max_key_parts() const { return NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY; };
+ uint max_key_length() const { return NDB_MAX_KEY_SIZE;};
+
+ int rename_table(const char *from, const char *to);
+ int delete_table(const char *name);
+ int create(const char *name, TABLE *form, HA_CREATE_INFO *info);
+ THR_LOCK_DATA **store_lock(THD *thd,
+ THR_LOCK_DATA **to,
+ enum thr_lock_type lock_type);
+
+ bool low_byte_first() const
+ {
+#ifdef WORDS_BIGENDIAN
+ return false;
+#else
+ return true;
+#endif
+ }
+ bool has_transactions() { return true;}
+
+ const char* index_type(uint key_number) {
+ switch (get_index_type(key_number)) {
+ case ORDERED_INDEX:
+ return "BTREE";
+ case UNIQUE_INDEX:
+ case PRIMARY_KEY_INDEX:
+ default:
+ return "HASH";
+ }
+ }
+
+ double scan_time();
+ ha_rows records_in_range(int inx,
+ const byte *start_key,uint start_key_len,
+ enum ha_rkey_function start_search_flag,
+ const byte *end_key,uint end_key_len,
+ enum ha_rkey_function end_search_flag);
+
+
+ static Ndb* seize_ndb();
+ static void release_ndb(Ndb* ndb);
+
+
+ private:
+ int alter_table_name(const char *from, const char *to);
+ int drop_table();
+ int create_index(const char *name, KEY *key_info);
+ int initialize_autoincrement(const void* table);
+ int get_metadata(const char* path);
+ void release_metadata();
+ const char* get_index_name(uint idx_no) const;
+ NDB_INDEX_TYPE get_index_type(uint idx_no) const;
+ NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
+
+ int pk_read(const byte *key, uint key_len,
+ byte *buf);
+ int unique_index_read(const byte *key, uint key_len,
+ byte *buf);
+ int ordered_index_scan(const byte *key, uint key_len,
+ byte *buf,
+ enum ha_rkey_function find_flag);
+ int full_table_scan(byte * buf);
+ int next_result(byte *buf);
+#if 0
+ int filtered_scan(const byte *key, uint key_len,
+ byte *buf,
+ enum ha_rkey_function find_flag);
+#endif
+
+ void unpack_record(byte *buf);
+
+ void set_dbname(const char *pathname);
+ void set_tabname(const char *pathname);
+ void set_tabname(const char *pathname, char *tabname);
+
+ bool set_hidden_key(NdbOperation*,
+ uint fieldnr, const byte* field_ptr);
+ int set_ndb_key(NdbOperation*, Field *field,
+ uint fieldnr, const byte* field_ptr);
+ int set_ndb_value(NdbOperation*, Field *field, uint fieldnr);
+ int get_ndb_value(NdbOperation*, uint fieldnr, byte *field_ptr);
+ int set_primary_key(NdbOperation *op, const byte *key);
+ int set_primary_key(NdbOperation *op);
+ int key_cmp(uint keynr, const byte * old_row, const byte * new_row);
+ void print_results();
+
+ longlong get_auto_increment();
+
+ int ndb_err(NdbConnection*);
+
+ private:
+ int check_ndb_connection();
+
+ NdbConnection *m_active_trans;
+ NdbResultSet *m_active_cursor;
+ Ndb *m_ndb;
+ void *m_table;
+ char m_dbname[FN_HEADLEN];
+ //char m_schemaname[FN_HEADLEN];
+ char m_tabname[FN_HEADLEN];
+ ulong m_table_flags;
+ THR_LOCK_DATA m_lock;
+ NDB_SHARE *m_share;
+ NDB_INDEX_TYPE m_indextype[MAX_KEY];
+ NdbRecAttr *m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
+ bool m_use_write;
+};
+
+bool ndbcluster_init(void);
+bool ndbcluster_end(void);
+
+int ndbcluster_commit(THD *thd, void* ndb_transaction);
+int ndbcluster_rollback(THD *thd, void* ndb_transaction);
+
+void ndbcluster_close_connection(THD *thd);
+
+int ndbcluster_discover(const char* dbname, const char* name,
+ const void** frmblob, uint* frmlen);
+int ndbcluster_drop_database(const char* path);
+
+
+
+
+
+
+
+
diff --git a/sql/handler.cc b/sql/handler.cc
index 97abc11abe3..670a2b401be 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -40,6 +40,9 @@
#else
#define innobase_query_caching_of_table_permitted(X,Y,Z) 1
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+#include "ha_ndbcluster.h"
+#endif
#include <myisampack.h>
#include <errno.h>
@@ -51,7 +54,7 @@ ulong ha_read_count, ha_write_count, ha_delete_count, ha_update_count,
ha_read_key_count, ha_read_next_count, ha_read_prev_count,
ha_read_first_count, ha_read_last_count,
ha_commit_count, ha_rollback_count,
- ha_read_rnd_count, ha_read_rnd_next_count;
+ ha_read_rnd_count, ha_read_rnd_next_count, ha_discover_count;
static SHOW_COMP_OPTION have_yes= SHOW_OPTION_YES;
@@ -79,6 +82,10 @@ struct show_table_type_st sys_table_types[]=
"Supports transactions and page-level locking", DB_TYPE_BERKELEY_DB},
{"BERKELEYDB",&have_berkeley_db,
"Alias for BDB", DB_TYPE_BERKELEY_DB},
+ {"NDBCLUSTER", &have_ndbcluster,
+ "Clustered, fault tolerant memory based tables", DB_TYPE_NDBCLUSTER},
+ {"NDB", &have_ndbcluster,
+ "Alias for NDBCLUSTER", DB_TYPE_NDBCLUSTER},
{"EXAMPLE",&have_example_db,
"Example storage engine", DB_TYPE_EXAMPLE_DB},
{NullS, NULL, NullS, DB_TYPE_UNKNOWN}
@@ -181,6 +188,10 @@ handler *get_new_handler(TABLE *table, enum db_type db_type)
case DB_TYPE_EXAMPLE_DB:
return new ha_example(table);
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+ case DB_TYPE_NDBCLUSTER:
+ return new ha_ndbcluster(table);
+#endif
case DB_TYPE_HEAP:
return new ha_heap(table);
default: // should never happen
@@ -225,6 +236,18 @@ int ha_init()
opt_using_transactions=1;
}
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+ if (have_ndbcluster == SHOW_OPTION_YES)
+ {
+ if (ndbcluster_init())
+ {
+ have_ndbcluster= SHOW_OPTION_DISABLED;
+ error= 1;
+ }
+ else
+ opt_using_transactions=1;
+ }
+#endif
return error;
}
@@ -252,6 +275,10 @@ int ha_panic(enum ha_panic_function flag)
if (have_innodb == SHOW_OPTION_YES)
error|=innobase_end();
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+ if (have_ndbcluster == SHOW_OPTION_YES)
+ error|=ndbcluster_end();
+#endif
return error;
} /* ha_panic */
@@ -261,6 +288,10 @@ void ha_drop_database(char* path)
if (have_innodb == SHOW_OPTION_YES)
innobase_drop_database(path);
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+ if (have_ndbcluster == SHOW_OPTION_YES)
+ ndbcluster_drop_database(path);
+#endif
}
void ha_close_connection(THD* thd)
@@ -269,6 +300,10 @@ void ha_close_connection(THD* thd)
if (have_innodb == SHOW_OPTION_YES)
innobase_close_connection(thd);
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+ if (have_ndbcluster == SHOW_OPTION_YES)
+ ndbcluster_close_connection(thd);
+#endif
}
/*
@@ -428,6 +463,19 @@ int ha_commit_trans(THD *thd, THD_TRANS* trans)
WRITE_CACHE, (my_off_t) 0, 0, 1);
thd->transaction.trans_log.end_of_file= max_binlog_cache_size;
}
+#ifdef HAVE_NDBCLUSTER_DB
+ if (trans->ndb_tid)
+ {
+ if ((error=ndbcluster_commit(thd,trans->ndb_tid)))
+ {
+ my_error(ER_ERROR_DURING_COMMIT, MYF(0), error);
+ error=1;
+ }
+ if (trans == &thd->transaction.all)
+ operation_done= transaction_commited= 1;
+ trans->ndb_tid=0;
+ }
+#endif
#ifdef HAVE_BERKELEY_DB
if (trans->bdb_tid)
{
@@ -481,6 +529,18 @@ int ha_rollback_trans(THD *thd, THD_TRANS *trans)
if (opt_using_transactions)
{
bool operation_done=0;
+#ifdef HAVE_NDBCLUSTER_DB
+ if (trans->ndb_tid)
+ {
+ if ((error=ndbcluster_rollback(thd, trans->ndb_tid)))
+ {
+ my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), error);
+ error=1;
+ }
+ trans->ndb_tid = 0;
+ operation_done=1;
+ }
+#endif
#ifdef HAVE_BERKELEY_DB
if (trans->bdb_tid)
{
@@ -1160,8 +1220,10 @@ bool handler::caching_allowed(THD* thd, char* table_key,
** Some general functions that isn't in the handler class
****************************************************************************/
- /* Initiates table-file and calls apropriate database-creator */
- /* Returns 1 if something got wrong */
+/*
+ Initiates table-file and calls apropriate database-creator
+ Returns 1 if something got wrong
+*/
int ha_create_table(const char *name, HA_CREATE_INFO *create_info,
bool update_create_info)
@@ -1177,7 +1239,7 @@ int ha_create_table(const char *name, HA_CREATE_INFO *create_info,
{
update_create_info_from_table(create_info, &table);
if (table.file->table_flags() & HA_DROP_BEFORE_CREATE)
- table.file->delete_table(name); // Needed for BDB tables
+ table.file->delete_table(name);
}
if (lower_case_table_names == 2 &&
!(table.file->table_flags() & HA_FILE_BASED))
@@ -1299,6 +1361,26 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache,
/*
+ Try to discover one table from handler(s)
+*/
+
+int ha_discover(const char* dbname, const char* name,
+ const void** frmblob, uint* frmlen)
+{
+ int error= 1; // Table does not exist in any handler
+ DBUG_ENTER("ha_discover");
+ DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
+#ifdef HAVE_NDBCLUSTER_DB
+ if (have_ndbcluster == SHOW_OPTION_YES)
+ error= ndbcluster_discover(dbname, name, frmblob, frmlen);
+#endif
+ if (!error)
+ statistic_increment(ha_discover_count,&LOCK_status);
+ DBUG_RETURN(error);
+}
+
+
+/*
Read first row between two ranges.
Store ranges for future calls to read_range_next
@@ -1434,3 +1516,5 @@ int handler::compare_key(key_range *range)
}
return key_compare_result_on_equal;
}
+
+
diff --git a/sql/handler.h b/sql/handler.h
index 9d39eff1301..62ff74c436a 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -28,7 +28,8 @@
#define NO_HASH /* Not yet implemented */
#endif
-#if defined(HAVE_BERKELEY_DB) || defined(HAVE_INNOBASE_DB)
+#if defined(HAVE_BERKELEY_DB) || defined(HAVE_INNOBASE_DB) || \
+ defined(HAVE_NDBCLUSTER_DB)
#define USING_TRANSACTIONS
#endif
@@ -80,7 +81,6 @@
#define HA_FILE_BASED (1 << 26)
-
/* bits in index_flags(index_number) for what you can do with index */
#define HA_WRONG_ASCII_ORDER 1 /* Can't use sorting through key */
#define HA_READ_NEXT 2 /* Read next record with same key */
@@ -141,12 +141,18 @@
#define HA_CACHE_TBL_ASKTRANSACT 1
#define HA_CACHE_TBL_TRANSACT 2
-enum db_type { DB_TYPE_UNKNOWN=0,DB_TYPE_DIAB_ISAM=1,
- DB_TYPE_HASH,DB_TYPE_MISAM,DB_TYPE_PISAM,
- DB_TYPE_RMS_ISAM, DB_TYPE_HEAP, DB_TYPE_ISAM,
- DB_TYPE_MRG_ISAM, DB_TYPE_MYISAM, DB_TYPE_MRG_MYISAM,
- DB_TYPE_BERKELEY_DB, DB_TYPE_INNODB, DB_TYPE_GEMINI,
- DB_TYPE_EXAMPLE_DB, DB_TYPE_DEFAULT };
+enum db_type
+{
+ DB_TYPE_UNKNOWN=0,DB_TYPE_DIAB_ISAM=1,
+ DB_TYPE_HASH,DB_TYPE_MISAM,DB_TYPE_PISAM,
+ DB_TYPE_RMS_ISAM, DB_TYPE_HEAP, DB_TYPE_ISAM,
+ DB_TYPE_MRG_ISAM, DB_TYPE_MYISAM, DB_TYPE_MRG_MYISAM,
+ DB_TYPE_BERKELEY_DB, DB_TYPE_INNODB,
+ DB_TYPE_GEMINI, DB_TYPE_NDBCLUSTER,
+ DB_TYPE_EXAMPLE_DB,
+
+ DB_TYPE_DEFAULT // Must be last
+};
struct show_table_type_st {
const char *type;
@@ -176,6 +182,7 @@ typedef struct st_thd_trans {
void *bdb_tid;
void *innobase_tid;
bool innodb_active_trans;
+ void *ndb_tid;
} THD_TRANS;
enum enum_tx_isolation { ISO_READ_UNCOMMITTED, ISO_READ_COMMITTED,
@@ -479,3 +486,5 @@ bool ha_flush_logs(void);
int ha_recovery_logging(THD *thd, bool on);
int ha_change_key_cache(KEY_CACHE *old_key_cache,
KEY_CACHE *new_key_cache);
+int ha_discover(const char* dbname, const char* name,
+ const void** frmblob, uint* frmlen);
diff --git a/sql/item_subselect.cc b/sql/item_subselect.cc
index 196b54141d1..2d10be62d53 100644
--- a/sql/item_subselect.cc
+++ b/sql/item_subselect.cc
@@ -1356,7 +1356,7 @@ void subselect_uniquesubquery_engine::exclude()
table_map subselect_engine::calc_const_tables(TABLE_LIST *table)
{
table_map map= 0;
- for(; table; table= table->next)
+ for (; table; table= table->next)
{
TABLE *tbl= table->table;
if (tbl && tbl->const_table)
diff --git a/sql/item_sum.h b/sql/item_sum.h
index 107c19b7d85..9593c8ddbba 100644
--- a/sql/item_sum.h
+++ b/sql/item_sum.h
@@ -298,6 +298,7 @@ class Item_sum_avg :public Item_sum_num
void update_field();
Item *result_item(Field *field)
{ return new Item_avg_field(this); }
+ void no_rows_in_result() {}
const char *func_name() const { return "avg"; }
Item *copy_or_same(THD* thd);
};
diff --git a/sql/lex.h b/sql/lex.h
index 3274d544744..64cc6b57464 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -48,7 +48,7 @@ SYM_GROUP sym_group_rtree= {"RTree keys", "HAVE_RTREE_KEYS"};
*/
static SYMBOL symbols[] = {
- { "&&", SYM(AND)},
+ { "&&", SYM(AND_SYM)},
{ "<", SYM(LT)},
{ "<=", SYM(LE)},
{ "<>", SYM(NE)},
@@ -67,7 +67,7 @@ static SYMBOL symbols[] = {
{ "ALL", SYM(ALL)},
{ "ALTER", SYM(ALTER)},
{ "ANALYZE", SYM(ANALYZE_SYM)},
- { "AND", SYM(AND)},
+ { "AND", SYM(AND_SYM)},
{ "ANY", SYM(ANY_SYM)},
{ "AS", SYM(AS)},
{ "ASC", SYM(ASC)},
@@ -296,6 +296,8 @@ static SYMBOL symbols[] = {
{ "NAMES", SYM(NAMES_SYM)},
{ "NATIONAL", SYM(NATIONAL_SYM)},
{ "NATURAL", SYM(NATURAL)},
+ { "NDB", SYM(NDBCLUSTER_SYM)},
+ { "NDBCLUSTER", SYM(NDBCLUSTER_SYM)},
{ "NCHAR", SYM(NCHAR_SYM)},
{ "NEW", SYM(NEW_SYM)},
{ "NEXT", SYM(NEXT_SYM)},
@@ -313,7 +315,7 @@ static SYMBOL symbols[] = {
{ "OPTIMIZE", SYM(OPTIMIZE)},
{ "OPTION", SYM(OPTION)},
{ "OPTIONALLY", SYM(OPTIONALLY)},
- { "OR", SYM(OR)},
+ { "OR", SYM(OR_SYM)},
{ "ORDER", SYM(ORDER_SYM)},
{ "OUTER", SYM(OUTER)},
{ "OUTFILE", SYM(OUTFILE)},
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 3b92e0956ba..207f4a51ff4 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -923,15 +923,15 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db)
(ulong) thread_id, (ulong) exec_time, error_code);
}
- bool same_db = 0;
+ bool different_db= 1;
if (db && last_db)
{
- if (!(same_db = !memcmp(last_db, db, db_len + 1)))
+ if (different_db= memcmp(last_db, db, db_len + 1))
memcpy(last_db, db, db_len + 1);
}
- if (db && db[0] && !same_db)
+ if (db && db[0] && different_db)
fprintf(file, "use %s;\n", db);
end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
*end++=';';
@@ -960,8 +960,10 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
position to store is of the END of the current log event.
*/
#if MYSQL_VERSION_ID < 50000
- rli->future_group_master_log_pos= log_pos + get_event_len();
+ rli->future_group_master_log_pos= log_pos + get_event_len() -
+ (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0);
#else
+ /* In 5.0 we store the end_log_pos in the relay log so no problem */
rli->future_group_master_log_pos= log_pos;
#endif
clear_all_errors(thd, rli);
@@ -1165,6 +1167,11 @@ int Start_log_event::exec_event(struct st_relay_log_info* rli)
{
DBUG_ENTER("Start_log_event::exec_event");
+ /*
+ If the I/O thread has not started, mi->old_format is BINLOG_FORMAT_CURRENT
+ (that's what the MASTER_INFO constructor does), so the test below is not
+ perfect at all.
+ */
switch (rli->mi->old_format) {
case BINLOG_FORMAT_CURRENT:
/*
@@ -1542,14 +1549,21 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db,
thread_id, exec_time);
}
- bool same_db = 0;
+ bool different_db= 1;
if (db && last_db)
{
- if (!(same_db = !memcmp(last_db, db, db_len + 1)))
+ /*
+ If the database is different from the one of the previous statement, we
+ need to print the "use" command, and we update the last_db.
+ But if commented, the "use" is going to be commented so we should not
+ update the last_db.
+ */
+ if ((different_db= memcmp(last_db, db, db_len + 1)) &&
+ !commented)
memcpy(last_db, db, db_len + 1);
}
- if (db && db[0] && !same_db)
+ if (db && db[0] && different_db)
fprintf(file, "%suse %s;\n",
commented ? "# " : "",
db);
@@ -1667,7 +1681,8 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
if (!use_rli_only_for_errors)
{
#if MYSQL_VERSION_ID < 50000
- rli->future_group_master_log_pos= log_pos + get_event_len();
+ rli->future_group_master_log_pos= log_pos + get_event_len() -
+ (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0);
#else
rli->future_group_master_log_pos= log_pos;
#endif
@@ -3138,9 +3153,11 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
*/
#if MYSQL_VERSION_ID < 40100
- rli->future_master_log_pos= log_pos + get_event_len();
+ rli->future_master_log_pos= log_pos + get_event_len() -
+ (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0);
#elif MYSQL_VERSION_ID < 50000
- rli->future_group_master_log_pos= log_pos + get_event_len();
+ rli->future_group_master_log_pos= log_pos + get_event_len() -
+ (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0);
#else
rli->future_group_master_log_pos= log_pos;
#endif
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index 00af3c67c7e..4381ea1f93e 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -839,7 +839,7 @@ extern ulong server_id, concurrency;
extern ulong ha_read_count, ha_write_count, ha_delete_count, ha_update_count;
extern ulong ha_read_key_count, ha_read_next_count, ha_read_prev_count;
extern ulong ha_read_first_count, ha_read_last_count;
-extern ulong ha_read_rnd_count, ha_read_rnd_next_count;
+extern ulong ha_read_rnd_count, ha_read_rnd_next_count, ha_discover_count;
extern ulong ha_commit_count, ha_rollback_count,table_cache_size;
extern ulong max_connections,max_connect_errors, connect_timeout;
extern ulong slave_net_timeout;
@@ -893,6 +893,7 @@ extern SHOW_VAR init_vars[],status_vars[], internal_vars[];
extern SHOW_COMP_OPTION have_isam;
extern SHOW_COMP_OPTION have_innodb;
extern SHOW_COMP_OPTION have_berkeley_db;
+extern SHOW_COMP_OPTION have_ndbcluster;
extern struct system_variables global_system_variables;
extern struct system_variables max_system_variables;
extern struct rand_struct sql_rand;
@@ -962,6 +963,10 @@ int format_number(uint inputflag,uint max_length,my_string pos,uint length,
my_string *errpos);
int openfrm(const char *name,const char *alias,uint filestat,uint prgflag,
uint ha_open_flags, TABLE *outparam);
+int readfrm(const char *name, const void** data, uint* length);
+int writefrm(const char* name, const void* data, uint len);
+int create_table_from_handler(const char *db, const char *name,
+ bool create_if_found);
int closefrm(TABLE *table);
db_type get_table_type(const char *name);
int read_string(File file, gptr *to, uint length);
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 2468deb2cc7..2878654b759 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -32,6 +32,9 @@
#ifdef HAVE_ISAM
#include "ha_isam.h"
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+#include "ha_ndbcluster.h"
+#endif
#include <nisam.h>
#include <thr_alarm.h>
#include <ft_global.h>
@@ -261,7 +264,7 @@ my_bool opt_local_infile, opt_external_locking, opt_slave_compressed_protocol;
my_bool opt_safe_user_create = 0, opt_no_mix_types = 0;
my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0;
my_bool opt_log_slave_updates= 0;
-my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam;
+my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam, opt_ndbcluster;
my_bool opt_readonly, use_temp_pool, relay_log_purge;
my_bool opt_sync_bdb_logs, opt_sync_frm;
my_bool opt_secure_auth= 0;
@@ -370,7 +373,8 @@ KEY_CACHE *sql_key_cache;
CHARSET_INFO *system_charset_info, *files_charset_info ;
CHARSET_INFO *national_charset_info, *table_alias_charset;
-SHOW_COMP_OPTION have_berkeley_db, have_innodb, have_isam, have_example_db;
+SHOW_COMP_OPTION have_berkeley_db, have_innodb, have_isam,
+ have_ndbcluster, have_example_db;
SHOW_COMP_OPTION have_raid, have_openssl, have_symlink, have_query_cache;
SHOW_COMP_OPTION have_crypt, have_compress;
@@ -3625,7 +3629,7 @@ enum options_mysqld
OPT_INNODB_FAST_SHUTDOWN,
OPT_INNODB_FILE_PER_TABLE,
OPT_SAFE_SHOW_DB,
- OPT_INNODB, OPT_ISAM, OPT_SKIP_SAFEMALLOC,
+ OPT_INNODB, OPT_ISAM, OPT_NDBCLUSTER, OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL,
@@ -4158,6 +4162,10 @@ Disable with --skip-innodb (will save memory).",
Disable with --skip-isam.",
(gptr*) &opt_isam, (gptr*) &opt_isam, 0, GET_BOOL, NO_ARG, 1, 0, 0,
0, 0, 0},
+ {"ndbcluster", OPT_NDBCLUSTER, "Enable NDB Cluster (if this version of MySQL supports it). \
+Disable with --skip-ndbcluster (will save memory).",
+ (gptr*) &opt_ndbcluster, (gptr*) &opt_ndbcluster, 0, GET_BOOL, NO_ARG, 1, 0, 0,
+ 0, 0, 0},
{"skip-locking", OPT_SKIP_LOCK,
"Deprecated option, use --skip-external-locking instead.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
@@ -4831,6 +4839,7 @@ struct show_var_st status_vars[]= {
{"Handler_rollback", (char*) &ha_rollback_count, SHOW_LONG},
{"Handler_update", (char*) &ha_update_count, SHOW_LONG},
{"Handler_write", (char*) &ha_write_count, SHOW_LONG},
+ {"Handler_discover", (char*) &ha_discover_count, SHOW_LONG},
{"Key_blocks_not_flushed", (char*) &dflt_key_cache_var.global_blocks_changed,
SHOW_KEY_CACHE_LONG},
{"Key_blocks_used", (char*) &dflt_key_cache_var.global_blocks_used,
@@ -5129,6 +5138,11 @@ static void mysql_init_variables(void)
#else
have_example_db= SHOW_OPTION_NO;
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+ have_ndbcluster=SHOW_OPTION_DISABLED;
+#else
+ have_ndbcluster=SHOW_OPTION_NO;
+#endif
#ifdef USE_RAID
have_raid=SHOW_OPTION_YES;
#else
@@ -5599,6 +5613,14 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
have_isam= SHOW_OPTION_DISABLED;
#endif
break;
+ case OPT_NDBCLUSTER:
+#ifdef HAVE_NDBCLUSTER_DB
+ if (opt_ndbcluster)
+ have_ndbcluster=SHOW_OPTION_YES;
+ else
+ have_ndbcluster=SHOW_OPTION_DISABLED;
+#endif
+ break;
case OPT_INNODB:
#ifdef HAVE_INNOBASE_DB
if (opt_innodb)
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 5ccda5c7052..1339bdc887f 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -59,6 +59,9 @@
#ifdef HAVE_INNOBASE_DB
#include "ha_innodb.h"
#endif
+#ifdef HAVE_NDBCLUSTER_DB
+#include "ha_ndbcluster.h"
+#endif
static HASH system_variable_hash;
const char *bool_type_names[]= { "OFF", "ON", NullS };
@@ -638,6 +641,7 @@ struct show_var_st init_vars[]= {
{"have_crypt", (char*) &have_crypt, SHOW_HAVE},
{"have_innodb", (char*) &have_innodb, SHOW_HAVE},
{"have_isam", (char*) &have_isam, SHOW_HAVE},
+ {"have_ndbcluster", (char*) &have_ndbcluster, SHOW_HAVE},
{"have_openssl", (char*) &have_openssl, SHOW_HAVE},
{"have_query_cache", (char*) &have_query_cache, SHOW_HAVE},
{"have_raid", (char*) &have_raid, SHOW_HAVE},
@@ -1090,9 +1094,9 @@ static void fix_max_relay_log_size(THD *thd, enum_var_type type)
static int check_max_delayed_threads(THD *thd, set_var *var)
{
- int val= var->value->val_int();
+ longlong val= var->value->val_int();
if (var->type != OPT_GLOBAL && val != 0 &&
- val != global_system_variables.max_insert_delayed_threads)
+ val != (longlong) global_system_variables.max_insert_delayed_threads)
{
char buf[64];
my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name, llstr(val, buf));
@@ -1101,6 +1105,7 @@ static int check_max_delayed_threads(THD *thd, set_var *var)
return 0;
}
+
static void fix_max_connections(THD *thd, enum_var_type type)
{
resize_thr_alarm(max_connections +
diff --git a/sql/slave.cc b/sql/slave.cc
index 67b9c7515e1..061c737b538 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -361,6 +361,55 @@ void init_slave_skip_errors(const char* arg)
}
}
+
+void st_relay_log_info::inc_group_relay_log_pos(ulonglong val,
+ ulonglong log_pos,
+ bool skip_lock)
+{
+ if (!skip_lock)
+ pthread_mutex_lock(&data_lock);
+ inc_event_relay_log_pos(val);
+ group_relay_log_pos= event_relay_log_pos;
+ strmake(group_relay_log_name,event_relay_log_name,
+ sizeof(group_relay_log_name)-1);
+
+ notify_group_relay_log_name_update();
+
+ /*
+ If the slave does not support transactions and replicates a transaction,
+ users should not trust group_master_log_pos (which they can display with
+ SHOW SLAVE STATUS or read from relay-log.info), because to compute
+ group_master_log_pos the slave relies on log_pos stored in the master's
+ binlog, but if we are in a master's transaction these positions are always
+ the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
+ not advance as it should on the non-transactional slave (it advances by
+ big leaps, whereas it should advance by small leaps).
+ */
+ if (log_pos) // 3.23 binlogs don't have log_posx
+ {
+#if MYSQL_VERSION_ID < 50000
+ /*
+ If the event was converted from a 3.23 format, get_event_len() has
+ grown by 6 bytes (at least for most events, except LOAD DATA INFILE
+ which is already a big problem for 3.23->4.0 replication); 6 bytes is
+ the difference between the header's size in 4.0 (LOG_EVENT_HEADER_LEN)
+ and the header's size in 3.23 (OLD_HEADER_LEN). Note that using
+ mi->old_format will not help if the I/O thread has not started yet.
+ Yes this is a hack but it's just to make 3.23->4.x replication work;
+ 3.23->5.0 replication is working much better.
+ */
+ group_master_log_pos= log_pos + val -
+ (mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0);
+#else
+ group_master_log_pos= log_pos+ val;
+#endif /* MYSQL_VERSION_ID < 5000 */
+ }
+ pthread_cond_broadcast(&data_cond);
+ if (!skip_lock)
+ pthread_mutex_unlock(&data_lock);
+}
+
+
void st_relay_log_info::close_temporary_tables()
{
TABLE *table,*next;
@@ -3473,8 +3522,16 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
DBUG_RETURN(1);
}
memcpy(tmp_buf,buf,event_len);
- tmp_buf[event_len]=0; // Create_file constructor wants null-term buffer
+ /*
+ Create_file constructor wants a 0 as last char of buffer, this 0 will
+ serve as the string-termination char for the file's name (which is at the
+ end of the buffer)
+ We must increment event_len, otherwise the event constructor will not see
+ this end 0, which leads to segfault.
+ */
+ tmp_buf[event_len++]=0;
buf = (const char*)tmp_buf;
+ int4store(buf+EVENT_LEN_OFFSET, event_len);
}
/*
This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
@@ -3520,7 +3577,11 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
DBUG_ASSERT(tmp_buf);
int error = process_io_create_file(mi,(Create_file_log_event*)ev);
delete ev;
- mi->master_log_pos += event_len;
+ /*
+ We had incremented event_len, but now when it is used to calculate the
+ position in the master's log, we must use the original value.
+ */
+ mi->master_log_pos += --event_len;
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
pthread_mutex_unlock(&mi->data_lock);
my_free((char*)tmp_buf, MYF(0));
diff --git a/sql/slave.h b/sql/slave.h
index 549ceb5d42c..c925831d1a5 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -291,34 +291,7 @@ typedef struct st_relay_log_info
event_relay_log_pos+= val;
}
- void inc_group_relay_log_pos(ulonglong val, ulonglong log_pos, bool skip_lock=0)
- {
- if (!skip_lock)
- pthread_mutex_lock(&data_lock);
- inc_event_relay_log_pos(val);
- group_relay_log_pos= event_relay_log_pos;
- strmake(group_relay_log_name,event_relay_log_name,
- sizeof(group_relay_log_name)-1);
-
- notify_group_relay_log_name_update();
-
- /*
- If the slave does not support transactions and replicates a transaction,
- users should not trust group_master_log_pos (which they can display with
- SHOW SLAVE STATUS or read from relay-log.info), because to compute
- group_master_log_pos the slave relies on log_pos stored in the master's
- binlog, but if we are in a master's transaction these positions are always
- the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
- not advance as it should on the non-transactional slave (it advances by
- big leaps, whereas it should advance by small leaps).
- */
- if (log_pos) // 3.23 binlogs don't have log_posx
- group_master_log_pos= log_pos+ val;
- pthread_cond_broadcast(&data_cond);
- if (!skip_lock)
- pthread_mutex_unlock(&data_lock);
- }
-
+ void inc_group_relay_log_pos(ulonglong val, ulonglong log_pos, bool skip_lock=0);
int wait_for_pos(THD* thd, String* log_name, longlong log_pos,
longlong timeout);
void close_temporary_tables();
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index 8c4571dd540..d24b13ff96f 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -1317,18 +1317,34 @@ static int open_unireg_entry(THD *thd, TABLE *entry, const char *db,
{
char path[FN_REFLEN];
int error;
+ uint discover_retry_count= 0;
DBUG_ENTER("open_unireg_entry");
strxmov(path, mysql_data_home, "/", db, "/", name, NullS);
- if (openfrm(path,alias,
+ while (openfrm(path,alias,
(uint) (HA_OPEN_KEYFILE | HA_OPEN_RNDFILE | HA_GET_INDEX |
HA_TRY_READ_ONLY),
READ_KEYINFO | COMPUTE_TYPES | EXTRA_RECORD,
thd->open_options, entry))
{
if (!entry->crashed)
- goto err; // Can't repair the table
+ {
+ /*
+ Frm file could not be found on disk
+ Since it does not exist, no one can be using it
+ LOCK_open has been locked to protect from someone else
+ trying to discover the table at the same time.
+ */
+ if (discover_retry_count++ != 0)
+ goto err;
+ if (create_table_from_handler(db, name, true) != 0)
+ goto err;
+
+ thd->clear_error(); // Clear error message
+ continue;
+ }
+ // Code below is for repairing a crashed file
TABLE_LIST table_list;
bzero((char*) &table_list, sizeof(table_list)); // just for safe
table_list.db=(char*) db;
@@ -1374,6 +1390,7 @@ static int open_unireg_entry(THD *thd, TABLE *entry, const char *db,
if (error)
goto err;
+ break;
}
/*
If we are here, there was no fatal error (but error may be still
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 8ccfe3cddd5..97baf47a8cb 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -703,7 +703,10 @@ public:
THD_TRANS all; // Trans since BEGIN WORK
THD_TRANS stmt; // Trans for current statement
uint bdb_lock_count;
-
+ uint ndb_lock_count;
+#ifdef HAVE_NDBCLUSTER_DB
+ void* ndb;
+#endif
/*
Tables changed in transaction (that must be invalidated in query cache).
List contain only transactional tables, that not invalidated in query
@@ -893,7 +896,8 @@ public:
{
#ifdef USING_TRANSACTIONS
return (transaction.all.bdb_tid != 0 ||
- transaction.all.innodb_active_trans != 0);
+ transaction.all.innodb_active_trans != 0 ||
+ transaction.all.ndb_tid != 0);
#else
return 0;
#endif
diff --git a/sql/sql_db.cc b/sql/sql_db.cc
index 51b875385f1..a6bd1955fa5 100644
--- a/sql/sql_db.cc
+++ b/sql/sql_db.cc
@@ -551,7 +551,12 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *db,
If the directory is a symbolic link, remove the link first, then
remove the directory the symbolic link pointed at
*/
- if (!found_other_files)
+ if (found_other_files)
+ {
+ my_error(ER_DB_DROP_RMDIR, MYF(0), org_path, EEXIST);
+ DBUG_RETURN(-1);
+ }
+ else
{
char tmp_path[FN_REFLEN], *pos;
char *path= tmp_path;
diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc
index e0e8fed29c8..19a6941fe32 100644
--- a/sql/sql_lex.cc
+++ b/sql/sql_lex.cc
@@ -879,13 +879,15 @@ int yylex(void *arg, void *yythd)
}
yySkip();
return (SET_VAR);
- case MY_LEX_COLON: // optional line terminator
+ case MY_LEX_SEMICOLON: // optional line terminator
if (yyPeek())
{
- if (((THD *)yythd)->client_capabilities & CLIENT_MULTI_STATEMENTS)
+ THD* thd= (THD*)yythd;
+ if ((thd->client_capabilities & CLIENT_MULTI_STATEMENTS) &&
+ (thd->command != COM_PREPARE))
{
lex->found_colon=(char*)lex->ptr;
- ((THD *)yythd)->server_status |= SERVER_MORE_RESULTS_EXISTS;
+ thd->server_status |= SERVER_MORE_RESULTS_EXISTS;
lex->next_state=MY_LEX_END;
return(END_OF_INPUT);
}
@@ -1529,7 +1531,7 @@ bool st_select_lex::setup_ref_array(THD *thd, uint order_group_num)
*/
bool st_select_lex_unit::check_updateable(char *db, char *table)
{
- for(SELECT_LEX *sl= first_select(); sl; sl= sl->next_select())
+ for (SELECT_LEX *sl= first_select(); sl; sl= sl->next_select())
if (sl->check_updateable(db, table))
return 1;
return 0;
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 64fa398e5f4..34d0d2b31f9 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -5260,5 +5260,5 @@ int create_table_precheck(THD *thd, TABLE_LIST *tables,
DBUG_RETURN(1);
DBUG_RETURN((grant_option && want_priv != CREATE_TMP_ACL &&
check_grant(thd, want_priv, create_table, 0, UINT_MAX, 0)) ?
- 1 : 0)
+ 1 : 0);
}
diff --git a/sql/sql_show.cc b/sql/sql_show.cc
index e87a37150ea..448dc825a26 100644
--- a/sql/sql_show.cc
+++ b/sql/sql_show.cc
@@ -181,7 +181,7 @@ int mysqld_show_storage_engines(THD *thd)
Protocol *protocol= thd->protocol;
DBUG_ENTER("mysqld_show_storage_engines");
- field_list.push_back(new Item_empty_string("Type",10));
+ field_list.push_back(new Item_empty_string("Engine",10));
field_list.push_back(new Item_empty_string("Support",10));
field_list.push_back(new Item_empty_string("Comment",80));
@@ -471,7 +471,7 @@ int mysqld_extend_show_tables(THD *thd,const char *db,const char *wild)
(void) sprintf(path,"%s/%s",mysql_data_home,db);
(void) unpack_dirname(path,path);
field_list.push_back(item=new Item_empty_string("Name",NAME_LEN));
- field_list.push_back(item=new Item_empty_string("Type",10));
+ field_list.push_back(item=new Item_empty_string("Engine",10));
item->maybe_null=1;
field_list.push_back(item=new Item_empty_string("Row_format",10));
item->maybe_null=1;
diff --git a/sql/sql_string.cc b/sql/sql_string.cc
index a4eae8a6346..8a093738e2b 100644
--- a/sql/sql_string.cc
+++ b/sql/sql_string.cc
@@ -830,7 +830,7 @@ outp:
void String::print(String *str)
{
char *st= (char*)Ptr, *end= st+str_length;
- for(; st < end; st++)
+ for (; st < end; st++)
{
uchar c= *st;
switch (c)
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index d5f77bf545d..c46a9823a52 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -1148,6 +1148,35 @@ int mysql_create_table(THD *thd,const char *db, const char *table_name,
}
}
+ /*
+ Check that table with given name does not already
+ exist in any storage engine. In such a case it should
+ be discovered and the error ER_TABLE_EXISTS_ERROR be returned
+ unless user specified CREATE TABLE IF EXISTS
+ The LOCK_open mutex has been locked to make sure no
+ one else is attempting to discover the table. Since
+ it's not on disk as a frm file, no one could be using it!
+ */
+ if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
+ {
+ bool create_if_not_exists =
+ create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS;
+ if (!create_table_from_handler(db, table_name,
+ create_if_not_exists))
+ {
+ DBUG_PRINT("info", ("Table already existed in handler"));
+
+ if (create_if_not_exists)
+ {
+ create_info->table_existed= 1; // Mark that table existed
+ error= 0;
+ }
+ else
+ my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
+ goto end;
+ }
+ }
+
thd->proc_info="creating table";
create_info->table_existed= 0; // Mark that table is created
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index a25bd3f17bc..247bec84e8e 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -181,7 +181,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%token ACTION
%token AGGREGATE_SYM
%token ALL
-%token AND
+%token AND_SYM
%token AS
%token ASC
%token AUTO_INC
@@ -305,6 +305,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%token NAMES_SYM
%token NATIONAL_SYM
%token NATURAL
+%token NDBCLUSTER_SYM
%token NEW_SYM
%token NCHAR_SYM
%token NCHAR_STRING
@@ -318,7 +319,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%token OPEN_SYM
%token OPTION
%token OPTIONALLY
-%token OR
+%token OR_SYM
%token OR_OR_CONCAT
%token ORDER_SYM
%token OUTER
@@ -576,8 +577,8 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%token BEFORE_SYM
%left SET_VAR
-%left OR_OR_CONCAT OR XOR
-%left AND
+%left OR_OR_CONCAT OR_SYM XOR
+%left AND_SYM
%left BETWEEN_SYM CASE_SYM WHEN_SYM THEN_SYM ELSE
%left EQ EQUAL_SYM GE GT_SYM LE LT NE IS LIKE REGEXP IN_SYM
%left '|'
@@ -730,7 +731,7 @@ END_OF_INPUT
%type <NONE>
'-' '+' '*' '/' '%' '(' ')'
- ',' '!' '{' '}' '&' '|' AND OR OR_OR_CONCAT BETWEEN_SYM CASE_SYM
+ ',' '!' '{' '}' '&' '|' AND_SYM OR_SYM OR_OR_CONCAT BETWEEN_SYM CASE_SYM
THEN_SYM WHEN_SYM DIV_SYM MOD_SYM
%%
@@ -2493,14 +2494,14 @@ expr_expr:
{
$$= new Item_func_not(new Item_in_subselect($1, $4));
}
- | expr BETWEEN_SYM no_and_expr AND expr
+ | expr BETWEEN_SYM no_and_expr AND_SYM expr
{ $$= new Item_func_between($1,$3,$5); }
- | expr NOT BETWEEN_SYM no_and_expr AND expr
+ | expr NOT BETWEEN_SYM no_and_expr AND_SYM expr
{ $$= new Item_func_not(new Item_func_between($1,$4,$6)); }
| expr OR_OR_CONCAT expr { $$= or_or_concat(YYTHD, $1,$3); }
- | expr OR expr { $$= new Item_cond_or($1,$3); }
+ | expr OR_SYM expr { $$= new Item_cond_or($1,$3); }
| expr XOR expr { $$= new Item_cond_xor($1,$3); }
- | expr AND expr { $$= new Item_cond_and($1,$3); }
+ | expr AND_SYM expr { $$= new Item_cond_and($1,$3); }
| expr SOUNDS_SYM LIKE expr
{
$$= new Item_func_eq(new Item_func_soundex($1),
@@ -2541,14 +2542,14 @@ expr_expr:
/* expressions that begin with 'expr' that do NOT follow IN_SYM */
no_in_expr:
- no_in_expr BETWEEN_SYM no_and_expr AND expr
+ no_in_expr BETWEEN_SYM no_and_expr AND_SYM expr
{ $$= new Item_func_between($1,$3,$5); }
- | no_in_expr NOT BETWEEN_SYM no_and_expr AND expr
+ | no_in_expr NOT BETWEEN_SYM no_and_expr AND_SYM expr
{ $$= new Item_func_not(new Item_func_between($1,$4,$6)); }
| no_in_expr OR_OR_CONCAT expr { $$= or_or_concat(YYTHD, $1,$3); }
- | no_in_expr OR expr { $$= new Item_cond_or($1,$3); }
+ | no_in_expr OR_SYM expr { $$= new Item_cond_or($1,$3); }
| no_in_expr XOR expr { $$= new Item_cond_xor($1,$3); }
- | no_in_expr AND expr { $$= new Item_cond_and($1,$3); }
+ | no_in_expr AND_SYM expr { $$= new Item_cond_and($1,$3); }
| no_in_expr SOUNDS_SYM LIKE expr
{
$$= new Item_func_eq(new Item_func_soundex($1),
@@ -2599,12 +2600,12 @@ no_and_expr:
{
$$= new Item_func_not(new Item_in_subselect($1, $4));
}
- | no_and_expr BETWEEN_SYM no_and_expr AND expr
+ | no_and_expr BETWEEN_SYM no_and_expr AND_SYM expr
{ $$= new Item_func_between($1,$3,$5); }
- | no_and_expr NOT BETWEEN_SYM no_and_expr AND expr
+ | no_and_expr NOT BETWEEN_SYM no_and_expr AND_SYM expr
{ $$= new Item_func_not(new Item_func_between($1,$4,$6)); }
| no_and_expr OR_OR_CONCAT expr { $$= or_or_concat(YYTHD, $1,$3); }
- | no_and_expr OR expr { $$= new Item_cond_or($1,$3); }
+ | no_and_expr OR_SYM expr { $$= new Item_cond_or($1,$3); }
| no_and_expr XOR expr { $$= new Item_cond_xor($1,$3); }
| no_and_expr SOUNDS_SYM LIKE expr
{
@@ -4219,8 +4220,8 @@ show_param:
YYABORT;
}
| NEW_SYM MASTER_SYM FOR_SYM SLAVE WITH MASTER_LOG_FILE_SYM EQ
- TEXT_STRING_sys AND MASTER_LOG_POS_SYM EQ ulonglong_num
- AND MASTER_SERVER_ID_SYM EQ
+ TEXT_STRING_sys AND_SYM MASTER_LOG_POS_SYM EQ ulonglong_num
+ AND_SYM MASTER_SERVER_ID_SYM EQ
ULONG_NUM
{
Lex->sql_command = SQLCOM_SHOW_NEW_MASTER;
@@ -5056,6 +5057,7 @@ keyword:
| NAMES_SYM {}
| NATIONAL_SYM {}
| NCHAR_SYM {}
+ | NDBCLUSTER_SYM {}
| NEXT_SYM {}
| NEW_SYM {}
| NO_SYM {}
@@ -5508,7 +5510,7 @@ grant_privilege:
opt_and:
/* empty */ {}
- | AND {}
+ | AND_SYM {}
;
require_list:
diff --git a/sql/table.cc b/sql/table.cc
index 1b7d30560ef..281a8c10409 100644
--- a/sql/table.cc
+++ b/sql/table.cc
@@ -944,7 +944,8 @@ static void frm_error(int error, TABLE *form, const char *name, myf errortype)
break;
case 2:
{
- datext=form->file ? *form->file->bas_ext() : "";
+ datext= form->file ? *form->file->bas_ext() : "";
+ datext= datext==NullS ? "" : datext;
err_no= (my_errno == ENOENT) ? ER_FILE_NOT_FOUND : (my_errno == EAGAIN) ?
ER_FILE_USED : ER_CANT_OPEN_FILE;
my_error(err_no,errortype,
diff --git a/sql/unireg.h b/sql/unireg.h
index 442809a9e08..4ab2ba26b15 100644
--- a/sql/unireg.h
+++ b/sql/unireg.h
@@ -48,7 +48,7 @@
#define MAX_ALIAS_NAME 256
#define MAX_FIELD_NAME 34 /* Max colum name length +2 */
#define MAX_SYS_VAR_LENGTH 32
-#define MAX_KEY 32 /* Max used keys */
+#define MAX_KEY 64 /* Max used keys */
#define MAX_REF_PARTS 16 /* Max parts used as ref */
#define MAX_KEY_LENGTH 1024 /* max possible key */
#if SIZEOF_OFF_T > 4