path: root/sql/ha_ndbcluster.cc
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--  sql/ha_ndbcluster.cc  1027
1 files changed, 655 insertions, 372 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 21056ef4a8f..d499218a8c3 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -20,12 +20,6 @@
NDB Cluster
*/
-/*
- TODO
-  After CREATE DATABASE, run discover on all tables in that database
-
-*/
-
#ifdef __GNUC__
#pragma implementation // gcc: Class implementation
@@ -45,8 +39,13 @@
// Default value for parallelism
static const int parallelism= 240;
+// Default value for max number of transactions
+// creatable against NDB from this handler
+static const int max_transactions = 256;
+
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
+
#define ERR_PRINT(err) \
DBUG_PRINT("error", ("Error: %d message: %s", err.code, err.message))
@@ -64,6 +63,8 @@ typedef NdbDictionary::Dictionary NDBDICT;
bool ndbcluster_inited= false;
+static Ndb* g_ndb= NULL;
+
// Handler synchronization
pthread_mutex_t ndbcluster_mutex;
@@ -95,7 +96,22 @@ static const err_code_mapping err_map[]=
{ 630, HA_ERR_FOUND_DUPP_KEY },
{ 893, HA_ERR_FOUND_DUPP_UNIQUE },
{ 721, HA_ERR_TABLE_EXIST },
+ { 4244, HA_ERR_TABLE_EXIST },
{ 241, HA_ERR_OLD_METADATA },
+
+ { 266, HA_ERR_LOCK_WAIT_TIMEOUT },
+ { 274, HA_ERR_LOCK_WAIT_TIMEOUT },
+ { 296, HA_ERR_LOCK_WAIT_TIMEOUT },
+ { 297, HA_ERR_LOCK_WAIT_TIMEOUT },
+ { 237, HA_ERR_LOCK_WAIT_TIMEOUT },
+
+ { 623, HA_ERR_RECORD_FILE_FULL },
+ { 624, HA_ERR_RECORD_FILE_FULL },
+ { 625, HA_ERR_RECORD_FILE_FULL },
+ { 826, HA_ERR_RECORD_FILE_FULL },
+ { 827, HA_ERR_RECORD_FILE_FULL },
+ { 832, HA_ERR_RECORD_FILE_FULL },
+
{ -1, -1 }
};
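For context, a minimal sketch of how a mapping table like this is typically
consumed (the real ndb_to_mysql_error() in this file may differ; the struct
field names ndb_err/my_err and the fallback behavior are assumptions here):

  static int ndb_to_mysql_error(const NdbError *err)
  {
    uint i;
    // Linear scan, terminated by the {-1, -1} sentinel entry
    for (i= 0; err_map[i].ndb_err != err->code &&
               err_map[i].my_err != -1; i++) ;
    if (err_map[i].my_err == -1)
      return err->code;       // no mapping found, pass the NDB code through
    return err_map[i].my_err;
  }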
@@ -144,6 +160,28 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans)
/*
+ Override the default get_error_message in order to add the
+ error message of NDB
+ */
+
+bool ha_ndbcluster::get_error_message(int error,
+ String *buf)
+{
+ DBUG_ENTER("ha_ndbcluster::get_error_message");
+ DBUG_PRINT("enter", ("error: %d", error));
+
+ if (!m_ndb)
+ DBUG_RETURN(false);
+
+ const NdbError err= m_ndb->getNdbError(error);
+ bool temporary= err.status==NdbError::TemporaryError;
+ buf->set(err.message, strlen(err.message), &my_charset_bin);
+ DBUG_PRINT("exit", ("message: %s, temporary: %d", buf->ptr(), temporary));
+ DBUG_RETURN(temporary);
+}
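As a usage illustration (not part of this patch), a caller holding a handler
pointer could surface the NDB message roughly like this; the variable names
are assumptions for the example:

  String msg;
  // temporary == true means the NDB error is transient and may be retried
  bool temporary= file->get_error_message(my_errno, &msg);
  sql_print_error("NDB: %s%s", msg.c_ptr(),
                  temporary ? " (temporary error)" : "");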
+
+
+/*
Instruct NDB to set the value of the hidden primary key
*/
@@ -297,7 +335,7 @@ int ha_ndbcluster::get_metadata(const char *path)
const NDBTAB *tab;
const void *data, *pack_data;
const char **key_name;
- uint ndb_columns, mysql_columns, length, pack_length, i;
+ uint ndb_columns, mysql_columns, length, pack_length;
int error;
DBUG_ENTER("get_metadata");
DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));
@@ -353,60 +391,111 @@ int ha_ndbcluster::get_metadata(const char *path)
  // All checks OK, let's use the table
m_table= (void*)tab;
- for (i= 0; i < MAX_KEY; i++)
- m_indextype[i]= UNDEFINED_INDEX;
+ DBUG_RETURN(build_index_list());
+}
+int ha_ndbcluster::build_index_list()
+{
+ char *name;
+ const char *index_name;
+ static const char* unique_suffix= "$unique";
+  uint name_len;
+ DBUG_ENTER("build_index_list");
+
// Save information about all known indexes
- for (i= 0; i < table->keys; i++)
- m_indextype[i] = get_index_type_from_table(i);
-
- DBUG_RETURN(0);
+ for (uint i= 0; i < table->keys; i++)
+ {
+ NDB_INDEX_TYPE idx_type= get_index_type_from_table(i);
+ m_indextype[i]= idx_type;
+
+ if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
+ {
+ index_name= get_index_name(i);
+ name_len= strlen(index_name)+strlen(unique_suffix)+1;
+      // Create name for unique index by appending "$unique"
+ if (!(name= my_malloc(name_len, MYF(MY_WME))))
+ DBUG_RETURN(2);
+ strxnmov(name, name_len, index_name, unique_suffix, NullS);
+ m_unique_index_name[i]= name;
+ DBUG_PRINT("info", ("Created unique index name: %s for index %d",
+ name, i));
+ }
+ }
+ DBUG_RETURN(0);
}
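To illustrate the naming scheme above: for a MySQL key declared UNIQUE with
an ordered component, the handler ends up addressing two NDB-side indexes
(the key name here is hypothetical):

  // MySQL key "uk_a" (UNIQUE)  ->  NDB ordered index      "uk_a"
  //                             +  NDB unique hash index  "uk_a$unique"
  // m_unique_index_name[i] then holds "uk_a$unique" for MySQL index i.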
+
+
/*
Decode the type of an index from information
provided in table object
*/
-NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint index_no) const
+NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint inx) const
{
- if (index_no == table->primary_key)
- return PRIMARY_KEY_INDEX;
+ bool is_hash_index= (table->key_info[inx].algorithm == HA_KEY_ALG_HASH);
+ if (inx == table->primary_key)
+ return is_hash_index ? PRIMARY_KEY_INDEX : PRIMARY_KEY_ORDERED_INDEX;
else
- return ((table->key_info[index_no].flags & HA_NOSAME) ?
- UNIQUE_INDEX :
+ return ((table->key_info[inx].flags & HA_NOSAME) ?
+ (is_hash_index ? UNIQUE_INDEX : UNIQUE_ORDERED_INDEX) :
ORDERED_INDEX);
}
-
+
void ha_ndbcluster::release_metadata()
{
+ uint i;
+
DBUG_ENTER("release_metadata");
DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
m_table= NULL;
+ // Release index list
+ for (i= 0; i < MAX_KEY; i++)
+ {
+ if (m_unique_index_name[i])
+ my_free((char*)m_unique_index_name[i], MYF(0));
+ m_unique_index_name[i]= NULL;
+ }
+
DBUG_VOID_RETURN;
}
+NdbCursorOperation::LockMode get_ndb_lock_type(enum thr_lock_type type)
+{
+ return (type == TL_WRITE_ALLOW_WRITE) ?
+ NdbCursorOperation::LM_Exclusive : NdbCursorOperation::LM_Read;
+}
+
static const ulong index_type_flags[]=
{
/* UNDEFINED_INDEX */
0,
/* PRIMARY_KEY_INDEX */
- HA_ONLY_WHOLE_INDEX |
- HA_WRONG_ASCII_ORDER |
- HA_NOT_READ_PREFIX_LAST,
+ HA_ONLY_WHOLE_INDEX,
+
+ /* PRIMARY_KEY_ORDERED_INDEX */
+ /*
+ Enable HA_KEYREAD_ONLY when "sorted" indexes are supported,
+      so that ORDER BY clauses can be optimized by reading directly
+ through the index.
+ */
+ // HA_KEYREAD_ONLY |
+ HA_READ_NEXT |
+ HA_READ_RANGE,
/* UNIQUE_INDEX */
- HA_ONLY_WHOLE_INDEX |
- HA_WRONG_ASCII_ORDER |
- HA_NOT_READ_PREFIX_LAST,
+ HA_ONLY_WHOLE_INDEX,
+
+ /* UNIQUE_ORDERED_INDEX */
+ HA_READ_NEXT |
+ HA_READ_RANGE,
/* ORDERED_INDEX */
HA_READ_NEXT |
- HA_READ_PREV |
- HA_NOT_READ_AFTER_KEY
+ HA_READ_RANGE,
};
static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong);
@@ -416,6 +505,11 @@ inline const char* ha_ndbcluster::get_index_name(uint idx_no) const
return table->keynames.type_names[idx_no];
}
+inline const char* ha_ndbcluster::get_unique_index_name(uint idx_no) const
+{
+ return m_unique_index_name[idx_no];
+}
+
inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
{
DBUG_ASSERT(idx_no < MAX_KEY);
@@ -430,9 +524,10 @@ inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
flags depending on the type of the index.
*/
-inline ulong ha_ndbcluster::index_flags(uint idx_no) const
+inline ulong ha_ndbcluster::index_flags(uint idx_no, uint part) const
{
DBUG_ENTER("index_flags");
+ DBUG_PRINT("info", ("idx_no: %d", idx_no));
DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size);
DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]);
}
@@ -457,6 +552,24 @@ int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key)
}
+int ha_ndbcluster::set_primary_key_from_old_data(NdbOperation *op, const byte *old_data)
+{
+ KEY* key_info= table->key_info + table->primary_key;
+ KEY_PART_INFO* key_part= key_info->key_part;
+ KEY_PART_INFO* end= key_part+key_info->key_parts;
+ DBUG_ENTER("set_primary_key_from_old_data");
+
+ for (; key_part != end; key_part++)
+ {
+ Field* field= key_part->field;
+ if (set_ndb_key(op, field,
+ key_part->fieldnr-1, old_data+key_part->offset))
+ ERR_RETURN(op->getNdbError());
+ }
+ DBUG_RETURN(0);
+}
+
+
int ha_ndbcluster::set_primary_key(NdbOperation *op)
{
DBUG_ENTER("set_primary_key");
@@ -550,7 +663,6 @@ int ha_ndbcluster::unique_index_read(const byte *key,
uint key_len, byte *buf)
{
NdbConnection *trans= m_active_trans;
- const char *index_name;
NdbIndexOperation *op;
THD *thd= current_thd;
byte *key_ptr;
@@ -560,9 +672,10 @@ int ha_ndbcluster::unique_index_read(const byte *key,
DBUG_ENTER("unique_index_read");
DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
DBUG_DUMP("key", (char*)key, key_len);
+ DBUG_PRINT("enter", ("name: %s", get_unique_index_name(active_index)));
- index_name= get_index_name(active_index);
- if (!(op= trans->getNdbIndexOperation(index_name, m_tabname)) ||
+ if (!(op= trans->getNdbIndexOperation(get_unique_index_name(active_index),
+ m_tabname)) ||
op->readTuple() != 0)
ERR_RETURN(trans->getNdbError());
@@ -608,25 +721,64 @@ int ha_ndbcluster::unique_index_read(const byte *key,
}
/*
- Get the next record of a started scan
+ Get the next record of a started scan. Try to fetch
+ it locally from NdbApi cached records if possible,
+ otherwise ask NDB for more.
+
+ NOTE
+  If this is an update/delete, make sure not to contact
+  NDB before any pending ops have been sent to it.
+
*/
inline int ha_ndbcluster::next_result(byte *buf)
{
+ int check;
NdbConnection *trans= m_active_trans;
NdbResultSet *cursor= m_active_cursor;
DBUG_ENTER("next_result");
-
- if (cursor->nextResult() == 0)
- {
- // One more record found
- unpack_record(buf);
- table->status= 0;
- DBUG_RETURN(0);
- }
+
+ if (!cursor)
+ DBUG_RETURN(HA_ERR_END_OF_FILE);
+
+ /*
+    If this is an update or delete, call nextResult with false
+ to process any records already cached in NdbApi
+ */
+ bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE;
+ do {
+ DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
+ check= cursor->nextResult(contact_ndb);
+ if (check == 0)
+ {
+ // One more record found
+ DBUG_PRINT("info", ("One more record found"));
+ unpack_record(buf);
+ table->status= 0;
+ DBUG_RETURN(0);
+ }
+ else if (check == 1 || check == 2)
+ {
+ // 1: No more records
+ // 2: No more cached records
+
+ /*
+ Before fetching more rows and releasing lock(s),
+ all pending update or delete operations should
+ be sent to NDB
+ */
+ DBUG_PRINT("info", ("ops_pending: %d", ops_pending));
+ if (ops_pending && (trans->execute(NoCommit) != 0))
+ DBUG_RETURN(ndb_err(trans));
+ ops_pending= 0;
+
+ contact_ndb= (check == 2);
+ }
+ } while (check == 2);
+
table->status= STATUS_NOT_FOUND;
- if (ndb_err(trans))
- ERR_RETURN(trans->getNdbError());
+ if (check == -1)
+ DBUG_RETURN(ndb_err(trans));
// No more records
DBUG_PRINT("info", ("No more records"));
@@ -635,155 +787,135 @@ inline int ha_ndbcluster::next_result(byte *buf)
/*
- Read record(s) from NDB using ordered index scan
+  Set bounds for an ordered index scan using key_range
*/
-int ha_ndbcluster::ordered_index_scan(const byte *key, uint key_len,
- byte *buf,
- enum ha_rkey_function find_flag)
-{
- uint no_fields= table->fields;
- uint tot_len, i;
- NdbConnection *trans= m_active_trans;
- NdbResultSet *cursor= m_active_cursor;
- NdbScanOperation *op;
- const char *bound_str= NULL;
- const char *index_name;
- NdbOperation::BoundType bound_type = NdbOperation::BoundEQ;
- bool can_be_handled_by_ndb= FALSE;
+int ha_ndbcluster::set_bounds(NdbOperation *op,
+ const key_range *key,
+ int bound)
+{
+ uint i, tot_len;
byte *key_ptr;
- KEY *key_info;
- THD* thd = current_thd;
- DBUG_ENTER("ordered_index_scan");
- DBUG_PRINT("enter", ("index: %u", active_index));
- DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
-
- index_name= get_index_name(active_index);
- if (!(op= trans->getNdbScanOperation(index_name, m_tabname)))
- ERR_RETURN(trans->getNdbError());
- if (!(cursor= op->readTuples(parallelism)))
- ERR_RETURN(trans->getNdbError());
- m_active_cursor= cursor;
-
- switch (find_flag) {
- case HA_READ_KEY_EXACT: /* Find first record else error */
- bound_str= "HA_READ_KEY_EXACT";
- bound_type= NdbOperation::BoundEQ;
- can_be_handled_by_ndb= TRUE;
- break;
- case HA_READ_KEY_OR_NEXT: /* Record or next record */
- bound_str= "HA_READ_KEY_OR_NEXT";
- bound_type= NdbOperation::BoundLE;
- can_be_handled_by_ndb= TRUE;
- break;
- case HA_READ_KEY_OR_PREV: /* Record or previous */
- bound_str= "HA_READ_KEY_OR_PREV";
- bound_type= NdbOperation::BoundGE;
- can_be_handled_by_ndb= TRUE;
- break;
- case HA_READ_AFTER_KEY: /* Find next rec. after key-record */
- bound_str= "HA_READ_AFTER_KEY";
- bound_type= NdbOperation::BoundLT;
- can_be_handled_by_ndb= TRUE;
- break;
- case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */
- bound_str= "HA_READ_BEFORE_KEY";
- bound_type= NdbOperation::BoundGT;
- can_be_handled_by_ndb= TRUE;
- break;
- case HA_READ_PREFIX: /* Key which as same prefix */
- bound_str= "HA_READ_PREFIX";
- break;
- case HA_READ_PREFIX_LAST: /* Last key with the same prefix */
- bound_str= "HA_READ_PREFIX_LAST";
- break;
- case HA_READ_PREFIX_LAST_OR_PREV:
- /* Last or prev key with the same prefix */
- bound_str= "HA_READ_PREFIX_LAST_OR_PREV";
- break;
- default:
- bound_str= "UNKNOWN";
- break;
- }
- DBUG_PRINT("info", ("find_flag: %s, bound_type: %d,"
- "can_be_handled_by_ndb: %d",
- bound_str, bound_type, can_be_handled_by_ndb));
- if (!can_be_handled_by_ndb)
- DBUG_RETURN(1);
-
+ KEY* key_info= table->key_info + active_index;
+ KEY_PART_INFO* key_part= key_info->key_part;
+ KEY_PART_INFO* end= key_part+key_info->key_parts;
+
+ DBUG_ENTER("set_bounds");
+ DBUG_PRINT("enter", ("bound: %d", bound));
+ DBUG_PRINT("enter", ("key_parts: %d", key_info->key_parts));
+ DBUG_PRINT("enter", ("key->length: %d", key->length));
+ DBUG_PRINT("enter", ("key->flag: %d", key->flag));
+
// Set bounds using key data
tot_len= 0;
- key_ptr= (byte *) key;
- key_info= table->key_info + active_index;
- for (i= 0; i < key_info->key_parts; i++)
+ key_ptr= (byte *) key->key;
+ for (; key_part != end; key_part++)
{
- Field* field= key_info->key_part[i].field;
+ Field* field= key_part->field;
uint32 field_len= field->pack_length();
- DBUG_PRINT("info", ("Set index bound on %s",
+ tot_len+= field_len;
+
+ const char* bounds[]= {"LE", "LT", "GE", "GT", "EQ"};
+ DBUG_ASSERT(bound >= 0 && bound <= 4);
+ DBUG_PRINT("info", ("Set Bound%s on %s",
+ bounds[bound],
field->field_name));
DBUG_DUMP("key", (char*)key_ptr, field_len);
-
+
if (op->setBound(field->field_name,
- bound_type,
+ bound,
key_ptr,
field_len) != 0)
ERR_RETURN(op->getNdbError());
key_ptr+= field_len;
- tot_len+= field_len;
- if (tot_len >= key_len)
+
+ if (tot_len >= key->length)
+ break;
+
+ /*
+      Only one bound that is not EQ can be set,
+      so if this bound was not EQ, bail out and make
+      a best-effort attempt
+ */
+ if (bound != NdbOperation::BoundEQ)
break;
}
-
- // Define attributes to read
- for (i= 0; i < no_fields; i++)
+
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Start ordered index scan in NDB
+*/
+
+int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
+ const key_range *end_key,
+ bool sorted, byte* buf)
+{
+ NdbConnection *trans= m_active_trans;
+ NdbResultSet *cursor;
+ NdbScanOperation *op;
+ const char *index_name;
+
+ DBUG_ENTER("ordered_index_scan");
+ DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted));
+ DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
+
+ index_name= get_index_name(active_index);
+ if (!(op= trans->getNdbScanOperation(index_name, m_tabname)))
+ ERR_RETURN(trans->getNdbError());
+ if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type))))
+ ERR_RETURN(trans->getNdbError());
+ m_active_cursor= cursor;
+
+ if (start_key &&
+ set_bounds(op, start_key,
+ (start_key->flag == HA_READ_KEY_EXACT) ?
+ NdbOperation::BoundEQ :
+ (start_key->flag == HA_READ_AFTER_KEY) ?
+ NdbOperation::BoundLT :
+ NdbOperation::BoundLE))
+ DBUG_RETURN(1);
+
+ if (end_key)
{
- Field *field= table->field[i];
- if ((thd->query_id == field->query_id) ||
- (field->flags & PRI_KEY_FLAG))
- {
- if (get_ndb_value(op, i, field->ptr))
- ERR_RETURN(op->getNdbError());
- }
- else
+ if (start_key && start_key->flag == HA_READ_KEY_EXACT)
{
- m_value[i]= NULL;
+ DBUG_PRINT("info", ("start_key is HA_READ_KEY_EXACT ignoring end_key"));
}
+ else if (set_bounds(op, end_key,
+ (end_key->flag == HA_READ_AFTER_KEY) ?
+ NdbOperation::BoundGE :
+ NdbOperation::BoundGT))
+ DBUG_RETURN(1);
}
-
- if (table->primary_key == MAX_KEY)
- {
- DBUG_PRINT("info", ("Getting hidden key"));
- // Scanning table with no primary key
- int hidden_no= no_fields;
-#ifndef DBUG_OFF
- const NDBTAB *tab= (NDBTAB *) m_table;
- if (!tab->getColumn(hidden_no))
- DBUG_RETURN(1);
-#endif
- if (get_ndb_value(op, hidden_no, NULL))
- ERR_RETURN(op->getNdbError());
- }
-
- if (trans->execute(NoCommit) != 0)
- DBUG_RETURN(ndb_err(trans));
- DBUG_PRINT("exit", ("Scan started successfully"));
- DBUG_RETURN(next_result(buf));
+ DBUG_RETURN(define_read_attrs(buf, op));
}
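To make the flag-to-bound mapping concrete, here is what a simple range
query would produce under the rules above (column name and constants are
hypothetical):

  // SELECT ... WHERE a BETWEEN 5 AND 10 arrives roughly as:
  //   start_key: flag=HA_READ_KEY_OR_NEXT, key=5
  //              -> setBound("a", NdbOperation::BoundLE, 5)
  //   end_key:   flag=HA_READ_AFTER_KEY,   key=10
  //              -> setBound("a", NdbOperation::BoundGE, 10)
  // NDB names bounds from the key's point of view: BoundLE means
  // "key <= column value", i.e. an inclusive lower bound of the scan.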
-#if 0
/*
- Read record(s) from NDB using full table scan with filter
+ Start a filtered scan in NDB.
+
+ NOTE
+ This function is here as an example of how to start a
+ filtered scan. It should be possible to replace full_table_scan
+  with this function and make a best-effort attempt
+ at filtering out the irrelevant data by converting the "items"
+ into interpreted instructions.
+ This would speed up table scans where there is a limiting WHERE clause
+ that doesn't match any index in the table.
+
*/
int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
byte *buf,
enum ha_rkey_function find_flag)
{
- uint no_fields= table->fields;
NdbConnection *trans= m_active_trans;
- NdbResultSet *cursor= m_active_cursor;
+ NdbResultSet *cursor;
+ NdbScanOperation *op;
DBUG_ENTER("filtered_scan");
DBUG_PRINT("enter", ("key_len: %u, index: %u",
@@ -791,12 +923,10 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
DBUG_DUMP("key", (char*)key, key_len);
DBUG_PRINT("info", ("Starting a new filtered scan on %s",
m_tabname));
- NdbScanOperation *op= trans->getNdbScanOperation(m_tabname);
- if (!op)
+
+ if (!(op= trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError());
-
- cursor= op->readTuples(parallelism);
- if (!cursor)
+ if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type))))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
@@ -845,66 +975,49 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
sf.end();
}
- // Define attributes to read
- for (uint field_no= 0; field_no < no_fields; field_no++)
- {
- Field *field= table->field[field_no];
-
- // Read attribute
- DBUG_PRINT("get", ("%d: %s", field_no, field->field_name));
- if (get_ndb_value(op, field_no, field->ptr))
- ERR_RETURN(op->getNdbError());
- }
-
- if (table->primary_key == MAX_KEY)
- {
- DBUG_PRINT("info", ("Getting hidden key"));
- // Scanning table with no primary key
- int hidden_no= no_fields;
-#ifndef DBUG_OFF
- const NDBTAB *tab= (NDBTAB *) m_table;
- if (!tab->getColumn(hidden_no))
- DBUG_RETURN(1);
-#endif
- if (get_ndb_value(op, hidden_no, NULL))
- ERR_RETURN(op->getNdbError());
- }
-
- if (trans->execute(NoCommit) != 0)
- DBUG_RETURN(ndb_err(trans));
- DBUG_PRINT("exit", ("Scan started successfully"));
- DBUG_RETURN(next_result(buf));
+ DBUG_RETURN(define_read_attrs(buf, op));
}
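For reference, a minimal sketch of the kind of filter the elided section
builds with NdbScanFilter (the column id and value are hypothetical, and the
exact API calls should be verified against the NDB version in use):

  NdbScanFilter sf(op);
  if (sf.begin(NdbScanFilter::AND) < 0 ||
      sf.eq(attr_id, key_value) < 0 ||    // attr_id, key_value: assumed
      sf.end() < 0)
    ERR_RETURN(op->getNdbError());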
-#endif
/*
- Read records from NDB using full table scan
+ Start full table scan in NDB
*/
int ha_ndbcluster::full_table_scan(byte *buf)
{
uint i;
- THD *thd= current_thd;
- NdbConnection *trans= m_active_trans;
NdbResultSet *cursor;
NdbScanOperation *op;
+ NdbConnection *trans= m_active_trans;
DBUG_ENTER("full_table_scan");
DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
if (!(op=trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError());
- if (!(cursor= op->readTuples(parallelism)))
+ if (!(cursor= op->readTuples(parallelism, get_ndb_lock_type(m_lock.type))))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
-
+ DBUG_RETURN(define_read_attrs(buf, op));
+}
+
+
+inline
+int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
+{
+ uint i;
+ THD *thd= current_thd;
+ NdbConnection *trans= m_active_trans;
+
+ DBUG_ENTER("define_read_attrs");
+
// Define attributes to read
for (i= 0; i < table->fields; i++)
{
Field *field= table->field[i];
if ((thd->query_id == field->query_id) ||
- (field->flags & PRI_KEY_FLAG))
+ (field->flags & PRI_KEY_FLAG) ||
+ retrieve_all_fields)
{
if (get_ndb_value(op, i, field->ptr))
ERR_RETURN(op->getNdbError());
@@ -991,8 +1104,17 @@ int ha_ndbcluster::write_row(byte *record)
to NoCommit the transaction between each row.
Find out how this is detected!
*/
- if (trans->execute(NoCommit) != 0)
- DBUG_RETURN(ndb_err(trans));
+ rows_inserted++;
+ if ((rows_inserted == rows_to_insert) ||
+ ((rows_inserted % bulk_insert_rows) == 0))
+ {
+ // Send rows to NDB
+ DBUG_PRINT("info", ("Sending inserts to NDB, "\
+ "rows_inserted:%d, bulk_insert_rows: %d",
+ rows_inserted, bulk_insert_rows));
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ }
DBUG_RETURN(0);
}
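To illustrate the flush condition above with hypothetical numbers: with
bulk_insert_rows == 53 and rows_to_insert == 1000, execute(NoCommit) is
called after rows 53, 106, ... 954, and finally at row 1000, where
rows_inserted == rows_to_insert.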
@@ -1039,6 +1161,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
{
THD *thd= current_thd;
NdbConnection *trans= m_active_trans;
+ NdbResultSet* cursor= m_active_cursor;
NdbOperation *op;
uint i;
DBUG_ENTER("update_row");
@@ -1047,49 +1170,66 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (table->timestamp_on_update_now)
update_timestamp(new_data+table->timestamp_on_update_now-1);
- if (!(op= trans->getNdbOperation(m_tabname)) ||
- op->updateTuple() != 0)
- ERR_RETURN(trans->getNdbError());
-
- if (table->primary_key == MAX_KEY)
- {
- // This table has no primary key, use "hidden" primary key
- DBUG_PRINT("info", ("Using hidden key"));
-
- // Require that the PK for this record has previously been
- // read into m_value
- uint no_fields= table->fields;
- NdbRecAttr* rec= m_value[no_fields];
- DBUG_ASSERT(rec);
- DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
-
- if (set_hidden_key(op, no_fields, rec->aRef()))
- ERR_RETURN(op->getNdbError());
- }
- else
+ /* Check for update of primary key and return error */
+ if ((table->primary_key != MAX_KEY) &&
+ (key_cmp(table->primary_key, old_data, new_data)))
+ DBUG_RETURN(HA_ERR_UNSUPPORTED);
+
+ if (cursor)
{
- /* Check for update of primary key and return error */
- if (key_cmp(table->primary_key, old_data, new_data))
- DBUG_RETURN(HA_ERR_UNSUPPORTED);
-
- int res;
- if ((res= set_primary_key(op, old_data + table->null_bytes)))
- DBUG_RETURN(res);
+ /*
+      We are scanning records and want to update the record
+      that was just found. Call updateTuple on the cursor to
+      transfer the lock to a new update operation, which also
+      sets the primary key of the operation from the active
+      record of the cursor.
+ */
+ DBUG_PRINT("info", ("Calling updateTuple on cursor"));
+ if (!(op= cursor->updateTuple()))
+ ERR_RETURN(trans->getNdbError());
+ ops_pending++;
+ }
+ else
+ {
+ if (!(op= trans->getNdbOperation(m_tabname)) ||
+ op->updateTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // This table has no primary key, use "hidden" primary key
+ DBUG_PRINT("info", ("Using hidden key"));
+
+ // Require that the PK for this record has previously been
+ // read into m_value
+ uint no_fields= table->fields;
+ NdbRecAttr* rec= m_value[no_fields];
+ DBUG_ASSERT(rec);
+ DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
+
+ if (set_hidden_key(op, no_fields, rec->aRef()))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ int res;
+ if ((res= set_primary_key_from_old_data(op, old_data)))
+ DBUG_RETURN(res);
+ }
}
// Set non-key attribute(s)
for (i= 0; i < table->fields; i++)
{
-
Field *field= table->field[i];
if ((thd->query_id == field->query_id) &&
(!(field->flags & PRI_KEY_FLAG)) &&
set_ndb_value(op, field, i))
ERR_RETURN(op->getNdbError());
}
-
+
// Execute update operation
- if (trans->execute(NoCommit) != 0)
+ if (!cursor && trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(0);
@@ -1103,39 +1243,61 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
int ha_ndbcluster::delete_row(const byte *record)
{
NdbConnection *trans= m_active_trans;
+ NdbResultSet* cursor= m_active_cursor;
NdbOperation *op;
DBUG_ENTER("delete_row");
statistic_increment(ha_delete_count,&LOCK_status);
- if (!(op=trans->getNdbOperation(m_tabname)) ||
- op->deleteTuple() != 0)
- ERR_RETURN(trans->getNdbError());
-
- if (table->primary_key == MAX_KEY)
+ if (cursor)
{
- // This table has no primary key, use "hidden" primary key
- DBUG_PRINT("info", ("Using hidden key"));
- uint no_fields= table->fields;
- NdbRecAttr* rec= m_value[no_fields];
- DBUG_ASSERT(rec != NULL);
+ /*
+      We are scanning records and want to delete the record
+      that was just found. Call deleteTuple on the cursor to
+      transfer the lock to a new delete operation, which also
+      sets the primary key of the operation from the active
+      record of the cursor.
+ */
+ DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
+ if (cursor->deleteTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+ ops_pending++;
- if (set_hidden_key(op, no_fields, rec->aRef()))
- ERR_RETURN(op->getNdbError());
- }
- else
+ // If deleting from cursor, NoCommit will be handled in next_result
+ DBUG_RETURN(0);
+ }
+ else
{
- int res;
- if ((res= set_primary_key(op)))
- return res;
+
+ if (!(op=trans->getNdbOperation(m_tabname)) ||
+ op->deleteTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // This table has no primary key, use "hidden" primary key
+ DBUG_PRINT("info", ("Using hidden key"));
+ uint no_fields= table->fields;
+ NdbRecAttr* rec= m_value[no_fields];
+ DBUG_ASSERT(rec != NULL);
+
+ if (set_hidden_key(op, no_fields, rec->aRef()))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ int res;
+ if ((res= set_primary_key(op)))
+ return res;
+ }
}
-
+
// Execute delete operation
if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
DBUG_RETURN(0);
}
-
+
/*
Unpack a record read from NDB
@@ -1223,6 +1385,7 @@ void ha_ndbcluster::print_results()
switch (col->getType()) {
case NdbDictionary::Column::Blob:
+ case NdbDictionary::Column::Clob:
case NdbDictionary::Column::Undefined:
fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
break;
@@ -1337,7 +1500,7 @@ int ha_ndbcluster::index_init(uint index)
int ha_ndbcluster::index_end()
{
DBUG_ENTER("index_end");
- DBUG_RETURN(rnd_end());
+ DBUG_RETURN(close_scan());
}
@@ -1348,28 +1511,12 @@ int ha_ndbcluster::index_read(byte *buf,
DBUG_ENTER("index_read");
DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d",
active_index, key_len, find_flag));
-
- int error= 1;
- statistic_increment(ha_read_key_count, &LOCK_status);
- switch (get_index_type(active_index)){
- case PRIMARY_KEY_INDEX:
- error= pk_read(key, key_len, buf);
- break;
-
- case UNIQUE_INDEX:
- error= unique_index_read(key, key_len, buf);
- break;
-
- case ORDERED_INDEX:
- error= ordered_index_scan(key, key_len, buf, find_flag);
- break;
-
- default:
- case UNDEFINED_INDEX:
- break;
- }
- DBUG_RETURN(error);
+ key_range start_key;
+ start_key.key= key;
+ start_key.length= key_len;
+ start_key.flag= find_flag;
+ DBUG_RETURN(read_range_first(&start_key, NULL, false, true));
}
@@ -1391,11 +1538,7 @@ int ha_ndbcluster::index_next(byte *buf)
int error = 1;
statistic_increment(ha_read_next_count,&LOCK_status);
- if (get_index_type(active_index) == PRIMARY_KEY_INDEX)
- error= HA_ERR_END_OF_FILE;
- else
- error = next_result(buf);
- DBUG_RETURN(error);
+ DBUG_RETURN(next_result(buf));
}
@@ -1423,6 +1566,62 @@ int ha_ndbcluster::index_last(byte *buf)
}
+int ha_ndbcluster::read_range_first(const key_range *start_key,
+ const key_range *end_key,
+ bool eq_range, bool sorted)
+{
+ KEY* key_info;
+ int error= 1;
+ byte* buf= table->record[0];
+ DBUG_ENTER("ha_ndbcluster::read_range_first");
+ DBUG_PRINT("info", ("eq_range: %d, sorted: %d", eq_range, sorted));
+
+ if (m_active_cursor)
+ close_scan();
+
+ switch (get_index_type(active_index)){
+ case PRIMARY_KEY_ORDERED_INDEX:
+ case PRIMARY_KEY_INDEX:
+ key_info= table->key_info + active_index;
+ if (start_key &&
+ start_key->length == key_info->key_length &&
+ start_key->flag == HA_READ_KEY_EXACT)
+ {
+ error= pk_read(start_key->key, start_key->length, buf);
+ DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
+ }
+ break;
+ case UNIQUE_ORDERED_INDEX:
+ case UNIQUE_INDEX:
+ key_info= table->key_info + active_index;
+ if (start_key &&
+ start_key->length == key_info->key_length &&
+ start_key->flag == HA_READ_KEY_EXACT)
+ {
+ error= unique_index_read(start_key->key, start_key->length, buf);
+ DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
+ }
+ break;
+ default:
+ break;
+ }
+
+
+ // Start the ordered index scan and fetch the first row
+ error= ordered_index_scan(start_key, end_key, sorted,
+ buf);
+
+ DBUG_RETURN(error);
+}
+
+
+int ha_ndbcluster::read_range_next()
+{
+ DBUG_ENTER("ha_ndbcluster::read_range_next");
+ DBUG_RETURN(next_result(table->record[0]));
+}
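A sketch of how the server layer drives these two entry points for a range
scan (simplified; the names and error handling are assumptions):

  int err= file->read_range_first(&start_key, &end_key,
                                  false /* eq_range */, true /* sorted */);
  while (!err)
  {
    // process the row now in table->record[0]
    err= file->read_range_next();
  }
  if (err != HA_ERR_END_OF_FILE)
    ;  // report the error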
+
+
int ha_ndbcluster::rnd_init(bool scan)
{
NdbResultSet *cursor= m_active_cursor;
@@ -1435,19 +1634,23 @@ int ha_ndbcluster::rnd_init(bool scan)
DBUG_RETURN(0);
}
+int ha_ndbcluster::close_scan()
+{
+ NdbResultSet *cursor= m_active_cursor;
+ DBUG_ENTER("close_scan");
+
+ if (!cursor)
+ DBUG_RETURN(1);
+
+ cursor->close();
+ m_active_cursor= NULL;
+ DBUG_RETURN(0);
+}
int ha_ndbcluster::rnd_end()
{
- NdbResultSet *cursor= m_active_cursor;
DBUG_ENTER("rnd_end");
-
- if (cursor)
- {
- DBUG_PRINT("info", ("Closing the cursor"));
- cursor->close();
- m_active_cursor= NULL;
- }
- DBUG_RETURN(0);
+ DBUG_RETURN(close_scan());
}
@@ -1455,12 +1658,10 @@ int ha_ndbcluster::rnd_next(byte *buf)
{
DBUG_ENTER("rnd_next");
statistic_increment(ha_read_rnd_next_count, &LOCK_status);
- int error = 1;
+
if (!m_active_cursor)
- error = full_table_scan(buf);
- else
- error = next_result(buf);
- DBUG_RETURN(error);
+ DBUG_RETURN(full_table_scan(buf));
+ DBUG_RETURN(next_result(buf));
}
@@ -1654,6 +1855,7 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
where field->query_id is the same as
the current query id */
DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_ALL_COLS"));
+ retrieve_all_fields= TRUE;
break;
case HA_EXTRA_PREPARE_FOR_DELETE:
DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_DELETE"));
@@ -1679,6 +1881,53 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
DBUG_RETURN(0);
}
+/*
+  Start of an insert: remember the number of rows to be inserted; it will
+  be used in write_row and get_auto_increment to send an optimal number
+  of rows in each roundtrip to the server
+
+ SYNOPSIS
+ rows number of rows to insert, 0 if unknown
+
+*/
+
+void ha_ndbcluster::start_bulk_insert(ha_rows rows)
+{
+ int bytes, batch;
+ const NDBTAB *tab= (NDBTAB *) m_table;
+
+ DBUG_ENTER("start_bulk_insert");
+ DBUG_PRINT("enter", ("rows: %d", rows));
+
+ rows_inserted= 0;
+ rows_to_insert= rows;
+
+ /*
+    Calculate how many rows should be inserted per roundtrip to NDB.
+    This is done to minimize the number of roundtrips as much as
+    possible. However, performance degrades if too many bytes are sent
+    in each batch, so the batch size is limited by this calculation.
+ */
+ const int bytesperbatch = 8192;
+ bytes= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns();
+ batch= bytesperbatch/bytes;
+ batch= batch == 0 ? 1 : batch;
+ DBUG_PRINT("info", ("batch: %d, bytes: %d", batch, bytes));
+ bulk_insert_rows= batch;
+
+ DBUG_VOID_RETURN;
+}
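Worked example of the batch calculation, with hypothetical table numbers:

  // Assume tab->getRowSizeInBytes() == 100 and tab->getNoOfColumns() == 10:
  //   bytes = 12 + 100 + 4*10 = 152
  //   batch = 8192 / 152      = 53 rows per roundtrip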
+
+/*
+ End of an insert
+ */
+int ha_ndbcluster::end_bulk_insert()
+{
+ DBUG_ENTER("end_bulk_insert");
+ DBUG_RETURN(0);
+}
+
int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
{
@@ -1708,7 +1957,7 @@ const char **ha_ndbcluster::bas_ext() const
double ha_ndbcluster::scan_time()
{
- return rows2double(records/3);
+ return rows2double(records*1000);
}
@@ -1740,6 +1989,8 @@ THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
m_lock.type=lock_type;
}
*to++= &m_lock;
+
+ DBUG_PRINT("exit", ("lock_type: %d", lock_type));
DBUG_RETURN(to);
}
@@ -1788,6 +2039,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
if (lock_type != F_UNLCK)
{
+ DBUG_PRINT("info", ("lock_type != F_UNLCK"));
if (!thd->transaction.ndb_lock_count++)
{
PRINT_OPTION_FLAGS(thd);
@@ -1853,10 +2105,14 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
(NdbConnection*)thd->transaction.all.ndb_tid:
(NdbConnection*)thd->transaction.stmt.ndb_tid;
DBUG_ASSERT(m_active_trans);
-
+
+ // Start of transaction
+ retrieve_all_fields= FALSE;
+ ops_pending= 0;
}
else
{
+ DBUG_PRINT("info", ("lock_type == F_UNLCK"));
if (!--thd->transaction.ndb_lock_count)
{
DBUG_PRINT("trans", ("Last external_lock"));
@@ -1904,6 +2160,10 @@ int ha_ndbcluster::start_stmt(THD *thd)
thd->transaction.stmt.ndb_tid= trans;
}
m_active_trans= trans;
+
+ // Start of statement
+ retrieve_all_fields= FALSE;
+ ops_pending= 0;
DBUG_RETURN(error);
}
@@ -2041,9 +2301,8 @@ int ha_ndbcluster::create(const char *name,
NdbDictionary::Column::Type ndb_type;
NDBCOL col;
uint pack_length, length, i;
- int res;
const void *data, *pack_data;
- const char **key_name= form->keynames.type_names;
+ const char **key_names= form->keynames.type_names;
char name2[FN_HEADLEN];
DBUG_ENTER("create");
@@ -2086,13 +2345,11 @@ int ha_ndbcluster::create(const char *name,
col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
if (field->flags & AUTO_INCREMENT_FLAG)
{
- DBUG_PRINT("info", ("Found auto_increment key"));
col.setAutoIncrement(TRUE);
- ulonglong value = info->auto_increment_value ?
- info->auto_increment_value -1 :
- (ulonglong) 0;
- DBUG_PRINT("info", ("initial value=%ld", value));
-// col.setInitialAutIncValue(value);
+ ulonglong value= info->auto_increment_value ?
+ info->auto_increment_value -1 : (ulonglong) 0;
+ DBUG_PRINT("info", ("Autoincrement key, initial: %d", value));
+ col.setAutoIncrementInitialValue(value);
}
else
col.setAutoIncrement(false);
@@ -2132,65 +2389,86 @@ int ha_ndbcluster::create(const char *name,
DBUG_PRINT("info", ("Table %s/%s created successfully",
m_dbname, m_tabname));
- // Fetch table from NDB, check that it exists
- const NDBTAB *tab2= dict->getTable(m_tabname);
- if (tab2 == NULL)
- {
- const NdbError err= dict->getNdbError();
- ERR_PRINT(err);
- my_errno= ndb_to_mysql_error(&err);
+ if ((my_errno= build_index_list()))
DBUG_RETURN(my_errno);
- }
// Create secondary indexes
- for (i= 0; i < form->keys; i++)
+ KEY* key_info= form->key_info;
+ const char** key_name= key_names;
+ for (i= 0; i < form->keys; i++, key_info++, key_name++)
{
- DBUG_PRINT("info", ("Found index %u: %s", i, key_name[i]));
- if (i == form->primary_key)
- {
- DBUG_PRINT("info", ("Skipping it, PK already created"));
- continue;
- }
-
- DBUG_PRINT("info", ("Creating index %u: %s", i, key_name[i]));
- res= create_index(key_name[i],
- form->key_info + i);
- switch(res){
- case 0:
- // OK
+ int error= 0;
+ DBUG_PRINT("info", ("Index %u: %s", i, *key_name));
+
+ switch (get_index_type_from_table(i)){
+
+ case PRIMARY_KEY_INDEX:
+ // Do nothing, already created
+ break;
+ case PRIMARY_KEY_ORDERED_INDEX:
+ error= create_ordered_index(*key_name, key_info);
+ break;
+ case UNIQUE_ORDERED_INDEX:
+ if (!(error= create_ordered_index(*key_name, key_info)))
+ error= create_unique_index(get_unique_index_name(i), key_info);
+ break;
+ case UNIQUE_INDEX:
+ error= create_unique_index(get_unique_index_name(i), key_info);
+ break;
+ case ORDERED_INDEX:
+ error= create_ordered_index(*key_name, key_info);
break;
default:
+ DBUG_ASSERT(false);
+ break;
+ }
+
+ if (error)
+ {
DBUG_PRINT("error", ("Failed to create index %u", i));
drop_table();
- my_errno= res;
- goto err_end;
+ my_errno= error;
+ break;
}
}
-err_end:
DBUG_RETURN(my_errno);
}
+int ha_ndbcluster::create_ordered_index(const char *name,
+ KEY *key_info)
+{
+ DBUG_ENTER("create_ordered_index");
+ DBUG_RETURN(create_index(name, key_info, false));
+}
+
+int ha_ndbcluster::create_unique_index(const char *name,
+ KEY *key_info)
+{
+
+ DBUG_ENTER("create_unique_index");
+ DBUG_RETURN(create_index(name, key_info, true));
+}
+
+
/*
Create an index in NDB Cluster
*/
int ha_ndbcluster::create_index(const char *name,
- KEY *key_info){
+ KEY *key_info,
+ bool unique)
+{
NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
KEY_PART_INFO *key_part= key_info->key_part;
KEY_PART_INFO *end= key_part + key_info->key_parts;
DBUG_ENTER("create_index");
DBUG_PRINT("enter", ("name: %s ", name));
-
- // Check that an index with the same name do not already exist
- if (dict->getIndex(name, m_tabname))
- ERR_RETURN(dict->getNdbError());
-
+
NdbDictionary::Index ndb_index(name);
- if (key_info->flags & HA_NOSAME)
+ if (unique)
ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
else
{
@@ -2321,10 +2599,10 @@ int ndbcluster_drop_database(const char *path)
longlong ha_ndbcluster::get_auto_increment()
-{
- // NOTE If number of values to be inserted is known
- // the autoincrement cache could be used here
- Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname);
+{
+ int cache_size = rows_to_insert ? rows_to_insert : 32;
+ Uint64 auto_value=
+ m_ndb->getAutoIncrementValue(m_tabname, cache_size);
return (longlong)auto_value;
}
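For example, a bulk insert of 1000 rows (rows_to_insert having been set in
start_bulk_insert) now reserves 1000 autoincrement values in a single
roundtrip, while statements with an unknown row count fall back to a cache
of 32 values.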
@@ -2340,16 +2618,18 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_ndb(NULL),
m_table(NULL),
m_table_flags(HA_REC_NOT_IN_SEQ |
- HA_KEYPOS_TO_RNDPOS |
HA_NOT_EXACT_COUNT |
- HA_NO_WRITE_DELAYED |
HA_NO_PREFIX_CHAR_KEYS |
- HA_NO_BLOBS |
- HA_DROP_BEFORE_CREATE |
- HA_NOT_READ_AFTER_KEY),
- m_use_write(false)
+ HA_NO_BLOBS),
+ m_use_write(false),
+ retrieve_all_fields(FALSE),
+ rows_to_insert(0),
+ rows_inserted(0),
+ bulk_insert_rows(1024),
+ ops_pending(0)
{
-
+ int i;
+
DBUG_ENTER("ha_ndbcluster");
m_tabname[0]= '\0';
@@ -2360,6 +2640,12 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
records= 100;
block_size= 1024;
+ for (i= 0; i < MAX_KEY; i++)
+ {
+ m_indextype[i]= UNDEFINED_INDEX;
+ m_unique_index_name[i]= NULL;
+ }
+
DBUG_VOID_RETURN;
}
@@ -2445,7 +2731,7 @@ Ndb* ha_ndbcluster::seize_ndb()
#else
ndb= new Ndb("");
#endif
- if (ndb->init(NDB_MAX_TRANSACTIONS) != 0)
+ if (ndb->init(max_transactions) != 0)
{
ERR_PRINT(ndb->getNdbError());
/*
@@ -2500,11 +2786,6 @@ int ha_ndbcluster::check_ndb_connection()
}
m_ndb= (Ndb*)thd->transaction.ndb;
m_ndb->setDatabaseName(m_dbname);
- if (m_ndb->waitUntilReady() != 0)
- {
- DBUG_PRINT("error", ("Ndb was not ready"));
- DBUG_RETURN(3);
- }
DBUG_RETURN(0);
}
@@ -2561,7 +2842,6 @@ int ndbcluster_discover(const char *dbname, const char *name,
DBUG_RETURN(0);
}
-static Ndb* g_ndb= NULL;
#ifdef USE_DISCOVER_ON_STARTUP
/*
@@ -2653,6 +2933,14 @@ bool ndbcluster_end()
DBUG_RETURN(0);
}
+void ndbcluster_print_error(int error)
+{
+ DBUG_ENTER("ndbcluster_print_error");
+ TABLE tab;
+ tab.table_name = NULL;
+ ha_ndbcluster error_handler(&tab);
+ error_handler.print_error(error, MYF(0));
+}
/*
Set m_tabname from full pathname to table file
@@ -2751,32 +3039,27 @@ ha_rows
ha_ndbcluster::records_in_range(uint inx, key_range *min_key,
key_range *max_key)
{
- ha_rows records= 10; /* Good guess when you don't know anything */
KEY *key_info= table->key_info + inx;
uint key_length= key_info->key_length;
+ NDB_INDEX_TYPE idx_type= get_index_type(inx);
DBUG_ENTER("records_in_range");
DBUG_PRINT("enter", ("inx: %u", inx));
- DBUG_DUMP("start_key", min_key->key, min_key->length);
- DBUG_DUMP("end_key", max_key->key, max_key->length);
- DBUG_PRINT("enter", ("start_search_flag: %u end_search_flag: %u",
- min_key->flag, max_key->flag));
- /*
- Check that start_key_len is equal to
- the length of the used index and
- prevent partial scan/read of hash indexes by returning HA_POS_ERROR
- */
- NDB_INDEX_TYPE idx_type= get_index_type(inx);
- if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) &&
- min_key->length < key_length)
- {
- DBUG_PRINT("warning", ("Tried to use index which required"
- "full key length: %d, HA_POS_ERROR",
- key_length));
- records= HA_POS_ERROR;
- }
- DBUG_RETURN(records);
+ // Prevent partial read of hash indexes by returning HA_POS_ERROR
+ if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) &&
+ ((min_key && min_key->length < key_length) ||
+ (max_key && max_key->length < key_length)))
+ DBUG_RETURN(HA_POS_ERROR);
+
+ // Read from hash index with full key
+ // This is a "const" table which returns only one record!
+ if ((idx_type != ORDERED_INDEX) &&
+ ((min_key && min_key->length == key_length) ||
+ (max_key && max_key->length == key_length)))
+ DBUG_RETURN(1);
+
+ DBUG_RETURN(10); /* Good guess when you don't know anything */
}
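In summary: a full-length lookup on a hash index (primary key or unique) is
now costed as exactly one row, making the table eligible as a "const" table;
a partial key on a hash index returns HA_POS_ERROR so that plan is rejected;
any other range keeps the old guess of 10 rows.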