Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--  sql/ha_ndbcluster.cc  1758
1 file changed, 1248 insertions(+), 510 deletions(-)
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 1837500d8f7..d59ab919862 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -20,7 +20,6 @@
NDB Cluster
*/
-
#ifdef __GNUC__
#pragma implementation // gcc: Class implementation
#endif
@@ -33,9 +32,6 @@
#include <ndbapi/NdbApi.hpp>
#include <ndbapi/NdbScanFilter.hpp>
-#define USE_DISCOVER_ON_STARTUP
-//#define USE_NDB_POOL
-
// Default value for parallelism
static const int parallelism= 240;
@@ -49,11 +45,13 @@ static const ha_rows autoincrement_prefetch= 32;
// connectstring to cluster if given by mysqld
const char *ndbcluster_connectstring= 0;
+static const char *ha_ndb_ext=".ndb";
+
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
#define ERR_PRINT(err) \
- DBUG_PRINT("error", ("Error: %d message: %s", err.code, err.message))
+ DBUG_PRINT("error", ("%d message: %s", err.code, err.message))
#define ERR_RETURN(err) \
{ \
@@ -63,13 +61,14 @@ const char *ndbcluster_connectstring= 0;
// Typedefs for long names
typedef NdbDictionary::Column NDBCOL;
-typedef NdbDictionary::Table NDBTAB;
+typedef NdbDictionary::Table NDBTAB;
typedef NdbDictionary::Index NDBINDEX;
typedef NdbDictionary::Dictionary NDBDICT;
-bool ndbcluster_inited= false;
+bool ndbcluster_inited= FALSE;
static Ndb* g_ndb= NULL;
+static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
// Handler synchronization
pthread_mutex_t ndbcluster_mutex;
@@ -86,6 +85,16 @@ static int packfrm(const void *data, uint len, const void **pack_data, uint *pac
static int unpackfrm(const void **data, uint *len,
const void* pack_data);
+static int ndb_get_table_statistics(Ndb*, const char *,
+ Uint64* rows, Uint64* commits);
+
+
+/*
+ Dummy buffer to read zero pack_length fields
+ which are mapped to 1 char
+*/
+static byte dummy_buf[1];
+
/*
Error handling functions
*/
@@ -103,7 +112,9 @@ static const err_code_mapping err_map[]=
{ 893, HA_ERR_FOUND_DUPP_UNIQUE },
{ 721, HA_ERR_TABLE_EXIST },
{ 4244, HA_ERR_TABLE_EXIST },
- { 241, HA_ERR_OLD_METADATA },
+
+ { 709, HA_ERR_NO_SUCH_TABLE },
+ { 284, HA_ERR_NO_SUCH_TABLE },
{ 266, HA_ERR_LOCK_WAIT_TIMEOUT },
{ 274, HA_ERR_LOCK_WAIT_TIMEOUT },
@@ -118,6 +129,8 @@ static const err_code_mapping err_map[]=
{ 827, HA_ERR_RECORD_FILE_FULL },
{ 832, HA_ERR_RECORD_FILE_FULL },
+ { 0, 1 },
+
{ -1, -1 }
};
@@ -127,13 +140,158 @@ static int ndb_to_mysql_error(const NdbError *err)
uint i;
for (i=0 ; err_map[i].ndb_err != err->code ; i++)
{
- if (err_map[i].my_err == -1)
+ if (err_map[i].my_err == -1){
+ // Push the NDB error message as warning
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+ err->code, err->message, "NDB");
return err->code;
+ }
}
return err_map[i].my_err;
}
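
For reference, a minimal self-contained sketch of how the sentinel rows in err_map drive this lookup (hypothetical demo_* names, illustrative error codes): the scan stops at a matching NDB code or at the {-1,-1} terminator, and the new {0,1} row keeps NDB's "no error" code 0 off the pass-through-with-warning path.

    #include <cstdio>

    struct err_code_mapping { int ndb_err; int my_err; };

    // Same layout as err_map above: {0,1} short-circuits code 0,
    // {-1,-1} terminates the scan for unmapped codes.
    static const err_code_mapping demo_map[]=
    {
      { 626, 120 },   // e.g. "tuple not found" -> HA_ERR_KEY_NOT_FOUND
      { 0,   1 },
      { -1, -1 }
    };

    static int demo_lookup(int ndb_code)
    {
      int i;
      for (i= 0; demo_map[i].ndb_err != ndb_code; i++)
        if (demo_map[i].my_err == -1)
          return ndb_code;              // unmapped: return NDB code as-is
      return demo_map[i].my_err;
    }

    int main()
    {
      printf("%d %d\n", demo_lookup(626), demo_lookup(9999));  // 120 9999
      return 0;
    }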
+
+inline
+int execute_no_commit(ha_ndbcluster *h, NdbConnection *trans)
+{
+ int m_batch_execute= 0;
+#ifdef NOT_USED
+ if (m_batch_execute)
+ return 0;
+#endif
+ return trans->execute(NoCommit,AbortOnError,1);
+}
+
+inline
+int execute_commit(ha_ndbcluster *h, NdbConnection *trans)
+{
+ int m_batch_execute= 0;
+#ifdef NOT_USED
+ if (m_batch_execute)
+ return 0;
+#endif
+ return trans->execute(Commit,AbortOnError,1);
+}
+
+inline
+int execute_no_commit_ie(ha_ndbcluster *h, NdbConnection *trans)
+{
+ int m_batch_execute= 0;
+#ifdef NOT_USED
+ if (m_batch_execute)
+ return 0;
+#endif
+ return trans->execute(NoCommit,IgnoreError,1);
+}
+
+/*
+ Placeholder for ha_ndbcluster thread-specific data
+*/
+
+Thd_ndb::Thd_ndb()
+{
+ ndb= new Ndb(g_ndb_cluster_connection, "");
+ lock_count= 0;
+ count= 0;
+ error= 0;
+}
+
+Thd_ndb::~Thd_ndb()
+{
+ if (ndb)
+ delete ndb;
+}
+
+/*
+ * manage uncommitted inserts/deletes during a transaction to keep the record count correct
+ */
+
+struct Ndb_table_local_info {
+ int no_uncommitted_rows_count;
+ ulong last_count;
+ ha_rows records;
+};
+
+void ha_ndbcluster::set_rec_per_key()
+{
+ DBUG_ENTER("ha_ndbcluster::get_status_const");
+ for (uint i=0 ; i < table->keys ; i++)
+ {
+ table->key_info[i].rec_per_key[table->key_info[i].key_parts-1]= 1;
+ }
+ DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::records_update()
+{
+ DBUG_ENTER("ha_ndbcluster::records_update");
+ struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
+ DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
+ ((const NDBTAB *)m_table)->getTableId(),
+ info->no_uncommitted_rows_count));
+ // if (info->records == ~(ha_rows)0)
+ {
+ Uint64 rows;
+ if(ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0) == 0){
+ info->records= rows;
+ }
+ }
+ {
+ THD *thd= current_thd;
+ if (((Thd_ndb*)(thd->transaction.thd_ndb))->error)
+ info->no_uncommitted_rows_count= 0;
+ }
+ records= info->records+ info->no_uncommitted_rows_count;
+ DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::no_uncommitted_rows_execute_failure()
+{
+ DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_execute_failure");
+ THD *thd= current_thd;
+ ((Thd_ndb*)(thd->transaction.thd_ndb))->error= 1;
+ DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::no_uncommitted_rows_init(THD *thd)
+{
+ DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_init");
+ struct Ndb_table_local_info *info= (struct Ndb_table_local_info *)m_table_info;
+ Thd_ndb *thd_ndb= (Thd_ndb *)thd->transaction.thd_ndb;
+ if (info->last_count != thd_ndb->count)
+ {
+ info->last_count = thd_ndb->count;
+ info->no_uncommitted_rows_count= 0;
+ info->records= ~(ha_rows)0;
+ DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
+ ((const NDBTAB *)m_table)->getTableId(),
+ info->no_uncommitted_rows_count));
+ }
+ DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::no_uncommitted_rows_update(int c)
+{
+ DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_update");
+ struct Ndb_table_local_info *info=
+ (struct Ndb_table_local_info *)m_table_info;
+ info->no_uncommitted_rows_count+= c;
+ DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
+ ((const NDBTAB *)m_table)->getTableId(),
+ info->no_uncommitted_rows_count));
+ DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd)
+{
+ DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_reset");
+ ((Thd_ndb*)(thd->transaction.thd_ndb))->count++;
+ ((Thd_ndb*)(thd->transaction.thd_ndb))->error= 0;
+ DBUG_VOID_RETURN;
+}
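
Taken together, the helpers above implement a simple counting protocol: Thd_ndb::count stamps each statement, each table's local info keeps a signed delta of uncommitted inserts/deletes, and the visible records value is the statistics snapshot plus that delta. A self-contained toy model of just that protocol (the Demo* types are stand-ins, not the real structs):

    #include <cassert>

    // Stand-ins for Thd_ndb and Ndb_table_local_info, modelling only the
    // counting done by the no_uncommitted_rows_* methods above.
    struct DemoThd   { unsigned count; int error; };
    struct DemoTable { int delta; unsigned last_count;
                       unsigned long long records; };

    // no_uncommitted_rows_reset(): stamp a new statement
    static void stmt_start(DemoThd &thd) { thd.count++; thd.error= 0; }

    // no_uncommitted_rows_init(): clear the delta on first use per statement
    static void tbl_init(DemoTable &t, const DemoThd &thd)
    {
      if (t.last_count != thd.count) { t.last_count= thd.count; t.delta= 0; }
    }

    int main()
    {
      DemoThd thd= { 0, 0 };
      DemoTable t= { 0, 0, 100 };       // statistics snapshot: 100 rows
      stmt_start(thd);
      tbl_init(t, thd);
      t.delta+= 1;                      // write_row  -> update(+1)
      t.delta-= 1;                      // delete_row -> update(-1)
      assert(t.records + t.delta == 100);  // what records_update() reports
      return 0;
    }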
+
/*
Take care of the error that occured in NDB
@@ -142,12 +300,11 @@ static int ndb_to_mysql_error(const NdbError *err)
# The mapped error code
*/
+
int ha_ndbcluster::ndb_err(NdbConnection *trans)
{
int res;
const NdbError err= trans->getNdbError();
- if (!err.code)
- return 0; // Don't log things to DBUG log if no error
DBUG_ENTER("ndb_err");
ERR_PRINT(err);
@@ -157,6 +314,7 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans)
NDBDICT *dict= m_ndb->getDictionary();
DBUG_PRINT("info", ("invalidateTable %s", m_tabname));
dict->invalidateTable(m_tabname);
+ table->version=0L; /* Free when thread is ready */
break;
}
default:
@@ -183,10 +341,11 @@ bool ha_ndbcluster::get_error_message(int error,
DBUG_ENTER("ha_ndbcluster::get_error_message");
DBUG_PRINT("enter", ("error: %d", error));
- if (!m_ndb)
- DBUG_RETURN(false);
+ Ndb *ndb= ((Thd_ndb*)current_thd->transaction.thd_ndb)->ndb;
+ if (!ndb)
+ DBUG_RETURN(FALSE);
- const NdbError err= m_ndb->getNdbError(error);
+ const NdbError err= ndb->getNdbError(error);
bool temporary= err.status==NdbError::TemporaryError;
buf->set(err.message, strlen(err.message), &my_charset_bin);
DBUG_PRINT("exit", ("message: %s, temporary: %d", buf->ptr(), temporary));
@@ -196,7 +355,8 @@ bool ha_ndbcluster::get_error_message(int error,
/*
Check if type is supported by NDB.
- TODO Use this once, not in every operation
+ TODO Use this once in open(), not in every operation
+
*/
static inline bool ndb_supported_type(enum_field_types type)
@@ -224,12 +384,12 @@ static inline bool ndb_supported_type(enum_field_types type)
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_ENUM:
case MYSQL_TYPE_SET:
- return true;
+ return TRUE;
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY:
break;
}
- return false;
+ return FALSE;
}
@@ -277,7 +437,7 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
*/
int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
- uint fieldnr)
+ uint fieldnr, bool *set_blob_value)
{
const byte* field_ptr= field->ptr;
uint32 pack_len= field->pack_length();
@@ -289,6 +449,13 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
if (ndb_supported_type(field->type()))
{
+ // ndb currently does not support size 0
+ const byte *empty_field= "";
+ if (pack_len == 0)
+ {
+ pack_len= 1;
+ field_ptr= empty_field;
+ }
if (! (field->flags & BLOB_FLAG))
{
if (field->is_null())
@@ -312,14 +479,18 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
char* blob_ptr= NULL;
field_blob->get_ptr(&blob_ptr);
- // Looks like NULL blob can also be signaled in this way
- if (blob_ptr == NULL)
- DBUG_RETURN(ndb_blob->setNull() != 0);
+ // Looks like NULL ptr signals length 0 blob
+ if (blob_ptr == NULL) {
+ DBUG_ASSERT(blob_len == 0);
+ blob_ptr= (char*)"";
+ }
DBUG_PRINT("value", ("set blob ptr=%x len=%u",
(unsigned)blob_ptr, blob_len));
DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26));
+ if (set_blob_value)
+ *set_blob_value= TRUE;
// No callback needed to write value
DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0);
}
@@ -414,7 +585,7 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
*/
int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
- uint fieldnr)
+ uint fieldnr, byte* buf)
{
DBUG_ENTER("get_ndb_value");
DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr,
@@ -422,12 +593,19 @@ int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
if (field != NULL)
{
+ DBUG_ASSERT(buf);
if (ndb_supported_type(field->type()))
{
DBUG_ASSERT(field->ptr != NULL);
if (! (field->flags & BLOB_FLAG))
- {
- m_value[fieldnr].rec= ndb_op->getValue(fieldnr, field->ptr);
+ {
+ byte *field_buf;
+ if (field->pack_length() != 0)
+ field_buf= buf + (field->ptr - table->record[0]);
+ else
+ field_buf= dummy_buf;
+ m_value[fieldnr].rec= ndb_op->getValue(fieldnr,
+ field_buf);
DBUG_RETURN(m_value[fieldnr].rec == NULL);
}
@@ -459,24 +637,24 @@ int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
bool ha_ndbcluster::uses_blob_value(bool all_fields)
{
if (table->blob_fields == 0)
- return false;
+ return FALSE;
if (all_fields)
- return true;
+ return TRUE;
{
uint no_fields= table->fields;
int i;
- THD *thd= current_thd;
+ THD *thd= table->in_use;
// They always put blobs at the end..
for (i= no_fields - 1; i >= 0; i--)
{
Field *field= table->field[i];
if (thd->query_id == field->query_id)
{
- return true;
+ return TRUE;
}
}
}
- return false;
+ return FALSE;
}
@@ -494,75 +672,77 @@ int ha_ndbcluster::get_metadata(const char *path)
{
NDBDICT *dict= m_ndb->getDictionary();
const NDBTAB *tab;
- const void *data, *pack_data;
- const char **key_name;
- uint ndb_columns, mysql_columns, length, pack_length;
int error;
+ bool invalidating_ndb_table= FALSE;
+
DBUG_ENTER("get_metadata");
DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));
- if (!(tab= dict->getTable(m_tabname)))
- ERR_RETURN(dict->getNdbError());
- DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
-
- /*
- This is the place to check that the table we got from NDB
- is equal to the one on local disk
- */
- ndb_columns= (uint) tab->getNoOfColumns();
- mysql_columns= table->fields;
- if (table->primary_key == MAX_KEY)
- ndb_columns--;
- if (ndb_columns != mysql_columns)
- {
- DBUG_PRINT("error",
- ("Wrong number of columns, ndb: %d mysql: %d",
- ndb_columns, mysql_columns));
- DBUG_RETURN(HA_ERR_OLD_METADATA);
- }
-
- /*
- Compare FrmData in NDB with frm file from disk.
- */
- error= 0;
- if (readfrm(path, &data, &length) ||
- packfrm(data, length, &pack_data, &pack_length))
- {
- my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
- my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
- DBUG_RETURN(1);
- }
+ do {
+ const void *data, *pack_data;
+ uint length, pack_length;
+
+ if (!(tab= dict->getTable(m_tabname)))
+ ERR_RETURN(dict->getNdbError());
+ DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
+ /*
+ Compare FrmData in NDB with frm file from disk.
+ */
+ error= 0;
+ if (readfrm(path, &data, &length) ||
+ packfrm(data, length, &pack_data, &pack_length))
+ {
+ my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
+ my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
+ DBUG_RETURN(1);
+ }
- if ((pack_length != tab->getFrmLength()) ||
- (memcmp(pack_data, tab->getFrmData(), pack_length)))
- {
- DBUG_PRINT("error",
- ("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
- pack_length, tab->getFrmLength(),
- memcmp(pack_data, tab->getFrmData(), pack_length)));
- DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
- DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
- error= HA_ERR_OLD_METADATA;
- }
- my_free((char*)data, MYF(0));
- my_free((char*)pack_data, MYF(0));
+ if ((pack_length != tab->getFrmLength()) ||
+ (memcmp(pack_data, tab->getFrmData(), pack_length)))
+ {
+ if (!invalidating_ndb_table)
+ {
+ DBUG_PRINT("info", ("Invalidating table"));
+ dict->invalidateTable(m_tabname);
+ invalidating_ndb_table= TRUE;
+ }
+ else
+ {
+ DBUG_PRINT("error",
+ ("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
+ pack_length, tab->getFrmLength(),
+ memcmp(pack_data, tab->getFrmData(), pack_length)));
+ DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
+ DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
+ error= 3;
+ invalidating_ndb_table= FALSE;
+ }
+ }
+ else
+ {
+ invalidating_ndb_table= FALSE;
+ }
+ my_free((char*)data, MYF(0));
+ my_free((char*)pack_data, MYF(0));
+ } while (invalidating_ndb_table);
+
if (error)
DBUG_RETURN(error);
- // All checks OK, lets use the table
- m_table= (void*)tab;
-
+ m_table= NULL;
+ m_table_info= NULL;
+
DBUG_RETURN(build_index_list(table, ILBP_OPEN));
}
int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
{
+ uint i;
int error= 0;
- char *name;
- const char *index_name;
+ const char *name, *index_name;
+ char unique_index_name[FN_LEN];
static const char* unique_suffix= "$unique";
- uint i, name_len;
KEY* key_info= tab->key_info;
const char **key_name= tab->keynames.type_names;
NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
@@ -576,21 +756,15 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
m_index[i].type= idx_type;
if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
{
- name_len= strlen(index_name)+strlen(unique_suffix)+1;
- // Create name for unique index by appending "$unique";
- if (!(name= my_malloc(name_len, MYF(MY_WME))))
- DBUG_RETURN(2);
- strxnmov(name, name_len, index_name, unique_suffix, NullS);
- m_index[i].unique_name= name;
- DBUG_PRINT("info", ("Created unique index name: %s for index %d",
- name, i));
+ strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS);
+ DBUG_PRINT("info", ("Created unique index name \'%s\' for index %d",
+ unique_index_name, i));
}
// Create secondary indexes if in create phase
if (phase == ILBP_CREATE)
{
- DBUG_PRINT("info", ("Creating index %u: %s", i, index_name));
-
- switch (m_index[i].type){
+ DBUG_PRINT("info", ("Creating index %u: %s", i, index_name));
+ switch (idx_type){
case PRIMARY_KEY_INDEX:
// Do nothing, already created
@@ -600,16 +774,16 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
break;
case UNIQUE_ORDERED_INDEX:
if (!(error= create_ordered_index(index_name, key_info)))
- error= create_unique_index(get_unique_index_name(i), key_info);
+ error= create_unique_index(unique_index_name, key_info);
break;
case UNIQUE_INDEX:
- error= create_unique_index(get_unique_index_name(i), key_info);
+ error= create_unique_index(unique_index_name, key_info);
break;
case ORDERED_INDEX:
error= create_ordered_index(index_name, key_info);
break;
default:
- DBUG_ASSERT(false);
+ DBUG_ASSERT(FALSE);
break;
}
if (error)
@@ -620,21 +794,20 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
}
}
// Add handles to index objects
- DBUG_PRINT("info", ("Trying to add handle to index %s", index_name));
- if ((m_index[i].type != PRIMARY_KEY_INDEX) &&
- (m_index[i].type != UNIQUE_INDEX))
+ if (idx_type != PRIMARY_KEY_INDEX && idx_type != UNIQUE_INDEX)
{
+ DBUG_PRINT("info", ("Get handle to index %s", index_name));
const NDBINDEX *index= dict->getIndex(index_name, m_tabname);
if (!index) DBUG_RETURN(1);
m_index[i].index= (void *) index;
}
- if (m_index[i].unique_name)
+ if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
{
- const NDBINDEX *index= dict->getIndex(m_index[i].unique_name, m_tabname);
+ DBUG_PRINT("info", ("Get handle to unique_index %s", unique_index_name));
+ const NDBINDEX *index= dict->getIndex(unique_index_name, m_tabname);
if (!index) DBUG_RETURN(1);
m_index[i].unique_index= (void *) index;
}
- DBUG_PRINT("info", ("Added handle to index %s", index_name));
}
DBUG_RETURN(error);
@@ -665,13 +838,11 @@ void ha_ndbcluster::release_metadata()
DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
m_table= NULL;
+ m_table_info= NULL;
// Release index list
for (i= 0; i < MAX_KEY; i++)
{
- if (m_index[i].unique_name)
- my_free((char*)m_index[i].unique_name, MYF(0));
- m_index[i].unique_name= NULL;
m_index[i].unique_index= NULL;
m_index[i].index= NULL;
}
@@ -681,17 +852,12 @@ void ha_ndbcluster::release_metadata()
int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{
- int lm;
- if (type == TL_WRITE_ALLOW_WRITE)
- lm= NdbScanOperation::LM_Exclusive;
+ if (type >= TL_WRITE_ALLOW_WRITE)
+ return NdbOperation::LM_Exclusive;
else if (uses_blob_value(retrieve_all_fields))
- /*
- TODO use a new scan mode to read + lock + keyinfo
- */
- lm= NdbScanOperation::LM_Exclusive;
+ return NdbOperation::LM_Read;
else
- lm= NdbScanOperation::LM_CommittedRead;
- return lm;
+ return NdbOperation::LM_CommittedRead;
}
static const ulong index_type_flags[]=
@@ -709,33 +875,26 @@ static const ulong index_type_flags[]=
through the index.
*/
// HA_KEYREAD_ONLY |
- HA_READ_NEXT |
- HA_READ_RANGE,
+ HA_READ_NEXT |
+ HA_READ_RANGE |
+ HA_READ_ORDER,
/* UNIQUE_INDEX */
HA_ONLY_WHOLE_INDEX,
/* UNIQUE_ORDERED_INDEX */
- HA_READ_NEXT |
- HA_READ_RANGE,
+ HA_READ_NEXT |
+ HA_READ_RANGE |
+ HA_READ_ORDER,
/* ORDERED_INDEX */
- HA_READ_NEXT |
- HA_READ_RANGE,
+ HA_READ_NEXT |
+ HA_READ_RANGE |
+ HA_READ_ORDER
};
static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong);
-inline const char* ha_ndbcluster::get_index_name(uint idx_no) const
-{
- return table->keynames.type_names[idx_no];
-}
-
-inline const char* ha_ndbcluster::get_unique_index_name(uint idx_no) const
-{
- return m_index[idx_no].unique_name;
-}
-
inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
{
DBUG_ASSERT(idx_no < MAX_KEY);
@@ -829,8 +988,10 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
DBUG_PRINT("enter", ("key_len: %u", key_len));
DBUG_DUMP("key", (char*)key, key_len);
- if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
- op->readTuple() != 0)
+ NdbOperation::LockMode lm=
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+ if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
+ op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError());
if (table->primary_key == MAX_KEY)
@@ -842,7 +1003,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
ERR_RETURN(trans->getNdbError());
// Read key at the same time, for future reference
- if (get_ndb_value(op, NULL, no_fields))
+ if (get_ndb_value(op, NULL, no_fields, NULL))
ERR_RETURN(trans->getNdbError());
}
else
@@ -859,7 +1020,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
if ((thd->query_id == field->query_id) ||
retrieve_all_fields)
{
- if (get_ndb_value(op, field, i))
+ if (get_ndb_value(op, field, i, buf))
ERR_RETURN(trans->getNdbError());
}
else
@@ -869,7 +1030,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
}
}
- if (trans->execute(NoCommit, IgnoreError) != 0)
+ if (execute_no_commit_ie(this,trans) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
@@ -898,8 +1059,10 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
// We have already retrieved all fields, nothing to complement
DBUG_RETURN(0);
- if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
- op->readTuple() != 0)
+ NdbOperation::LockMode lm=
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+ if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
+ op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError());
int res;
@@ -913,12 +1076,12 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
if (!(field->flags & PRI_KEY_FLAG) &&
(thd->query_id != field->query_id))
{
- if (get_ndb_value(op, field, i))
+ if (get_ndb_value(op, field, i, new_data))
ERR_RETURN(trans->getNdbError());
}
}
- if (trans->execute(NoCommit) != 0)
+ if (execute_no_commit(this,trans) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
@@ -948,12 +1111,13 @@ int ha_ndbcluster::unique_index_read(const byte *key,
DBUG_ENTER("unique_index_read");
DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
DBUG_DUMP("key", (char*)key, key_len);
- DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index)));
+ NdbOperation::LockMode lm=
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op= trans->getNdbIndexOperation((NDBINDEX *)
m_index[active_index].unique_index,
- (NDBTAB *) m_table)) ||
- op->readTuple() != 0)
+ (const NDBTAB *) m_table)) ||
+ op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError());
// Set secondary index key(s)
@@ -976,7 +1140,7 @@ int ha_ndbcluster::unique_index_read(const byte *key,
if ((thd->query_id == field->query_id) ||
(field->flags & PRI_KEY_FLAG))
{
- if (get_ndb_value(op, field, i))
+ if (get_ndb_value(op, field, i, buf))
ERR_RETURN(op->getNdbError());
}
else
@@ -986,7 +1150,7 @@ int ha_ndbcluster::unique_index_read(const byte *key,
}
}
- if (trans->execute(NoCommit, IgnoreError) != 0)
+ if (execute_no_commit_ie(this,trans) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
@@ -1022,7 +1186,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
If this is an update or delete, call nextResult with false
to process any records already cached in NdbApi
*/
- bool contact_ndb= m_lock.type != TL_WRITE_ALLOW_WRITE;
+ bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE;
do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
/*
@@ -1030,10 +1194,10 @@ inline int ha_ndbcluster::next_result(byte *buf)
*/
if (ops_pending && blobs_pending)
{
- if (trans->execute(NoCommit) != 0)
- DBUG_RETURN(ndb_err(trans));
+ if (execute_no_commit(this,trans) != 0)
+ DBUG_RETURN(ndb_err(trans));
ops_pending= 0;
- blobs_pending= false;
+ blobs_pending= FALSE;
}
check= cursor->nextResult(contact_ndb);
if (check == 0)
@@ -1056,9 +1220,22 @@ inline int ha_ndbcluster::next_result(byte *buf)
be sent to NDB
*/
DBUG_PRINT("info", ("ops_pending: %d", ops_pending));
- if (ops_pending && (trans->execute(NoCommit) != 0))
- DBUG_RETURN(ndb_err(trans));
- ops_pending= 0;
+ if (ops_pending)
+ {
+ if (current_thd->transaction.on)
+ {
+ if (execute_no_commit(this,trans) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ }
+ else
+ {
+ if (execute_commit(this,trans) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ int res= trans->restart();
+ DBUG_ASSERT(res == 0);
+ }
+ ops_pending= 0;
+ }
contact_ndb= (check == 2);
}
@@ -1073,76 +1250,204 @@ inline int ha_ndbcluster::next_result(byte *buf)
DBUG_RETURN(HA_ERR_END_OF_FILE);
}
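
The contact_ndb flag above separates two cases: for writing scans (TL_WRITE_ALLOW_WRITE and up) only the rows already cached in the NDB API are drained, so pending update/delete operations get executed before more rows are fetched. A toy model of the return-code protocol this loop relies on (DemoScan is a stand-in, codes 0/1/2 as used above):

    #include <cstdio>

    // Stand-in for NdbResultSet::nextResult() as used by next_result():
    // 0 = row ready, 2 = API cache empty (fetch needed), 1 = end of scan.
    struct DemoScan {
      int cached, batches;
      int nextResult(bool contact_ndb)
      {
        if (cached > 0) { cached--; return 0; }
        if (!contact_ndb) return 2;     // caller must execute(), then retry
        if (batches-- > 0) { cached= 2; return nextResult(false); }
        return 1;
      }
    };

    int main()
    {
      DemoScan scan= { 2, 1 };
      int check;
      while ((check= scan.nextResult(true)) == 0)
        printf("row\n");                // four rows: two cached, two fetched
      printf("end: %d\n", check);       // 1 -> HA_ERR_END_OF_FILE
      return 0;
    }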
-
/*
- Set bounds for a ordered index scan, use key_range
+ Set bounds for ordered index scan.
*/
int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
- const key_range *key,
- int bound)
+ const key_range *keys[2])
{
- uint key_len, key_store_len, tot_len, key_tot_len;
- byte *key_ptr;
- KEY* key_info= table->key_info + active_index;
- KEY_PART_INFO* key_part= key_info->key_part;
- KEY_PART_INFO* end= key_part+key_info->key_parts;
- Field* field;
- bool key_nullable, key_null;
+ const KEY *const key_info= table->key_info + active_index;
+ const uint key_parts= key_info->key_parts;
+ uint key_tot_len[2];
+ uint tot_len;
+ uint i, j;
DBUG_ENTER("set_bounds");
- DBUG_PRINT("enter", ("bound: %d", bound));
- DBUG_PRINT("enter", ("key_parts: %d", key_info->key_parts));
- DBUG_PRINT("enter", ("key->length: %d", key->length));
- DBUG_PRINT("enter", ("key->flag: %d", key->flag));
+ DBUG_PRINT("info", ("key_parts=%d", key_parts));
- // Set bounds using key data
+ for (j= 0; j <= 1; j++)
+ {
+ const key_range *key= keys[j];
+ if (key != NULL)
+ {
+ // for key->flag see ha_rkey_function
+ DBUG_PRINT("info", ("key %d length=%d flag=%d",
+ j, key->length, key->flag));
+ key_tot_len[j]= key->length;
+ }
+ else
+ {
+ DBUG_PRINT("info", ("key %d not present", j));
+ key_tot_len[j]= 0;
+ }
+ }
tot_len= 0;
- key_ptr= (byte *) key->key;
- key_tot_len= key->length;
- for (; key_part != end; key_part++)
- {
- field= key_part->field;
- key_len= key_part->length;
- key_store_len= key_part->store_length;
- key_nullable= (bool) key_part->null_bit;
- key_null= (field->maybe_null() && *key_ptr);
- tot_len+= key_store_len;
-
- const char* bounds[]= {"LE", "LT", "GE", "GT", "EQ"};
- DBUG_ASSERT(bound >= 0 && bound <= 4);
- DBUG_PRINT("info", ("Set Bound%s on %s %s %s %s",
- bounds[bound],
- field->field_name,
- key_nullable ? "NULLABLE" : "",
- key_null ? "NULL":""));
- DBUG_PRINT("info", ("Total length %ds", tot_len));
-
- DBUG_DUMP("key", (char*) key_ptr, key_store_len);
-
- if (op->setBound(field->field_name,
- bound,
- key_null ? 0 : (key_nullable ? key_ptr + 1 : key_ptr),
- key_null ? 0 : key_len) != 0)
- ERR_RETURN(op->getNdbError());
-
- key_ptr+= key_store_len;
- if (tot_len >= key_tot_len)
- break;
+ for (i= 0; i < key_parts; i++)
+ {
+ KEY_PART_INFO *key_part= &key_info->key_part[i];
+ Field *field= key_part->field;
+ uint part_len= key_part->length;
+ uint part_store_len= key_part->store_length;
+ bool part_nullable= (bool) key_part->null_bit;
+ // Info about each key part
+ struct part_st {
+ bool part_last;
+ const key_range *key;
+ const byte *part_ptr;
+ bool part_null;
+ int bound_type;
+ const char* bound_ptr;
+ };
+ struct part_st part[2];
+
+ for (j= 0; j <= 1; j++)
+ {
+ struct part_st &p = part[j];
+ p.key= NULL;
+ p.bound_type= -1;
+ if (tot_len < key_tot_len[j])
+ {
+ p.part_last= (tot_len + part_store_len >= key_tot_len[j]);
+ p.key= keys[j];
+ p.part_ptr= &p.key->key[tot_len];
+ p.part_null= (field->maybe_null() && *p.part_ptr);
+ p.bound_ptr= (const char *)
+ p.part_null ? 0 : part_nullable ? p.part_ptr + 1 : p.part_ptr;
+
+ if (j == 0)
+ {
+ switch (p.key->flag)
+ {
+ case HA_READ_KEY_EXACT:
+ p.bound_type= NdbIndexScanOperation::BoundEQ;
+ break;
+ case HA_READ_KEY_OR_NEXT:
+ p.bound_type= NdbIndexScanOperation::BoundLE;
+ break;
+ case HA_READ_AFTER_KEY:
+ if (! p.part_last)
+ p.bound_type= NdbIndexScanOperation::BoundLE;
+ else
+ p.bound_type= NdbIndexScanOperation::BoundLT;
+ break;
+ default:
+ break;
+ }
+ }
+ if (j == 1) {
+ switch (p.key->flag)
+ {
+ case HA_READ_BEFORE_KEY:
+ if (! p.part_last)
+ p.bound_type= NdbIndexScanOperation::BoundGE;
+ else
+ p.bound_type= NdbIndexScanOperation::BoundGT;
+ break;
+ case HA_READ_AFTER_KEY: // weird
+ p.bound_type= NdbIndexScanOperation::BoundGE;
+ break;
+ default:
+ break;
+ }
+ }
- /*
- Only one bound which is not EQ can be set
- so if this bound was not EQ, bail out and make
- a best effort attempt
- */
- if (bound != NdbIndexScanOperation::BoundEQ)
- break;
- }
+ if (p.bound_type == -1)
+ {
+ DBUG_PRINT("error", ("key %d unknown flag %d", j, p.key->flag));
+ DBUG_ASSERT(false);
+ // Stop setting bounds but continue with what we have
+ DBUG_RETURN(0);
+ }
+ }
+ }
+
+ // Seen with e.g. b = 1 and c > 1
+ if (part[0].bound_type == NdbIndexScanOperation::BoundLE &&
+ part[1].bound_type == NdbIndexScanOperation::BoundGE &&
+ memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
+ {
+ DBUG_PRINT("info", ("replace LE/GE pair by EQ"));
+ part[0].bound_type= NdbIndexScanOperation::BoundEQ;
+ part[1].bound_type= -1;
+ }
+ // Not seen but was in previous version
+ if (part[0].bound_type == NdbIndexScanOperation::BoundEQ &&
+ part[1].bound_type == NdbIndexScanOperation::BoundGE &&
+ memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
+ {
+ DBUG_PRINT("info", ("remove GE from EQ/GE pair"));
+ part[1].bound_type= -1;
+ }
+ for (j= 0; j <= 1; j++)
+ {
+ struct part_st &p = part[j];
+ // Set bound if not done with this key
+ if (p.key != NULL)
+ {
+ DBUG_PRINT("info", ("key %d:%d offset=%d length=%d last=%d bound=%d",
+ j, i, tot_len, part_len, p.part_last, p.bound_type));
+ DBUG_DUMP("info", (const char*)p.part_ptr, part_store_len);
+
+ // Set bound if not cancelled via type -1
+ if (p.bound_type != -1)
+ if (op->setBound(field->field_name, p.bound_type, p.bound_ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ }
+
+ tot_len+= part_store_len;
+ }
DBUG_RETURN(0);
}
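
The per-part bound selection above reduces to a small mapping from MySQL read flags to NDB bound types, different for the low (j == 0) and high (j == 1) end of the range; non-final key parts of a strict condition still take the inclusive bound. A compilable sketch of just that mapping (the Demo* enums are stand-ins for the NdbIndexScanOperation constants and ha_rkey_function flags):

    enum DemoBound { BoundLE, BoundLT, BoundGE, BoundGT, BoundEQ, BoundNone };
    enum DemoFlag  { KEY_EXACT, KEY_OR_NEXT, AFTER_KEY, BEFORE_KEY };

    // j == 0: bound derived from the start key
    static DemoBound low_bound(DemoFlag f, bool part_last)
    {
      switch (f) {
      case KEY_EXACT:   return BoundEQ;
      case KEY_OR_NEXT: return BoundLE;
      case AFTER_KEY:   return part_last ? BoundLT : BoundLE;
      default:          return BoundNone;   // unknown flag: no bound set
      }
    }

    // j == 1: bound derived from the end key
    static DemoBound high_bound(DemoFlag f, bool part_last)
    {
      switch (f) {
      case BEFORE_KEY:  return part_last ? BoundGT : BoundGE;
      case AFTER_KEY:   return BoundGE;     // the "weird" case kept above
      default:          return BoundNone;
      }
    }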
+inline
+int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
+{
+ uint i;
+ THD *thd= current_thd;
+ NdbConnection *trans= m_active_trans;
+
+ DBUG_ENTER("define_read_attrs");
+
+ // Define attributes to read
+ for (i= 0; i < table->fields; i++)
+ {
+ Field *field= table->field[i];
+ if ((thd->query_id == field->query_id) ||
+ (field->flags & PRI_KEY_FLAG) ||
+ retrieve_all_fields)
+ {
+ if (get_ndb_value(op, field, i, buf))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ m_value[i].ptr= NULL;
+ }
+ }
+
+ if (table->primary_key == MAX_KEY)
+ {
+ DBUG_PRINT("info", ("Getting hidden key"));
+ // Scanning table with no primary key
+ int hidden_no= table->fields;
+#ifndef DBUG_OFF
+ const NDBTAB *tab= (const NDBTAB *) m_table;
+ if (!tab->getColumn(hidden_no))
+ DBUG_RETURN(1);
+#endif
+ if (get_ndb_value(op, NULL, hidden_no, NULL))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (execute_no_commit(this,trans) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_PRINT("exit", ("Scan started successfully"));
+ DBUG_RETURN(next_result(buf));
+}
/*
Start ordered index scan in NDB
@@ -1152,52 +1457,60 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
const key_range *end_key,
bool sorted, byte* buf)
{
+ bool restart;
NdbConnection *trans= m_active_trans;
NdbResultSet *cursor;
NdbIndexScanOperation *op;
- const char *index_name;
DBUG_ENTER("ordered_index_scan");
DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted));
DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
-
- index_name= get_index_name(active_index);
- if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
- m_index[active_index].index,
- (NDBTAB *) m_table)))
- ERR_RETURN(trans->getNdbError());
- NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
- get_ndb_lock_type(m_lock.type);
- if (!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
- ERR_RETURN(trans->getNdbError());
- m_active_cursor= cursor;
+ // Check that sorted seems to be initialised
+ DBUG_ASSERT(sorted == 0 || sorted == 1);
+
+ if (m_active_cursor == 0)
+ {
+ restart= false;
+ NdbOperation::LockMode lm=
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+ if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
+ m_index[active_index].index,
+ (const NDBTAB *) m_table)) ||
+ !(cursor= op->readTuples(lm, 0, parallelism, sorted)))
+ ERR_RETURN(trans->getNdbError());
+ m_active_cursor= cursor;
+ } else {
+ restart= true;
+ op= (NdbIndexScanOperation*)m_active_cursor->getOperation();
+
+ DBUG_ASSERT(op->getSorted() == sorted);
+ DBUG_ASSERT(op->getLockMode() ==
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
+ if(op->reset_bounds())
+ DBUG_RETURN(ndb_err(m_active_trans));
+ }
- if (start_key &&
- set_bounds(op, start_key,
- (start_key->flag == HA_READ_KEY_EXACT) ?
- NdbIndexScanOperation::BoundEQ :
- (start_key->flag == HA_READ_AFTER_KEY) ?
- NdbIndexScanOperation::BoundLT :
- NdbIndexScanOperation::BoundLE))
- DBUG_RETURN(1);
+ {
+ const key_range *keys[2]= { start_key, end_key };
+ int ret= set_bounds(op, keys);
+ if (ret)
+ DBUG_RETURN(ret);
+ }
- if (end_key)
+ if (!restart)
{
- if (start_key && start_key->flag == HA_READ_KEY_EXACT)
- {
- DBUG_PRINT("info", ("start_key is HA_READ_KEY_EXACT ignoring end_key"));
- }
- else if (set_bounds(op, end_key,
- (end_key->flag == HA_READ_AFTER_KEY) ?
- NdbIndexScanOperation::BoundGE :
- NdbIndexScanOperation::BoundGT))
- DBUG_RETURN(1);
+ DBUG_RETURN(define_read_attrs(buf, op));
+ }
+ else
+ {
+ if (execute_no_commit(this,trans) != 0)
+ DBUG_RETURN(ndb_err(trans));
+
+ DBUG_RETURN(next_result(buf));
}
- DBUG_RETURN(define_read_attrs(buf, op));
}
-
/*
Start a filtered scan in NDB.
@@ -1227,11 +1540,10 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
DBUG_PRINT("info", ("Starting a new filtered scan on %s",
m_tabname));
- if (!(op= trans->getNdbScanOperation((NDBTAB *) m_table)))
- ERR_RETURN(trans->getNdbError());
- NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
- get_ndb_lock_type(m_lock.type);
- if (!(cursor= op->readTuples(lm, 0, parallelism)))
+ NdbOperation::LockMode lm=
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+ if (!(op= trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
+ !(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
@@ -1298,68 +1610,18 @@ int ha_ndbcluster::full_table_scan(byte *buf)
DBUG_ENTER("full_table_scan");
DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
- if (!(op=trans->getNdbScanOperation((NDBTAB *) m_table)))
- ERR_RETURN(trans->getNdbError());
- NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
- get_ndb_lock_type(m_lock.type);
- if (!(cursor= op->readTuples(lm, 0, parallelism)))
+ NdbOperation::LockMode lm=
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+ if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
+ !(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
DBUG_RETURN(define_read_attrs(buf, op));
}
-
-inline
-int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
-{
- uint i;
- THD *thd= current_thd;
- NdbConnection *trans= m_active_trans;
-
- DBUG_ENTER("define_read_attrs");
-
- // Define attributes to read
- for (i= 0; i < table->fields; i++)
- {
- Field *field= table->field[i];
- if ((thd->query_id == field->query_id) ||
- (field->flags & PRI_KEY_FLAG) ||
- retrieve_all_fields)
- {
- if (get_ndb_value(op, field, i))
- ERR_RETURN(op->getNdbError());
- }
- else
- {
- m_value[i].ptr= NULL;
- }
- }
-
- if (table->primary_key == MAX_KEY)
- {
- DBUG_PRINT("info", ("Getting hidden key"));
- // Scanning table with no primary key
- int hidden_no= table->fields;
-#ifndef DBUG_OFF
- const NDBTAB *tab= (NDBTAB *) m_table;
- if (!tab->getColumn(hidden_no))
- DBUG_RETURN(1);
-#endif
- if (get_ndb_value(op, NULL, hidden_no))
- ERR_RETURN(op->getNdbError());
- }
-
- if (trans->execute(NoCommit) != 0)
- DBUG_RETURN(ndb_err(trans));
- DBUG_PRINT("exit", ("Scan started successfully"));
- DBUG_RETURN(next_result(buf));
-}
-
-
/*
Insert one record into NDB
*/
-
int ha_ndbcluster::write_row(byte *record)
{
bool has_auto_increment;
@@ -1370,14 +1632,18 @@ int ha_ndbcluster::write_row(byte *record)
THD *thd= current_thd;
DBUG_ENTER("write_row");
+
+ if(m_ignore_dup_key_not_supported)
+ {
+ DBUG_RETURN(HA_ERR_WRONG_COMMAND);
+ }
statistic_increment(thd->status_var.ha_write_count, &LOCK_status);
- if (table->timestamp_default_now)
- update_timestamp(record+table->timestamp_default_now-1);
+ if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
+ table->timestamp_field->set_time();
has_auto_increment= (table->next_number_field && record == table->record[0]);
- skip_auto_increment= table->auto_increment_field_not_null;
- if (!(op= trans->getNdbOperation((NDBTAB *) m_table)))
+ if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)))
ERR_RETURN(trans->getNdbError());
res= (m_use_write) ? op->writeTuple() :op->insertTuple();
@@ -1387,7 +1653,7 @@ int ha_ndbcluster::write_row(byte *record)
if (table->primary_key == MAX_KEY)
{
// Table has hidden primary key
- Uint64 auto_value= m_ndb->getAutoIncrementValue((NDBTAB *) m_table);
+ Uint64 auto_value= m_ndb->getAutoIncrementValue((const NDBTAB *) m_table);
if (set_hidden_key(op, table->fields, (const byte*)&auto_value))
ERR_RETURN(op->getNdbError());
}
@@ -1395,21 +1661,26 @@ int ha_ndbcluster::write_row(byte *record)
{
int res;
- if ((has_auto_increment) && (!skip_auto_increment))
+ if (has_auto_increment)
+ {
+ skip_auto_increment= FALSE;
update_auto_increment();
+ skip_auto_increment= !auto_increment_column_changed;
+ }
if ((res= set_primary_key(op)))
return res;
}
// Set non-key attribute(s)
+ bool set_blob_value= FALSE;
for (i= 0; i < table->fields; i++)
{
Field *field= table->field[i];
if (!(field->flags & PRI_KEY_FLAG) &&
- set_ndb_value(op, field, i))
+ set_ndb_value(op, field, i, &set_blob_value))
{
- skip_auto_increment= true;
+ skip_auto_increment= TRUE;
ERR_RETURN(op->getNdbError());
}
}
@@ -1422,20 +1693,38 @@ int ha_ndbcluster::write_row(byte *record)
Find out how this is detected!
*/
rows_inserted++;
- bulk_insert_not_flushed= true;
+ no_uncommitted_rows_update(1);
+ bulk_insert_not_flushed= TRUE;
if ((rows_to_insert == 1) ||
((rows_inserted % bulk_insert_rows) == 0) ||
- uses_blob_value(false) != 0)
+ set_blob_value)
{
+ THD *thd= current_thd;
// Send rows to NDB
DBUG_PRINT("info", ("Sending inserts to NDB, "\
"rows_inserted:%d, bulk_insert_rows: %d",
- (int)rows_inserted, (int)bulk_insert_rows));
- bulk_insert_not_flushed= false;
- if (trans->execute(NoCommit) != 0)
+ (int)rows_inserted, (int)bulk_insert_rows));
+
+ bulk_insert_not_flushed= FALSE;
+ if (thd->transaction.on)
{
- skip_auto_increment= true;
- DBUG_RETURN(ndb_err(trans));
+ if (execute_no_commit(this,trans) != 0)
+ {
+ skip_auto_increment= TRUE;
+ no_uncommitted_rows_execute_failure();
+ DBUG_RETURN(ndb_err(trans));
+ }
+ }
+ else
+ {
+ if (execute_commit(this,trans) != 0)
+ {
+ skip_auto_increment= TRUE;
+ no_uncommitted_rows_execute_failure();
+ DBUG_RETURN(ndb_err(trans));
+ }
+ int res= trans->restart();
+ DBUG_ASSERT(res == 0);
}
}
if ((has_auto_increment) && (skip_auto_increment))
@@ -1444,11 +1733,11 @@ int ha_ndbcluster::write_row(byte *record)
DBUG_PRINT("info",
("Trying to set next auto increment value to %lu",
(ulong) next_val));
- if (m_ndb->setAutoIncrementValue((NDBTAB *) m_table, next_val, true))
+ if (m_ndb->setAutoIncrementValue((const NDBTAB *) m_table, next_val, TRUE))
DBUG_PRINT("info",
("Setting next auto increment value to %u", next_val));
}
- skip_auto_increment= true;
+ skip_auto_increment= TRUE;
DBUG_RETURN(0);
}
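
The flush decision in write_row() above can be read as a single predicate: send the batch when the statement inserts exactly one row, when the batch-size boundary is hit, or as soon as a blob value was set (blob writes cannot stay deferred). Sketched in isolation (should_flush is a hypothetical name, parameter types approximated):

    // Mirrors the condition guarding the execute in write_row() above.
    static bool should_flush(unsigned long long rows_to_insert,
                             unsigned long long rows_inserted,
                             unsigned long bulk_insert_rows,
                             bool set_blob_value)
    {
      return rows_to_insert == 1 ||
             (rows_inserted % bulk_insert_rows) == 0 ||
             set_blob_value;
    }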
@@ -1502,9 +1791,9 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
DBUG_ENTER("update_row");
statistic_increment(thd->status_var.ha_update_count, &LOCK_status);
- if (table->timestamp_on_update_now)
- update_timestamp(new_data+table->timestamp_on_update_now-1);
-
+ if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
+ table->timestamp_field->set_time();
+
/* Check for update of primary key for special handling */
if ((table->primary_key != MAX_KEY) &&
(key_cmp(table->primary_key, old_data, new_data)))
@@ -1552,12 +1841,12 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (!(op= cursor->updateTuple()))
ERR_RETURN(trans->getNdbError());
ops_pending++;
- if (uses_blob_value(false))
- blobs_pending= true;
+ if (uses_blob_value(FALSE))
+ blobs_pending= TRUE;
}
else
{
- if (!(op= trans->getNdbOperation((NDBTAB *) m_table)) ||
+ if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
op->updateTuple() != 0)
ERR_RETURN(trans->getNdbError());
@@ -1595,8 +1884,10 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
}
// Execute update operation
- if (!cursor && trans->execute(NoCommit) != 0)
+ if (!cursor && execute_no_commit(this,trans) != 0) {
+ no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans));
+ }
DBUG_RETURN(0);
}
@@ -1630,16 +1921,20 @@ int ha_ndbcluster::delete_row(const byte *record)
ERR_RETURN(trans->getNdbError());
ops_pending++;
+ no_uncommitted_rows_update(-1);
+
// If deleting from cursor, NoCommit will be handled in next_result
DBUG_RETURN(0);
}
else
{
- if (!(op=trans->getNdbOperation((NDBTAB *) m_table)) ||
+ if (!(op=trans->getNdbOperation((const NDBTAB *) m_table)) ||
op->deleteTuple() != 0)
ERR_RETURN(trans->getNdbError());
+ no_uncommitted_rows_update(-1);
+
if (table->primary_key == MAX_KEY)
{
// This table has no primary key, use "hidden" primary key
@@ -1660,8 +1955,10 @@ int ha_ndbcluster::delete_row(const byte *record)
}
// Execute delete operation
- if (trans->execute(NoCommit) != 0)
+ if (execute_no_commit(this,trans) != 0) {
+ no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans));
+ }
DBUG_RETURN(0);
}
@@ -1702,7 +1999,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
else
{
NdbBlob* ndb_blob= (*value).blob;
- bool isNull= true;
+ bool isNull= TRUE;
int ret= ndb_blob->getNull(isNull);
DBUG_ASSERT(ret == 0);
if (isNull)
@@ -1717,7 +2014,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
{
// Table with hidden primary key
int hidden_no= table->fields;
- const NDBTAB *tab= (NDBTAB *) m_table;
+ const NDBTAB *tab= (const NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no);
NdbRecAttr* rec= m_value[hidden_no].rec;
DBUG_ASSERT(rec);
@@ -1735,7 +2032,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
void ha_ndbcluster::print_results()
{
- const NDBTAB *tab= (NDBTAB*) m_table;
+ const NDBTAB *tab= (const NDBTAB*) m_table;
DBUG_ENTER("print_results");
#ifndef DBUG_OFF
@@ -1770,7 +2067,7 @@ void ha_ndbcluster::print_results()
else
{
ndb_blob= value.blob;
- bool isNull= true;
+ bool isNull= TRUE;
ndb_blob->getNull(isNull);
if (isNull) {
fprintf(DBUG_FILE, "NULL\n");
@@ -1910,18 +2207,54 @@ int ha_ndbcluster::index_end()
int ha_ndbcluster::index_read(byte *buf,
- const byte *key, uint key_len,
- enum ha_rkey_function find_flag)
+ const byte *key, uint key_len,
+ enum ha_rkey_function find_flag)
{
DBUG_ENTER("index_read");
DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d",
active_index, key_len, find_flag));
+ int error;
+ NDB_INDEX_TYPE type = get_index_type(active_index);
+ const KEY* key_info = table->key_info+active_index;
+ switch (type){
+ case PRIMARY_KEY_ORDERED_INDEX:
+ case PRIMARY_KEY_INDEX:
+ if (find_flag == HA_READ_KEY_EXACT && key_info->key_length == key_len)
+ {
+ DBUG_RETURN(pk_read(key, key_len, buf));
+ }
+ else if (type == PRIMARY_KEY_INDEX)
+ {
+ DBUG_RETURN(1);
+ }
+ break;
+ case UNIQUE_ORDERED_INDEX:
+ case UNIQUE_INDEX:
+ if (find_flag == HA_READ_KEY_EXACT && key_info->key_length == key_len)
+ {
+ DBUG_RETURN(unique_index_read(key, key_len, buf));
+ }
+ else if (type == UNIQUE_INDEX)
+ {
+ DBUG_RETURN(1);
+ }
+ break;
+ case ORDERED_INDEX:
+ break;
+ default:
+ case UNDEFINED_INDEX:
+ DBUG_ASSERT(FALSE);
+ DBUG_RETURN(1);
+ break;
+ }
+
key_range start_key;
- start_key.key= key;
- start_key.length= key_len;
- start_key.flag= find_flag;
- DBUG_RETURN(read_range_first(&start_key, NULL, false, true));
+ start_key.key = key;
+ start_key.length = key_len;
+ start_key.flag = find_flag;
+ error= ordered_index_scan(&start_key, 0, TRUE, buf);
+ DBUG_RETURN(error == HA_ERR_END_OF_FILE ? HA_ERR_KEY_NOT_FOUND : error);
}
@@ -1962,7 +2295,10 @@ int ha_ndbcluster::index_first(byte *buf)
DBUG_ENTER("index_first");
statistic_increment(current_thd->status_var.ha_read_first_count,
&LOCK_status);
- DBUG_RETURN(1);
+ // Start the ordered index scan and fetch the first row
+
+ // Only HA_READ_ORDER indexes get called by index_first
+ DBUG_RETURN(ordered_index_scan(0, 0, TRUE, buf));
}
@@ -1970,23 +2306,31 @@ int ha_ndbcluster::index_last(byte *buf)
{
DBUG_ENTER("index_last");
statistic_increment(current_thd->status_var.ha_read_last_count,&LOCK_status);
+ int res;
+ if((res= ordered_index_scan(0, 0, TRUE, buf)) == 0){
+ NdbResultSet *cursor= m_active_cursor;
+ while((res= cursor->nextResult(TRUE)) == 0);
+ if(res == 1){
+ unpack_record(buf);
+ table->status= 0;
+ DBUG_RETURN(0);
+ }
+ }
DBUG_RETURN(1);
}
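
With no backward ordered scan in this NDB API version (an assumption inferred from the implementation), index_last() drains a forward sorted scan and keeps the final row: nextResult() returning 1 signals end-of-scan while the last fetched row is still in the handler buffers. A toy model of the drain loop (DemoCursor is a stand-in):

    #include <cstdio>

    // Stand-in for NdbResultSet in the drain loop of index_last():
    // returns 0 while rows remain, 1 at end of scan.
    struct DemoCursor {
      int rows_left;
      int nextResult(bool) { return rows_left-- > 0 ? 0 : 1; }
    };

    int main()
    {
      DemoCursor cursor= { 5 };
      int res;
      while ((res= cursor.nextResult(true)) == 0)
        ;                               // skip ahead to the last row
      printf("end-of-scan: %d\n", res); // 1 -> last row is the current one
      return 0;
    }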
-int ha_ndbcluster::read_range_first(const key_range *start_key,
- const key_range *end_key,
- bool eq_range, bool sorted)
+inline
+int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
+ const key_range *end_key,
+ bool eq_range, bool sorted,
+ byte* buf)
{
KEY* key_info;
int error= 1;
- byte* buf= table->record[0];
- DBUG_ENTER("ha_ndbcluster::read_range_first");
+ DBUG_ENTER("ha_ndbcluster::read_range_first_to_buf");
DBUG_PRINT("info", ("eq_range: %d, sorted: %d", eq_range, sorted));
- if (m_active_cursor)
- close_scan();
-
switch (get_index_type(active_index)){
case PRIMARY_KEY_ORDERED_INDEX:
case PRIMARY_KEY_INDEX:
@@ -2014,15 +2358,26 @@ int ha_ndbcluster::read_range_first(const key_range *start_key,
break;
}
-
// Start the ordered index scan and fetch the first row
- error= ordered_index_scan(start_key, end_key, sorted,
- buf);
-
+ error= ordered_index_scan(start_key, end_key, sorted, buf);
DBUG_RETURN(error);
}
+int ha_ndbcluster::read_range_first(const key_range *start_key,
+ const key_range *end_key,
+ bool eq_range, bool sorted)
+{
+ byte* buf= table->record[0];
+ DBUG_ENTER("ha_ndbcluster::read_range_first");
+
+ DBUG_RETURN(read_range_first_to_buf(start_key,
+ end_key,
+ eq_range,
+ sorted,
+ buf));
+}
+
int ha_ndbcluster::read_range_next()
{
DBUG_ENTER("ha_ndbcluster::read_range_next");
@@ -2040,7 +2395,8 @@ int ha_ndbcluster::rnd_init(bool scan)
{
if (!scan)
DBUG_RETURN(1);
- cursor->restart();
+ int res= cursor->restart();
+ DBUG_ASSERT(res == 0);
}
index_init(table->primary_key);
DBUG_RETURN(0);
@@ -2063,8 +2419,10 @@ int ha_ndbcluster::close_scan()
deleting/updating transaction before closing the scan
*/
DBUG_PRINT("info", ("ops_pending: %d", ops_pending));
- if (trans->execute(NoCommit) != 0)
+ if (execute_no_commit(this,trans) != 0) {
+ no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans));
+ }
ops_pending= 0;
}
@@ -2152,7 +2510,7 @@ void ha_ndbcluster::position(const byte *record)
DBUG_PRINT("info", ("Getting hidden key"));
int hidden_no= table->fields;
NdbRecAttr* rec= m_value[hidden_no].rec;
- const NDBTAB *tab= (NDBTAB *) m_table;
+ const NDBTAB *tab= (const NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no);
DBUG_ASSERT(hidden_col->getPrimaryKey() &&
hidden_col->getAutoIncrement() &&
@@ -2177,10 +2535,26 @@ void ha_ndbcluster::info(uint flag)
DBUG_PRINT("info", ("HA_STATUS_NO_LOCK"));
if (flag & HA_STATUS_TIME)
DBUG_PRINT("info", ("HA_STATUS_TIME"));
- if (flag & HA_STATUS_CONST)
- DBUG_PRINT("info", ("HA_STATUS_CONST"));
if (flag & HA_STATUS_VARIABLE)
+ {
DBUG_PRINT("info", ("HA_STATUS_VARIABLE"));
+ if (m_table_info)
+ {
+ records_update();
+ }
+ else
+ {
+ Uint64 rows;
+ if(ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0) == 0){
+ records= rows;
+ }
+ }
+ }
+ if (flag & HA_STATUS_CONST)
+ {
+ DBUG_PRINT("info", ("HA_STATUS_CONST"));
+ set_rec_per_key();
+ }
if (flag & HA_STATUS_ERRKEY)
{
DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
@@ -2273,14 +2647,20 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
break;
case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/
DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY"));
-
- DBUG_PRINT("info", ("Turning ON use of write instead of insert"));
- m_use_write= TRUE;
+ if (current_thd->lex->sql_command == SQLCOM_REPLACE)
+ {
+ DBUG_PRINT("info", ("Turning ON use of write instead of insert"));
+ m_use_write= TRUE;
+ } else
+ {
+ m_ignore_dup_key_not_supported= TRUE;
+ }
break;
case HA_EXTRA_NO_IGNORE_DUP_KEY:
DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_DUP_KEY"));
DBUG_PRINT("info", ("Turning OFF use of write instead of insert"));
- m_use_write= false;
+ m_use_write= FALSE;
+ m_ignore_dup_key_not_supported= FALSE;
break;
case HA_EXTRA_RETRIEVE_ALL_COLS: /* Retrieve all columns, not just those
where field->query_id is the same as
@@ -2327,7 +2707,7 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
void ha_ndbcluster::start_bulk_insert(ha_rows rows)
{
int bytes, batch;
- const NDBTAB *tab= (NDBTAB *) m_table;
+ const NDBTAB *tab= (const NDBTAB *) m_table;
DBUG_ENTER("start_bulk_insert");
DBUG_PRINT("enter", ("rows: %d", (int)rows));
@@ -2368,9 +2748,11 @@ int ha_ndbcluster::end_bulk_insert()
DBUG_PRINT("info", ("Sending inserts to NDB, "\
"rows_inserted:%d, bulk_insert_rows: %d",
rows_inserted, bulk_insert_rows));
- bulk_insert_not_flushed= false;
- if (trans->execute(NoCommit) != 0)
- error= ndb_err(trans);
+ bulk_insert_not_flushed= FALSE;
+ if (execute_no_commit(this,trans) != 0) {
+ no_uncommitted_rows_execute_failure();
+ my_errno= error= ndb_err(trans);
+ }
}
rows_inserted= 0;
@@ -2396,7 +2778,7 @@ int ha_ndbcluster::reset()
const char **ha_ndbcluster::bas_ext() const
-{ static const char *ext[1]= { NullS }; return ext; }
+{ static const char *ext[]= { ".ndb", NullS }; return ext; }
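
bas_ext() lists the file extensions the handler owns on disk, so the server can delete and rename them together with the table; the patch adds ".ndb" (the file produced by write_ndb_file()). The contract, sketched with hypothetical demo_* names:

    // NullS-terminated list of handler-owned extensions, iterated by the
    // server for delete/rename of per-table files.
    static const char *demo_ext[]= { ".ndb", 0 /* NullS */ };

    const char **demo_bas_ext() { return demo_ext; }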
/*
@@ -2407,7 +2789,11 @@ const char **ha_ndbcluster::bas_ext() const
double ha_ndbcluster::scan_time()
{
- return rows2double(records*1000);
+ DBUG_ENTER("ha_ndbcluster::scan_time()");
+ double res= rows2double(records*1000);
+ DBUG_PRINT("exit", ("table: %s value: %f",
+ m_tabname, res));
+ DBUG_RETURN(res);
}
@@ -2416,14 +2802,16 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
enum thr_lock_type lock_type)
{
DBUG_ENTER("store_lock");
-
if (lock_type != TL_IGNORE && m_lock.type == TL_UNLOCK)
{
-
+
/* If we are not doing a LOCK TABLE, then allow multiple
writers */
- if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
+ /* Since NDB does not currently have table locks
+ this is treated as an ordinary lock */
+
+ if ((lock_type >= TL_WRITE_ALLOW_WRITE &&
lock_type <= TL_WRITE) && !thd->in_lock_tables)
lock_type= TL_WRITE_ALLOW_WRITE;
@@ -2477,9 +2865,6 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
NdbConnection* trans= NULL;
DBUG_ENTER("external_lock");
- DBUG_PRINT("enter", ("transaction.ndb_lock_count: %d",
- thd->transaction.ndb_lock_count));
-
/*
Check that this handler instance has a connection
set up to the Ndb object of thd
@@ -2487,10 +2872,15 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
if (check_ndb_connection())
DBUG_RETURN(1);
+ Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
+
+ DBUG_PRINT("enter", ("transaction.thd_ndb->lock_count: %d",
+ thd_ndb->lock_count));
+
if (lock_type != F_UNLCK)
{
DBUG_PRINT("info", ("lock_type != F_UNLCK"));
- if (!thd->transaction.ndb_lock_count++)
+ if (!thd_ndb->lock_count++)
{
PRINT_OPTION_FLAGS(thd);
@@ -2502,10 +2892,8 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
trans= m_ndb->startTransaction();
if (trans == NULL)
- {
- thd->transaction.ndb_lock_count--; // We didn't get the lock
ERR_RETURN(m_ndb->getNdbError());
- }
+ no_uncommitted_rows_reset(thd);
thd->transaction.stmt.ndb_tid= trans;
}
else
@@ -2518,10 +2906,8 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
trans= m_ndb->startTransaction();
if (trans == NULL)
- {
- thd->transaction.ndb_lock_count--; // We didn't get the lock
ERR_RETURN(m_ndb->getNdbError());
- }
+ no_uncommitted_rows_reset(thd);
/*
If this is the start of a LOCK TABLE, a table lock
@@ -2555,15 +2941,25 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
(NdbConnection*)thd->transaction.all.ndb_tid:
(NdbConnection*)thd->transaction.stmt.ndb_tid;
DBUG_ASSERT(m_active_trans);
-
// Start of transaction
retrieve_all_fields= FALSE;
ops_pending= 0;
+ {
+ NDBDICT *dict= m_ndb->getDictionary();
+ const NDBTAB *tab;
+ void *tab_info;
+ if (!(tab= dict->getTable(m_tabname, &tab_info)))
+ ERR_RETURN(dict->getNdbError());
+ DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
+ m_table= (void *)tab;
+ m_table_info= tab_info;
+ }
+ no_uncommitted_rows_init(thd);
}
else
{
DBUG_PRINT("info", ("lock_type == F_UNLCK"));
- if (!--thd->transaction.ndb_lock_count)
+ if (!--thd_ndb->lock_count)
{
DBUG_PRINT("trans", ("Last external_lock"));
PRINT_OPTION_FLAGS(thd);
@@ -2580,7 +2976,28 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
thd->transaction.stmt.ndb_tid= 0;
}
}
- m_active_trans= NULL;
+ m_table= NULL;
+ m_table_info= NULL;
+ /*
+ This is the place to make sure this handler instance
+ no longer are connected to the active transaction.
+
+ And since the handler is no longer part of the transaction
+ it can't have open cursors, ops or blobs pending.
+ */
+ m_active_trans= NULL;
+
+ if (m_active_cursor)
+ DBUG_PRINT("warning", ("m_active_cursor != NULL"));
+ m_active_cursor= NULL;
+
+ if (blobs_pending)
+ DBUG_PRINT("warning", ("blobs_pending != 0"));
+ blobs_pending= 0;
+
+ if (ops_pending)
+ DBUG_PRINT("warning", ("ops_pending != 0L"));
+ ops_pending= 0;
}
DBUG_RETURN(error);
}
@@ -2589,6 +3006,8 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
When using LOCK TABLE's external_lock is only called when the actual
TABLE LOCK is done.
Under LOCK TABLES, each used tables will force a call to start_stmt.
+ Ndb doesn't currently support table locks, and does an ordinary
+ startTransaction for each transaction/statement.
*/
int ha_ndbcluster::start_stmt(THD *thd)
@@ -2604,9 +3023,12 @@ int ha_ndbcluster::start_stmt(THD *thd)
NdbConnection *tablock_trans=
(NdbConnection*)thd->transaction.all.ndb_tid;
DBUG_PRINT("info", ("tablock_trans: %x", (uint)tablock_trans));
- DBUG_ASSERT(tablock_trans); trans= m_ndb->hupp(tablock_trans);
+ DBUG_ASSERT(tablock_trans);
+// trans= m_ndb->hupp(tablock_trans);
+ trans= m_ndb->startTransaction();
if (trans == NULL)
ERR_RETURN(m_ndb->getNdbError());
+ no_uncommitted_rows_reset(thd);
thd->transaction.stmt.ndb_tid= trans;
}
m_active_trans= trans;
@@ -2626,7 +3048,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
int ndbcluster_commit(THD *thd, void *ndb_transaction)
{
int res= 0;
- Ndb *ndb= (Ndb*)thd->transaction.ndb;
+ Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
NdbConnection *trans= (NdbConnection*)ndb_transaction;
DBUG_ENTER("ndbcluster_commit");
@@ -2635,7 +3057,7 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction)
"stmt" : "all"));
DBUG_ASSERT(ndb && trans);
- if (trans->execute(Commit) != 0)
+ if (execute_commit(0,trans) != 0)
{
const NdbError err= trans->getNdbError();
const NdbOperation *error_op= trans->getNdbErrorOperation();
@@ -2644,7 +3066,7 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction)
if (res != -1)
ndbcluster_print_error(res, error_op);
}
- ndb->closeTransaction(trans);
+ ndb->closeTransaction(trans);
DBUG_RETURN(res);
}
@@ -2656,7 +3078,7 @@ int ndbcluster_commit(THD *thd, void *ndb_transaction)
int ndbcluster_rollback(THD *thd, void *ndb_transaction)
{
int res= 0;
- Ndb *ndb= (Ndb*)thd->transaction.ndb;
+ Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
NdbConnection *trans= (NdbConnection*)ndb_transaction;
DBUG_ENTER("ndbcluster_rollback");
@@ -2691,6 +3113,8 @@ static int create_ndb_column(NDBCOL &col,
{
// Set name
col.setName(field->field_name);
+ // Get char set
+ CHARSET_INFO *cs= field->charset();
// Set type and sizes
const enum enum_field_types mysql_type= field->real_type();
switch (mysql_type) {
@@ -2762,15 +3186,22 @@ static int create_ndb_column(NDBCOL &col,
case MYSQL_TYPE_STRING:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Binary);
- else
+ else {
col.setType(NDBCOL::Char);
- col.setLength(field->pack_length());
+ col.setCharset(cs);
+ }
+ if (field->pack_length() == 0)
+ col.setLength(1); // currently ndb does not support size 0
+ else
+ col.setLength(field->pack_length());
break;
case MYSQL_TYPE_VAR_STRING:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Varbinary);
- else
+ else {
col.setType(NDBCOL::Varchar);
+ col.setCharset(cs);
+ }
col.setLength(field->pack_length());
break;
// Blob types (all come in as MYSQL_TYPE_BLOB)
@@ -2778,8 +3209,10 @@ static int create_ndb_column(NDBCOL &col,
case MYSQL_TYPE_TINY_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
- else
+ else {
col.setType(NDBCOL::Text);
+ col.setCharset(cs);
+ }
col.setInlineSize(256);
// No parts
col.setPartSize(0);
@@ -2789,8 +3222,10 @@ static int create_ndb_column(NDBCOL &col,
case MYSQL_TYPE_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
- else
+ else {
col.setType(NDBCOL::Text);
+ col.setCharset(cs);
+ }
// Use "<=" even if "<" is the exact condition
if (field->max_length() <= (1 << 8))
goto mysql_type_tiny_blob;
@@ -2809,8 +3244,10 @@ static int create_ndb_column(NDBCOL &col,
case MYSQL_TYPE_MEDIUM_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
- else
+ else {
col.setType(NDBCOL::Text);
+ col.setCharset(cs);
+ }
col.setInlineSize(256);
col.setPartSize(4000);
col.setStripeSize(8);
@@ -2819,8 +3256,10 @@ static int create_ndb_column(NDBCOL &col,
case MYSQL_TYPE_LONG_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
- else
+ else {
col.setType(NDBCOL::Text);
+ col.setCharset(cs);
+ }
col.setInlineSize(256);
col.setPartSize(8000);
col.setStripeSize(4);
@@ -2849,12 +3288,12 @@ static int create_ndb_column(NDBCOL &col,
{
col.setAutoIncrement(TRUE);
ulonglong value= info->auto_increment_value ?
- info->auto_increment_value -1 : (ulonglong) 0;
+ info->auto_increment_value : (ulonglong) 1;
DBUG_PRINT("info", ("Autoincrement key, initial: %llu", value));
col.setAutoIncrementInitialValue(value);
}
else
- col.setAutoIncrement(false);
+ col.setAutoIncrement(FALSE);
return 0;
}
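
The charset hunks above all follow one pattern: binary flavours keep their Binary/Varbinary/Blob type, while text flavours additionally carry the field's CHARSET_INFO into the NDB column. A minimal sketch for a non-binary CHAR field (col and field as in create_ndb_column()):

  // Sketch: charset propagation for a non-binary CHAR column.
  CHARSET_INFO *cs= field->charset();
  col.setType(NDBCOL::Char);
  col.setCharset(cs);                        // new in this patch
  col.setLength(field->pack_length() ? field->pack_length()
                                     : 1);  // NDB has no zero-length columns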
@@ -2872,12 +3311,24 @@ int ha_ndbcluster::create(const char *name,
const void *data, *pack_data;
const char **key_names= form->keynames.type_names;
char name2[FN_HEADLEN];
+ bool create_from_engine= (info->table_options & HA_CREATE_FROM_ENGINE);
DBUG_ENTER("create");
DBUG_PRINT("enter", ("name: %s", name));
fn_format(name2, name, "", "",2); // Remove the .frm extension
set_dbname(name2);
- set_tabname(name2);
+ set_tabname(name2);
+
+ if (create_from_engine)
+ {
+ /*
+      Table already exists in NDB and the frm file has been created by the
+      caller.
+ Do Ndb specific stuff, such as create a .ndb file
+ */
+ my_errno= write_ndb_file();
+ DBUG_RETURN(my_errno);
+ }
DBUG_PRINT("table", ("name: %s", m_tabname));
tab.setName(m_tabname);
@@ -2912,22 +3363,18 @@ int ha_ndbcluster::create(const char *name,
col.setName("$PK");
col.setType(NdbDictionary::Column::Bigunsigned);
col.setLength(1);
- col.setNullable(false);
+ col.setNullable(FALSE);
col.setPrimaryKey(TRUE);
col.setAutoIncrement(TRUE);
tab.addColumn(col);
}
- my_errno= 0;
- if (check_ndb_connection())
- {
- my_errno= HA_ERR_NO_CONNECTION;
+ if ((my_errno= check_ndb_connection()))
DBUG_RETURN(my_errno);
- }
// Create the table in NDB
NDBDICT *dict= m_ndb->getDictionary();
- if (dict->createTable(tab))
+ if (dict->createTable(tab) != 0)
{
const NdbError err= dict->getNdbError();
ERR_PRINT(err);
@@ -2940,6 +3387,9 @@ int ha_ndbcluster::create(const char *name,
// Create secondary indexes
my_errno= build_index_list(form, ILBP_CREATE);
+ if (!my_errno)
+ my_errno= write_ndb_file();
+
DBUG_RETURN(my_errno);
}
@@ -2948,7 +3398,7 @@ int ha_ndbcluster::create_ordered_index(const char *name,
KEY *key_info)
{
DBUG_ENTER("create_ordered_index");
- DBUG_RETURN(create_index(name, key_info, false));
+ DBUG_RETURN(create_index(name, key_info, FALSE));
}
int ha_ndbcluster::create_unique_index(const char *name,
@@ -2956,7 +3406,7 @@ int ha_ndbcluster::create_unique_index(const char *name,
{
DBUG_ENTER("create_unique_index");
- DBUG_RETURN(create_index(name, key_info, true));
+ DBUG_RETURN(create_index(name, key_info, TRUE));
}
@@ -2975,7 +3425,6 @@ int ha_ndbcluster::create_index(const char *name,
DBUG_ENTER("create_index");
DBUG_PRINT("enter", ("name: %s ", name));
- // NdbDictionary::Index ndb_index(name);
NdbDictionary::Index ndb_index(name);
if (unique)
ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
@@ -2983,7 +3432,7 @@ int ha_ndbcluster::create_index(const char *name,
{
ndb_index.setType(NdbDictionary::Index::OrderedIndex);
// TODO Only temporary ordered indexes supported
- ndb_index.setLogging(false);
+ ndb_index.setLogging(FALSE);
}
ndb_index.setTable(m_tabname);
@@ -3016,14 +3465,16 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
set_tabname(from);
set_tabname(to, new_tabname);
- if (check_ndb_connection()) {
- my_errno= HA_ERR_NO_CONNECTION;
- DBUG_RETURN(my_errno);
- }
+ if (check_ndb_connection())
+ DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
+
int result= alter_table_name(m_tabname, new_tabname);
if (result == 0)
+ {
set_tabname(to);
+ handler::rename_table(from, to);
+ }
DBUG_RETURN(result);
}
@@ -3049,6 +3500,7 @@ int ha_ndbcluster::alter_table_name(const char *from, const char *to)
ERR_RETURN(dict->getNdbError());
m_table= NULL;
+ m_table_info= NULL;
DBUG_RETURN(0);
}
@@ -3067,6 +3519,8 @@ int ha_ndbcluster::delete_table(const char *name)
if (check_ndb_connection())
DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+ handler::delete_table(name);
DBUG_RETURN(drop_table());
}
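
Calling handler::delete_table() first lets the base class remove the handler-specific files, which only works if the .ndb extension is registered with the handler. A sketch of the registration this assumes (bas_ext() itself is not shown in this hunk):

  // Sketch (assumption): .ndb must be listed in bas_ext() so that
  // handler::delete_table()/rename_table() handle the placeholder file.
  static const char *ha_ndbcluster_exts[]= { ha_ndb_ext, NullS };
  const char **ha_ndbcluster::bas_ext() const
  { return ha_ndbcluster_exts; }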
@@ -3113,11 +3567,12 @@ ulonglong ha_ndbcluster::get_auto_increment()
Uint64 auto_value;
DBUG_ENTER("get_auto_increment");
DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
- cache_size= ((rows_to_insert > autoincrement_prefetch) ?
- rows_to_insert : autoincrement_prefetch);
+ cache_size= ((rows_to_insert - rows_inserted < autoincrement_prefetch) ?
+ rows_to_insert - rows_inserted :
+ max(rows_to_insert, autoincrement_prefetch));
auto_value= ((skip_auto_increment) ?
- m_ndb->readAutoIncrementValue((NDBTAB *) m_table) :
- m_ndb->getAutoIncrementValue((NDBTAB *) m_table, cache_size));
+ m_ndb->readAutoIncrementValue((const NDBTAB *) m_table) :
+ m_ndb->getAutoIncrementValue((const NDBTAB *) m_table, cache_size));
DBUG_RETURN((ulonglong) auto_value);
}
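
A worked example of the new cache_size logic: with rows_to_insert= 100 and rows_inserted= 90, only 10 rows remain, which is below autoincrement_prefetch= 32, so just 10 values are cached; at the start of the same insert (rows_inserted= 0) the full max(100, 32)= 100 values are prefetched. The expression, isolated:

  // Sketch: prefetch exactly what the tail of a bulk insert still needs.
  ha_rows remaining= rows_to_insert - rows_inserted;
  cache_size= (remaining < autoincrement_prefetch)
    ? remaining                                     // small tail: no over-fetch
    : max(rows_to_insert, autoincrement_prefetch);  // normal batch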
@@ -3132,18 +3587,22 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_active_cursor(NULL),
m_ndb(NULL),
m_table(NULL),
+ m_table_info(NULL),
m_table_flags(HA_REC_NOT_IN_SEQ |
HA_NULL_IN_KEY |
- HA_NOT_EXACT_COUNT |
- HA_NO_PREFIX_CHAR_KEYS),
- m_use_write(false),
+ HA_AUTO_PART_KEY |
+ HA_NO_PREFIX_CHAR_KEYS),
+ m_share(0),
+ m_use_write(FALSE),
+ m_ignore_dup_key_not_supported(FALSE),
retrieve_all_fields(FALSE),
rows_to_insert(1),
rows_inserted(0),
bulk_insert_rows(1024),
- bulk_insert_not_flushed(false),
+ bulk_insert_not_flushed(FALSE),
ops_pending(0),
- skip_auto_increment(true),
+ skip_auto_increment(TRUE),
+ blobs_pending(0),
blobs_buffer(0),
blobs_buffer_size(0),
dupkey((uint) -1)
@@ -3155,15 +3614,12 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_tabname[0]= '\0';
m_dbname[0]= '\0';
- // TODO Adjust number of records and other parameters for proper
- // selection of scan/pk access
- records= 100;
+ records= ~(ha_rows)0; // uninitialized
block_size= 1024;
for (i= 0; i < MAX_KEY; i++)
{
m_index[i].type= UNDEFINED_INDEX;
- m_index[i].unique_name= NULL;
m_index[i].unique_index= NULL;
m_index[i].index= NULL;
}
@@ -3180,12 +3636,18 @@ ha_ndbcluster::~ha_ndbcluster()
{
DBUG_ENTER("~ha_ndbcluster");
+ if (m_share)
+ free_share(m_share);
release_metadata();
my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
blobs_buffer= 0;
// Check for open cursor/transaction
DBUG_ASSERT(m_active_cursor == NULL);
DBUG_ASSERT(m_active_trans == NULL);
DBUG_VOID_RETURN;
@@ -3200,6 +3662,7 @@ ha_ndbcluster::~ha_ndbcluster()
int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
{
+ int res;
KEY *key;
DBUG_ENTER("open");
DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
@@ -3222,10 +3685,16 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
set_dbname(name);
set_tabname(name);
- if (check_ndb_connection())
+ if (check_ndb_connection()) {
+ free_share(m_share); m_share= 0;
DBUG_RETURN(HA_ERR_NO_CONNECTION);
+ }
+
+ res= get_metadata(name);
+ if (!res)
+ info(HA_STATUS_VARIABLE | HA_STATUS_CONST);
- DBUG_RETURN(get_metadata(name));
+ DBUG_RETURN(res);
}
@@ -3237,89 +3706,87 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
int ha_ndbcluster::close(void)
{
DBUG_ENTER("close");
- free_share(m_share);
+ free_share(m_share); m_share= 0;
release_metadata();
m_ndb= NULL;
DBUG_RETURN(0);
}
-Ndb* ha_ndbcluster::seize_ndb()
+Thd_ndb* ha_ndbcluster::seize_thd_ndb()
{
- Ndb* ndb;
- DBUG_ENTER("seize_ndb");
+ Thd_ndb *thd_ndb;
+ DBUG_ENTER("seize_thd_ndb");
-#ifdef USE_NDB_POOL
- // Seize from pool
- ndb= Ndb::seize();
-#else
- ndb= new Ndb("");
-#endif
- if (ndb->init(max_transactions) != 0)
+ thd_ndb= new Thd_ndb();
+ thd_ndb->ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info));
+ if (thd_ndb->ndb->init(max_transactions) != 0)
{
- ERR_PRINT(ndb->getNdbError());
+ ERR_PRINT(thd_ndb->ndb->getNdbError());
/*
TODO
      Alt.1 If init fails because too many Ndb objects are allocated,
      wait on a condition for an Ndb object to be released.
Alt.2 Seize/release from pool, wait until next release
*/
- delete ndb;
- ndb= NULL;
+ delete thd_ndb;
+ thd_ndb= NULL;
}
- DBUG_RETURN(ndb);
+ DBUG_RETURN(thd_ndb);
}
-void ha_ndbcluster::release_ndb(Ndb* ndb)
+void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)
{
- DBUG_ENTER("release_ndb");
-#ifdef USE_NDB_POOL
- // Release to pool
- Ndb::release(ndb);
-#else
- delete ndb;
-#endif
+ DBUG_ENTER("release_thd_ndb");
+ delete thd_ndb;
DBUG_VOID_RETURN;
}
/*
- If this thread already has a Ndb object allocated
+ If this thread already has a Thd_ndb object allocated
in current THD, reuse it. Otherwise
- seize a Ndb object, assign it to current THD and use it.
-
- Having a Ndb object also means that a connection to
- NDB cluster has been opened. The connection is
- checked.
+ seize a Thd_ndb object, assign it to current THD and use it.
*/
+Ndb* check_ndb_in_thd(THD* thd)
+{
+ DBUG_ENTER("check_ndb_in_thd");
+ Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
+
+ if (!thd_ndb)
+ {
+ if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
+ DBUG_RETURN(NULL);
+ thd->transaction.thd_ndb= thd_ndb;
+ }
+ DBUG_RETURN(thd_ndb->ndb);
+}
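
The functions below (check_ndb_connection, ndbcluster_discover, ndbcluster_table_exists, ndbcluster_find_files) all funnel through this accessor. The intended calling pattern, as a sketch:

  // Sketch: per-THD Ndb access; a Thd_ndb is seized on first use.
  Ndb *ndb= check_ndb_in_thd(thd);
  if (!ndb)
    return HA_ERR_NO_CONNECTION;
  ndb->setDatabaseName(db);   // each caller selects its own database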
+
+
int ha_ndbcluster::check_ndb_connection()
{
THD* thd= current_thd;
- Ndb* ndb;
DBUG_ENTER("check_ndb_connection");
- if (!thd->transaction.ndb)
- {
- ndb= seize_ndb();
- if (!ndb)
- DBUG_RETURN(2);
- thd->transaction.ndb= ndb;
- }
- m_ndb= (Ndb*)thd->transaction.ndb;
+ if (!(m_ndb= check_ndb_in_thd(thd)))
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
m_ndb->setDatabaseName(m_dbname);
DBUG_RETURN(0);
}
+
void ndbcluster_close_connection(THD *thd)
{
- Ndb* ndb;
+ Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
DBUG_ENTER("ndbcluster_close_connection");
- ndb= (Ndb*)thd->transaction.ndb;
- ha_ndbcluster::release_ndb(ndb);
- thd->transaction.ndb= NULL;
+ if (thd_ndb)
+ {
+ ha_ndbcluster::release_thd_ndb(thd_ndb);
+ thd->transaction.thd_ndb= NULL;
+ }
DBUG_VOID_RETURN;
}
@@ -3328,23 +3795,29 @@ void ndbcluster_close_connection(THD *thd)
Try to discover one table from NDB
*/
-int ndbcluster_discover(const char *dbname, const char *name,
+int ndbcluster_discover(THD* thd, const char *db, const char *name,
const void** frmblob, uint* frmlen)
{
uint len;
const void* data;
const NDBTAB* tab;
+ Ndb* ndb;
DBUG_ENTER("ndbcluster_discover");
- DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
+ DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
- Ndb ndb(dbname);
- if ((ndb.init() != 0) && (ndb.waitUntilReady() != 0))
- ERR_RETURN(ndb.getNdbError());
-
- if (!(tab= ndb.getDictionary()->getTable(name)))
- {
- DBUG_PRINT("info", ("Table %s not found", name));
- DBUG_RETURN(1);
+ if (!(ndb= check_ndb_in_thd(thd)))
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+ ndb->setDatabaseName(db);
+
+ NDBDICT* dict= ndb->getDictionary();
+ dict->set_local_table_data_size(sizeof(Ndb_table_local_info));
+ dict->invalidateTable(name);
+ if (!(tab= dict->getTable(name)))
+ {
+ const NdbError err= dict->getNdbError();
+ if (err.code == 709)
+ DBUG_RETURN(1);
+ ERR_RETURN(err);
}
DBUG_PRINT("info", ("Found table %s", tab->getName()));
@@ -3366,41 +3839,202 @@ int ndbcluster_discover(const char *dbname, const char *name,
DBUG_RETURN(0);
}
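
The tail of ndbcluster_discover() is elided from this hunk; presumably it hands the frm image attached to the NDB table object back to the caller. A sketch of that step, assuming the frm was stored packed at create time (see packfrm()/unpackfrm()):

  // Sketch: return the table's frm blob to the caller.
  if (unpackfrm(&data, &len, tab->getFrmData()))
    DBUG_RETURN(1);           // corrupt or missing frm data
  *frmblob= data;             // caller takes ownership of the buffer
  *frmlen= len;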
-
-#ifdef USE_DISCOVER_ON_STARTUP
/*
- Dicover tables from NDB Cluster
- - fetch a list of tables from NDB
- - store the frm file for each table on disk
- - if the table has an attached frm file
- - if the database of the table exists
-*/
+ Check if a table exists in NDB
+
+ */
-int ndb_discover_tables()
+int ndbcluster_table_exists(THD* thd, const char *db, const char *name)
{
+ const NDBTAB* tab;
+ Ndb* ndb;
+ DBUG_ENTER("ndbcluster_table_exists");
+ DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
+
+ if (!(ndb= check_ndb_in_thd(thd)))
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+ ndb->setDatabaseName(db);
+
+ NDBDICT* dict= ndb->getDictionary();
+ dict->set_local_table_data_size(sizeof(Ndb_table_local_info));
+ dict->invalidateTable(name);
+ if (!(tab= dict->getTable(name)))
+ {
+ const NdbError err= dict->getNdbError();
+ if (err.code == 709)
+ DBUG_RETURN(0);
+ ERR_RETURN(err);
+ }
+
+ DBUG_PRINT("info", ("Found table %s", tab->getName()));
+ DBUG_RETURN(1);
+}
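
Note the return convention: 1 when NDB knows the table, 0 when the dictionary lookup fails with error 709 (no such table), and an NDB error otherwise. ndbcluster_find_files() below depends on the 0 case; a sketch of such a caller:

  // Sketch: confirm a stray on-disk table is really gone from NDB.
  if (ndbcluster_table_exists(thd, db, file_name) == 0)
  {
    // NDB no longer has the table: remove the local .frm/.ndb files
  }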
+
+
+
+extern "C" byte* tables_get_key(const char *entry, uint *length,
+ my_bool not_used __attribute__((unused)))
+{
+ *length= strlen(entry);
+ return (byte*) entry;
+}
+
+
+int ndbcluster_find_files(THD *thd,const char *db,const char *path,
+ const char *wild, bool dir, List<char> *files)
+{
+ DBUG_ENTER("ndbcluster_find_files");
+ DBUG_PRINT("enter", ("db: %s", db));
+ { // extra bracket to avoid gcc 2.95.3 warning
uint i;
+ Ndb* ndb;
+ char name[FN_REFLEN];
+ HASH ndb_tables, ok_tables;
NdbDictionary::Dictionary::List list;
- NdbDictionary::Dictionary* dict;
- char path[FN_REFLEN];
- DBUG_ENTER("ndb_discover_tables");
-
- /* List tables in NDB Cluster kernel */
- dict= g_ndb->getDictionary();
+
+ if (!(ndb= check_ndb_in_thd(thd)))
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+ if (dir)
+ DBUG_RETURN(0); // Discover of databases not yet supported
+
+ // List tables in NDB
+ NDBDICT *dict= ndb->getDictionary();
if (dict->listObjects(list,
NdbDictionary::Object::UserTable) != 0)
- ERR_RETURN(g_ndb->getNdbError());
-
+ ERR_RETURN(dict->getNdbError());
+
+ if (hash_init(&ndb_tables, system_charset_info,list.count,0,0,
+ (hash_get_key)tables_get_key,0,0))
+ {
+ DBUG_PRINT("error", ("Failed to init HASH ndb_tables"));
+ DBUG_RETURN(-1);
+ }
+
+ if (hash_init(&ok_tables, system_charset_info,32,0,0,
+ (hash_get_key)tables_get_key,0,0))
+ {
+ DBUG_PRINT("error", ("Failed to init HASH ok_tables"));
+ hash_free(&ndb_tables);
+ DBUG_RETURN(-1);
+ }
+
for (i= 0 ; i < list.count ; i++)
{
NdbDictionary::Dictionary::List::Element& t= list.elements[i];
+ DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
+
+      // Add only tables that belong to db
+ if (my_strcasecmp(system_charset_info, t.database, db))
+ continue;
- DBUG_PRINT("discover", ("%d: %s/%s", t.id, t.database, t.name));
- if (create_table_from_handler(t.database, t.name, true))
- DBUG_PRINT("info", ("Could not discover %s/%s", t.database, t.name));
+ // Apply wildcard to list of tables in NDB
+ if (wild)
+ {
+ if (lower_case_table_names)
+ {
+ if (wild_case_compare(files_charset_info, t.name, wild))
+ continue;
+ }
+ else if (wild_compare(t.name,wild,0))
+ continue;
+ }
+ DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", t.name));
+ my_hash_insert(&ndb_tables, (byte*)thd->strdup(t.name));
}
- DBUG_RETURN(0);
+
+ char *file_name;
+ List_iterator<char> it(*files);
+ List<char> delete_list;
+ while ((file_name=it++))
+ {
+ DBUG_PRINT("info", ("%s", file_name));
+ if (hash_search(&ndb_tables, file_name, strlen(file_name)))
+ {
+ DBUG_PRINT("info", ("%s existed in NDB _and_ on disk ", file_name));
+ // File existed in NDB and as frm file, put in ok_tables list
+ my_hash_insert(&ok_tables, (byte*)file_name);
+ continue;
+ }
+
+ // File is not in NDB, check for .ndb file with this name
+ (void)strxnmov(name, FN_REFLEN,
+ mysql_data_home,"/",db,"/",file_name,ha_ndb_ext,NullS);
+ DBUG_PRINT("info", ("Check access for %s", name));
+ if (access(name, F_OK))
+ {
+ DBUG_PRINT("info", ("%s did not exist on disk", name));
+ // .ndb file did not exist on disk, another table type
+ continue;
+ }
+
+ DBUG_PRINT("info", ("%s existed on disk", name));
+    // The .ndb file exists on disk, but the table is not in the list from NDB.
+    // Verify that the handler agrees the table is gone.
+ if (ndbcluster_table_exists(thd, db, file_name) == 0)
+ {
+ DBUG_PRINT("info", ("NDB says %s does not exists", file_name));
+ it.remove();
+ // Put in list of tables to remove from disk
+ delete_list.push_back(thd->strdup(file_name));
+ }
+ }
+
+ // Check for new files to discover
+ DBUG_PRINT("info", ("Checking for new files to discover"));
+ List<char> create_list;
+ for (i= 0 ; i < ndb_tables.records ; i++)
+ {
+ file_name= hash_element(&ndb_tables, i);
+ if (!hash_search(&ok_tables, file_name, strlen(file_name)))
+ {
+ DBUG_PRINT("info", ("%s must be discovered", file_name));
+      // File is in the list of NDB tables but not in ok_tables;
+      // this table needs to be created
+ create_list.push_back(thd->strdup(file_name));
+ }
+ }
+
+ // Lock mutex before deleting and creating frm files
+ pthread_mutex_lock(&LOCK_open);
+
+ if (!global_read_lock)
+ {
+ // Delete old files
+ List_iterator_fast<char> it3(delete_list);
+ while ((file_name=it3++))
+ {
+ DBUG_PRINT("info", ("Remove table %s/%s",db, file_name ));
+ // Delete the table and all related files
+ TABLE_LIST table_list;
+ bzero((char*) &table_list,sizeof(table_list));
+ table_list.db= (char*) db;
+ table_list.alias=table_list.real_name=(char*)file_name;
+ (void)mysql_rm_table_part2(thd, &table_list,
+ /* if_exists */ TRUE,
+ /* drop_temporary */ FALSE,
+ /* dont_log_query*/ TRUE);
+ }
+ }
+
+ // Create new files
+ List_iterator_fast<char> it2(create_list);
+ while ((file_name=it2++))
+ {
+ DBUG_PRINT("info", ("Table %s need discovery", name));
+ if (ha_create_table_from_engine(thd, db, file_name, TRUE) == 0)
+ files->push_back(thd->strdup(file_name));
+ }
+
+ pthread_mutex_unlock(&LOCK_open);
+
+ hash_free(&ok_tables);
+ hash_free(&ndb_tables);
+ } // extra bracket to avoid gcc 2.95.3 warning
+ DBUG_RETURN(0);
}
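
Stripped of the MySQL HASH plumbing, the reconciliation above is two set differences over table names. A compact sketch of the same logic (std::set stands in for the HASHes; all names here are illustrative):

  #include <set>
  #include <string>
  using std::set; using std::string;

  // Sketch: reconcile NDB's table list with the on-disk frm files.
  void reconcile(const set<string> &in_ndb, const set<string> &on_disk,
                 set<string> &delete_list, set<string> &create_list)
  {
    for (set<string>::const_iterator it= on_disk.begin();
         it != on_disk.end(); ++it)
      if (!in_ndb.count(*it))
        delete_list.insert(*it);   // stale: drop local files if NDB agrees
    for (set<string>::const_iterator it= in_ndb.begin();
         it != in_ndb.end(); ++it)
      if (!on_disk.count(*it))
        create_list.insert(*it);   // new in NDB: discover frm, create locally
  }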
-#endif
/*
@@ -3410,34 +4044,55 @@ int ndb_discover_tables()
bool ndbcluster_init()
{
+ int res;
DBUG_ENTER("ndbcluster_init");
// Set connectstring if specified
if (ndbcluster_connectstring != 0)
- {
DBUG_PRINT("connectstring", ("%s", ndbcluster_connectstring));
- Ndb::setConnectString(ndbcluster_connectstring);
+ if ((g_ndb_cluster_connection=
+ new Ndb_cluster_connection(ndbcluster_connectstring)) == 0)
+ {
+ DBUG_PRINT("error",("Ndb_cluster_connection(%s)",ndbcluster_connectstring));
+ DBUG_RETURN(TRUE);
}
+
// Create a Ndb object to open the connection to NDB
- g_ndb= new Ndb("sys");
+ g_ndb= new Ndb(g_ndb_cluster_connection, "sys");
+ g_ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_table_local_info));
if (g_ndb->init() != 0)
{
ERR_PRINT (g_ndb->getNdbError());
DBUG_RETURN(TRUE);
}
- if (g_ndb->waitUntilReady() != 0)
+
+ if ((res= g_ndb_cluster_connection->connect(1)) == 0)
{
- ERR_PRINT (g_ndb->getNdbError());
- DBUG_RETURN(TRUE);
+ g_ndb->waitUntilReady(10);
+ }
+  else if (res == 1)
+ {
+ if (g_ndb_cluster_connection->start_connect_thread()) {
+ DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
+ DBUG_RETURN(TRUE);
+ }
}
+ else
+ {
+ DBUG_ASSERT(res == -1);
+ DBUG_PRINT("error", ("permanent error"));
+ DBUG_RETURN(TRUE);
+ }
+
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
(hash_get_key) ndbcluster_get_key,0,0);
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+
ndbcluster_inited= 1;
#ifdef USE_DISCOVER_ON_STARTUP
if (ndb_discover_tables() != 0)
DBUG_RETURN(TRUE);
#endif
- DBUG_RETURN(false);
+ DBUG_RETURN(FALSE);
}
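
The connect(1) call above follows a tri-state contract that the hunk handles case by case: 0 means connected now, 1 means no management server yet (keep retrying in the background), -1 is a permanent failure. A sketch that wraps the assumed contract in one helper:

  // Sketch (assumed contract): 0 = connected, 1 = retry later, -1 = fatal.
  static bool connect_failed(Ndb_cluster_connection *c, Ndb *ndb)
  {
    int res= c->connect(1);
    if (res == 0)
    {
      ndb->waitUntilReady(10);                 // best effort, as in the hunk
      return false;
    }
    if (res == 1)
      return c->start_connect_thread() != 0;   // background retries
    return true;                               // res == -1: permanent error
  }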
@@ -3450,15 +4105,15 @@ bool ndbcluster_init()
bool ndbcluster_end()
{
DBUG_ENTER("ndbcluster_end");
-
- delete g_ndb;
+  if (g_ndb)
+ delete g_ndb;
g_ndb= NULL;
+ if (g_ndb_cluster_connection)
+ delete g_ndb_cluster_connection;
+ g_ndb_cluster_connection= NULL;
if (!ndbcluster_inited)
DBUG_RETURN(0);
hash_free(&ndbcluster_open_tables);
-#ifdef USE_NDB_POOL
- ndb_pool_release();
-#endif
pthread_mutex_destroy(&ndbcluster_mutex);
ndbcluster_inited= 0;
DBUG_RETURN(0);
@@ -3584,8 +4239,6 @@ ha_ndbcluster::records_in_range(uint inx, key_range *min_key,
NDB_INDEX_TYPE idx_type= get_index_type(inx);
DBUG_ENTER("records_in_range");
- DBUG_PRINT("enter", ("inx: %u", inx));
-
// Prevent partial read of hash indexes by returning HA_POS_ERROR
if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) &&
((min_key && min_key->length < key_length) ||
@@ -3760,4 +4413,89 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,
DBUG_RETURN(0);
}
+
+static
+int
+ndb_get_table_statistics(Ndb* ndb, const char * table,
+ Uint64* row_count, Uint64* commit_count)
+{
+ DBUG_ENTER("ndb_get_table_statistics");
+ DBUG_PRINT("enter", ("table: %s", table));
+
+ do
+ {
+ NdbConnection* pTrans= ndb->startTransaction();
+ if (pTrans == NULL)
+ break;
+
+ NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
+ if (pOp == NULL)
+ break;
+
+ NdbResultSet* rs= pOp->readTuples(NdbOperation::LM_CommittedRead);
+ if (rs == 0)
+ break;
+
+ int check= pOp->interpret_exit_last_row();
+ if (check == -1)
+ break;
+
+ Uint64 rows, commits;
+ pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows);
+ pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits);
+
+ check= pTrans->execute(NoCommit);
+ if (check == -1)
+ break;
+
+ Uint64 sum_rows= 0;
+ Uint64 sum_commits= 0;
+  while ((check= rs->nextResult(TRUE)) == 0)
+ {
+ sum_rows+= rows;
+ sum_commits+= commits;
+ }
+
+ if (check == -1)
+ break;
+
+ ndb->closeTransaction(pTrans);
+  if (row_count)
+    *row_count= sum_rows;
+  if (commit_count)
+    *commit_count= sum_commits;
+  DBUG_PRINT("exit", ("records: %llu commits: %llu", sum_rows, sum_commits));
+ DBUG_RETURN(0);
+ } while(0);
+
+ DBUG_PRINT("exit", ("failed"));
+ DBUG_RETURN(-1);
+}
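
records is initialized to ~0 ("uninitialized") in the constructor above; a consumer such as info(HA_STATUS_VARIABLE) can now fill it from committed counts. A sketch of such a call site (the exact caller is an assumption, not part of this hunk):

  // Sketch: fill handler statistics from NDB, assuming m_ndb/m_tabname are set.
  Uint64 rows, commits;
  if (ndb_get_table_statistics(m_ndb, m_tabname, &rows, &commits) == 0)
    records= (ha_rows) rows;   // replaces the old hardcoded estimate of 100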
+
+/*
+ Create a .ndb file to serve as a placeholder indicating
+  that the table with this name is an NDB table
+*/
+
+int ha_ndbcluster::write_ndb_file()
+{
+ File file;
+ bool error=1;
+ char path[FN_REFLEN];
+
+ DBUG_ENTER("write_ndb_file");
+ DBUG_PRINT("enter", ("db: %s, name: %s", m_dbname, m_tabname));
+
+ (void)strxnmov(path, FN_REFLEN,
+ mysql_data_home,"/",m_dbname,"/",m_tabname,ha_ndb_ext,NullS);
+
+ if ((file=my_create(path, CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0)
+ {
+ // It's an empty file
+ error=0;
+ my_close(file,MYF(0));
+ }
+ DBUG_RETURN(error);
+}
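
The marker written here is consumed in ndbcluster_find_files() above with a plain access() probe; the round trip, as a sketch (db and name are assumed inputs):

  // Sketch: probe for the .ndb placeholder created by write_ndb_file().
  char path[FN_REFLEN];
  (void)strxnmov(path, FN_REFLEN,
                 mysql_data_home, "/", db, "/", name, ha_ndb_ext, NullS);
  bool is_ndb_table= (access(path, F_OK) == 0);  // file is empty: existence only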
+
#endif /* HAVE_NDBCLUSTER_DB */