Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--  sql/ha_ndbcluster.cc  209
1 file changed, 112 insertions(+), 97 deletions(-)
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index b84350fd3b8..119171d1229 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -84,9 +84,10 @@ handlerton ndbcluster_hton = {
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
};
-static handler *ndbcluster_create_handler(TABLE_SHARE *table)
+static handler *ndbcluster_create_handler(TABLE_SHARE *table,
+ MEM_ROOT *mem_root)
{
- return new ha_ndbcluster(table);
+ return new (mem_root) ha_ndbcluster(table);
}
static uint ndbcluster_partition_flags()
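
The hunk above replaces heap allocation with placement new on the caller-supplied MEM_ROOT, so the handler object lives in memory owned by the root and is released when the root is freed rather than by delete. A minimal sketch of the idiom, using hypothetical stand-in types (the real operator new over MEM_ROOT and alloc_root() live in the server's memory-allocation layer):

    #include <cstddef>
    #include <cstdlib>
    #include <new>

    struct Mem_root_stub {};                     // stand-in for MEM_ROOT

    // Stand-in for alloc_root(): storage lives until the root is cleared,
    // so no matching free/delete is ever issued for it.
    static void *alloc_root_stub(Mem_root_stub *, std::size_t size)
    { return std::malloc(size); }

    // Placement operator new keyed on the root, mirroring the server's.
    void *operator new(std::size_t size, Mem_root_stub *root)
    { return alloc_root_stub(root, size); }

    struct Handler_stub
    {
      explicit Handler_stub(int t) : table(t) {}
      int table;
    };

    static Handler_stub *create_handler_stub(int table, Mem_root_stub *mem_root)
    {
      // Construct directly in root-owned memory; the destructor is not
      // run automatically, which is fine for trivially destructible state.
      return new (mem_root) Handler_stub(table);
    }
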
@@ -437,9 +438,10 @@ void ha_ndbcluster::records_update()
Ndb *ndb= get_ndb();
ndb->setDatabaseName(m_dbname);
struct Ndb_statistics stat;
- if (ndb_get_table_statistics(ndb, m_table, &stat) == 0){
- mean_rec_length= stat.row_size;
- data_file_length= stat.fragment_memory;
+ if (ndb_get_table_statistics(ndb, m_table, &stat) == 0)
+ {
+ stats.mean_rec_length= stat.row_size;
+ stats.data_file_length= stat.fragment_memory;
info->records= stat.row_count;
}
}
@@ -448,7 +450,7 @@ void ha_ndbcluster::records_update()
if (get_thd_ndb(thd)->error)
info->no_uncommitted_rows_count= 0;
}
- records= info->records+ info->no_uncommitted_rows_count;
+ stats.records= info->records+ info->no_uncommitted_rows_count;
DBUG_VOID_RETURN;
}
@@ -896,23 +898,24 @@ int ha_ndbcluster::get_ndb_partition_id(NdbOperation *ndb_op)
/*
Check if any set or get of blob value in current query.
*/
+
bool ha_ndbcluster::uses_blob_value()
{
+ MY_BITMAP *bitmap;
+ uint *blob_index, *blob_index_end;
if (table_share->blob_fields == 0)
return FALSE;
+
+ bitmap= m_write_op ? table->write_set : table->read_set;
+ blob_index= table_share->blob_field;
+ blob_index_end= blob_index + table_share->blob_fields;
+ do
{
- uint no_fields= table_share->fields;
- int i;
- // They always put blobs at the end..
- for (i= no_fields - 1; i >= 0; i--)
- {
- if ((m_write_op && ha_get_bit_in_write_set(i+1)) ||
- (!m_write_op && ha_get_bit_in_read_set(i+1)))
- {
- return TRUE;
- }
- }
- }
+ if (bitmap_is_set(bitmap,
+ table->field[*blob_index]->field_index))
+ return TRUE;
+ } while (++blob_index != blob_index_end);
return FALSE;
}
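
The rewritten uses_blob_value() above stops scanning every field from the end: table_share->blob_field is an array holding the field index of each blob column, so only those entries are checked against the active column bitmap (write_set for write operations, read_set otherwise). A self-contained sketch of that walk, with a stand-in bitmap type:

    #include <vector>

    struct Bitmap_stub                    // stand-in for MY_BITMAP
    {
      std::vector<bool> bits;
      bool is_set(unsigned i) const { return bits[i]; }
    };

    // blob_index points at the shared list of blob column indexes, as
    // table_share->blob_field does; blob_fields is its length.
    static bool uses_blob_value_stub(const unsigned *blob_index,
                                     unsigned blob_fields,
                                     const Bitmap_stub &column_set)
    {
      if (blob_fields == 0)
        return false;
      const unsigned *blob_index_end= blob_index + blob_fields;
      do
      {
        if (column_set.is_set(*blob_index))
          return true;                    // query touches a blob column
      } while (++blob_index != blob_index_end);
      return false;
    }
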
@@ -1409,10 +1412,9 @@ int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{
if (type >= TL_WRITE_ALLOW_WRITE)
return NdbOperation::LM_Exclusive;
- else if (uses_blob_value())
+ if (uses_blob_value())
return NdbOperation::LM_Read;
- else
- return NdbOperation::LM_CommittedRead;
+ return NdbOperation::LM_CommittedRead;
}
static const ulong index_type_flags[]=
@@ -1587,13 +1589,13 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
for (i= 0; i < table_share->fields; i++)
{
Field *field= table->field[i];
- if (ha_get_bit_in_read_set(i+1) ||
+ if (bitmap_is_set(table->read_set, i) ||
((field->flags & PRI_KEY_FLAG)))
{
if (get_ndb_value(op, field, i, buf))
ERR_RETURN(op->getNdbError());
}
- else
+ else
{
m_value[i].ptr= NULL;
}
@@ -1697,7 +1699,7 @@ int ha_ndbcluster::complemented_read(const byte *old_data, byte *new_data,
DBUG_ENTER("complemented_read");
m_write_op= FALSE;
- if (ha_get_all_bit_in_read_set())
+ if (bitmap_is_set_all(table->read_set))
{
// We have already retrieved all fields, nothing to complement
DBUG_RETURN(0);
@@ -1728,7 +1730,8 @@ int ha_ndbcluster::complemented_read(const byte *old_data, byte *new_data,
{
Field *field= table->field[i];
if (!((field->flags & PRI_KEY_FLAG) ||
- (ha_get_bit_in_read_set(i+1))))
+ bitmap_is_set(table->read_set, i)) &&
+ !bitmap_is_set(table->write_set, i))
{
if (get_ndb_value(op, field, i, new_data))
ERR_RETURN(trans->getNdbError());
@@ -1752,7 +1755,7 @@ int ha_ndbcluster::complemented_read(const byte *old_data, byte *new_data,
{
Field *field= table->field[i];
if (!((field->flags & PRI_KEY_FLAG) ||
- (ha_get_bit_in_read_set(i+1))))
+ bitmap_is_set(table->read_set, i)))
{
m_value[i].ptr= NULL;
}
@@ -1853,11 +1856,11 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record)
uint32 part_id;
int error;
longlong func_value;
- if ((error= m_part_info->get_partition_id(m_part_info, &part_id,
- &func_value)))
- {
+ my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
+ error= m_part_info->get_partition_id(m_part_info, &part_id, &func_value);
+ dbug_tmp_restore_column_map(table->read_set, old_map);
+ if (error)
DBUG_RETURN(error);
- }
op->setPartitionId(part_id);
}
}
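
This and several later hunks (write_row(), print_error()) bracket calls into the partition function with dbug_tmp_use_all_columns()/dbug_tmp_restore_column_map(). In debug builds the Field accessors assert that the accessed column is present in the active bitmap, and the partition expression may read columns the current statement never marked, so the read_set is temporarily widened to all columns and then restored. Roughly how such a swap works, using stand-in types (the real macros live in table.h and compile to nothing in non-debug builds):

    typedef unsigned long map_word;       // stand-in for my_bitmap_map

    struct Bitmap_stub { map_word *bitmap; };

    // Point the set at an all-ones backing store, returning the old one.
    static map_word *tmp_use_all_columns_stub(Bitmap_stub *set,
                                              map_word *all_set_words)
    {
      map_word *old= set->bitmap;
      set->bitmap= all_set_words;         // every column now tests as "set"
      return old;
    }

    // Restore the statement's real column map once the call is done.
    static void tmp_restore_column_map_stub(Bitmap_stub *set, map_word *old)
    {
      set->bitmap= old;
    }
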
@@ -2449,11 +2452,11 @@ int ha_ndbcluster::write_row(byte *record)
{
uint32 part_id;
int error;
- if ((error= m_part_info->get_partition_id(m_part_info, &part_id,
- &func_value)))
- {
+ my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
+ error= m_part_info->get_partition_id(m_part_info, &part_id, &func_value);
+ dbug_tmp_restore_column_map(table->read_set, old_map);
+ if (error)
DBUG_RETURN(error);
- }
op->setPartitionId(part_id);
}
@@ -2475,25 +2478,27 @@ int ha_ndbcluster::write_row(byte *record)
}
else
{
- int res;
-
- if ((res= set_primary_key_from_record(op, record)))
- return res;
+ int error;
+ if ((error= set_primary_key_from_record(op, record)))
+ DBUG_RETURN(error);
}
// Set non-key attribute(s)
bool set_blob_value= FALSE;
+ my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
for (i= 0; i < table_share->fields; i++)
{
Field *field= table->field[i];
if (!(field->flags & PRI_KEY_FLAG) &&
- (ha_get_bit_in_write_set(i + 1) || !m_use_write) &&
+ (bitmap_is_set(table->write_set, i) || !m_use_write) &&
set_ndb_value(op, field, i, record-table->record[0], &set_blob_value))
{
m_skip_auto_increment= TRUE;
+ dbug_tmp_restore_column_map(table->read_set, old_map);
ERR_RETURN(op->getNdbError());
}
}
+ dbug_tmp_restore_column_map(table->read_set, old_map);
if (m_use_partition_function)
{
@@ -2571,6 +2576,7 @@ int ha_ndbcluster::write_row(byte *record)
}
m_skip_auto_increment= TRUE;
+ DBUG_PRINT("exit",("ok"));
DBUG_RETURN(0);
}
@@ -2630,7 +2636,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
{
table->timestamp_field->set_time();
- ha_set_bit_in_write_set(table->timestamp_field->fieldnr);
+ bitmap_set_bit(table->write_set, table->timestamp_field->field_index);
}
if (m_use_partition_function &&
@@ -2744,14 +2750,19 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
m_rows_changed++;
// Set non-key attribute(s)
+ my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
for (i= 0; i < table_share->fields; i++)
{
Field *field= table->field[i];
- if (ha_get_bit_in_write_set(i+1) &&
+ if (bitmap_is_set(table->write_set, i) &&
(!(field->flags & PRI_KEY_FLAG)) &&
set_ndb_value(op, field, i, new_data - table->record[0]))
+ {
+ dbug_tmp_restore_column_map(table->read_set, old_map);
ERR_RETURN(op->getNdbError());
+ }
}
+ dbug_tmp_restore_column_map(table->read_set, old_map);
if (m_use_partition_function)
{
@@ -2843,9 +2854,8 @@ int ha_ndbcluster::delete_row(const byte *record)
}
else
{
- int res;
- if ((res= set_primary_key_from_record(op, record)))
- return res;
+ if ((error= set_primary_key_from_record(op, record)))
+ DBUG_RETURN(error);
}
}
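
Both primary-key hunks also replace a bare return with DBUG_RETURN. Functions that open with DBUG_ENTER push a frame on the dbug library's per-thread call stack, and returning without DBUG_RETURN leaves that stack unbalanced. A simplified stub of the pairing (the real macros come from the server's dbug headers):

    #include <cstdio>

    static int dbug_depth= 0;             // stand-in for dbug's frame stack

    #define DBUG_ENTER_STUB(name) \
      std::printf("%*s> %s\n", dbug_depth++, "", (name))
    #define DBUG_RETURN_STUB(val) \
      do { --dbug_depth; return (val); } while (0)

    static int set_pk_stub(bool ok)
    {
      DBUG_ENTER_STUB("set_pk_stub");
      if (!ok)
        DBUG_RETURN_STUB(1);              // pops the frame before returning
      DBUG_RETURN_STUB(0);                // a bare `return 0;` would not
    }
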
@@ -2876,6 +2886,7 @@ void ndb_unpack_record(TABLE *table, NdbValue *value,
{
Field **p_field= table->field, *field= *p_field;
uint row_offset= (uint) (buf - table->record[0]);
+ my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
DBUG_ENTER("ndb_unpack_record");
// Set null flag(s)
@@ -2926,13 +2937,13 @@ void ndb_unpack_record(TABLE *table, NdbValue *value,
field->ptr= save_field_ptr;
DBUG_PRINT("info",("[%u] SET",
(*value).rec->getColumn()->getColumnNo()));
- DBUG_DUMP("info", (const char*) field->ptr, field->field_length);
+ DBUG_DUMP("info", (const char*) field->ptr, field->pack_length());
}
else
{
DBUG_PRINT("info",("[%u] SET",
(*value).rec->getColumn()->getColumnNo()));
- DBUG_DUMP("info", (const char*) field->ptr, field->field_length);
+ DBUG_DUMP("info", (const char*) field->ptr, field->pack_length());
}
}
else
@@ -2965,6 +2976,7 @@ void ndb_unpack_record(TABLE *table, NdbValue *value,
}
}
}
+ dbug_tmp_restore_column_map(table->write_set, old_map);
DBUG_VOID_RETURN;
}
@@ -3487,7 +3499,7 @@ void ha_ndbcluster::info(uint flag)
if (m_table_info)
{
if (m_ha_not_exact_count)
- records= 100;
+ stats.records= 100;
else
records_update();
}
@@ -3501,14 +3513,14 @@ void ha_ndbcluster::info(uint flag)
if (current_thd->variables.ndb_use_exact_count &&
ndb_get_table_statistics(ndb, m_table, &stat) == 0)
{
- mean_rec_length= stat.row_size;
- data_file_length= stat.fragment_memory;
- records= stat.row_count;
+ stats.mean_rec_length= stat.row_size;
+ stats.data_file_length= stat.fragment_memory;
+ stats.records= stat.row_count;
}
else
{
- mean_rec_length= 0;
- records= 100;
+ stats.mean_rec_length= 0;
+ stats.records= 100;
}
}
}
@@ -3529,8 +3541,7 @@ void ha_ndbcluster::info(uint flag)
{
Ndb *ndb= get_ndb();
- auto_increment_value=
- ndb->readAutoIncrementValue(m_table);
+ stats.auto_increment_value= ndb->readAutoIncrementValue(m_table);
}
}
DBUG_VOID_RETURN;
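
Throughout info() and records_update() the row-count bookkeeping moves from loose handler members (records, mean_rec_length, ...) into an embedded statistics struct. An approximate shape of that struct, trimmed to the fields this patch writes:

    typedef unsigned long long ha_rows_stub;

    struct ha_statistics_stub
    {
      ha_rows_stub records;                 // estimated or exact row count
      unsigned long mean_rec_length;        // average row length in bytes
      unsigned long long data_file_length;  // NDB fills in fragment memory
      unsigned long long auto_increment_value;
      unsigned int block_size;
    };
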
@@ -3554,18 +3565,6 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
{
DBUG_ENTER("extra");
switch (operation) {
- case HA_EXTRA_RESET: /* Reset database to after open */
- DBUG_PRINT("info", ("HA_EXTRA_RESET"));
- DBUG_PRINT("info", ("Clearing condition stack"));
- cond_clear();
- /*
- * Regular partition pruning will set the bitmap appropriately.
- * Some queries like ALTER TABLE doesn't use partition pruning and
- * thus the 'used_partitions' bitmap needs to be initialized
- */
- if (m_part_info)
- bitmap_set_all(&m_part_info->used_partitions);
- break;
case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/
DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY"));
if (current_thd->lex->sql_command == SQLCOM_REPLACE && !m_has_unique_index)
@@ -3601,6 +3600,22 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
DBUG_RETURN(0);
}
+
+int ha_ndbcluster::reset()
+{
+ DBUG_ENTER("ha_ndbcluster::reset");
+ cond_clear();
+ /*
+ Regular partition pruning will set the bitmap appropriately.
Some queries, like ALTER TABLE, don't use partition pruning and
+ thus the 'used_partitions' bitmap needs to be initialized
+ */
+ if (m_part_info)
+ bitmap_set_all(&m_part_info->used_partitions);
+ DBUG_RETURN(0);
+}
+
+
/*
Start of an insert, remember number of rows to be inserted, it will
be used in write_row and get_autoincrement to send an optimal number
@@ -3717,7 +3732,7 @@ const char** ha_ndbcluster::bas_ext() const
double ha_ndbcluster::scan_time()
{
DBUG_ENTER("ha_ndbcluster::scan_time()");
- double res= rows2double(records*1000);
+ double res= rows2double(stats.records*1000);
DBUG_PRINT("exit", ("table: %s value: %f",
m_tabname, res));
DBUG_RETURN(res);
@@ -3801,8 +3816,8 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
{
int error=0;
NdbTransaction* trans= NULL;
-
DBUG_ENTER("external_lock");
+
/*
Check that this handler instance has a connection
set up to the Ndb object of thd
@@ -3813,9 +3828,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb;
- DBUG_PRINT("enter", ("this: %x thd: %lx thd_ndb: %lx "
+ DBUG_PRINT("enter", ("this: 0x%lx thd: 0x%lx thd_ndb: %lx "
"thd_ndb->lock_count: %d",
- this, thd, thd_ndb, thd_ndb->lock_count));
+ (long) this, (long) thd, (long) thd_ndb,
+ thd_ndb->lock_count));
if (lock_type != F_UNLCK)
{
@@ -5056,7 +5072,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
/* Drop the table from NDB */
- int res;
+ int res= 0;
if (h && h->m_table)
{
if (dict->dropTableGlobal(*h->m_table))
@@ -5084,7 +5100,6 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
ndb_table_id= ndbtab_g.get_table()->getObjectId();
ndb_table_version= ndbtab_g.get_table()->getObjectVersion();
#endif
- res= 0;
}
else if (dict->getNdbError().code == NDB_INVALID_SCHEMA_OBJECT)
{
@@ -5252,7 +5267,9 @@ ulonglong ha_ndbcluster::get_auto_increment()
HA_NEED_READ_RANGE_BUFFER | \
HA_CAN_GEOMETRY | \
HA_CAN_BIT_FIELD | \
- HA_PRIMARY_KEY_ALLOW_RANDOM_ACCESS
+ HA_PRIMARY_KEY_REQUIRED_FOR_POSITION | \
+ HA_PRIMARY_KEY_REQUIRED_FOR_DELETE | \
+ HA_PARTIAL_COLUMN_READ
ha_ndbcluster::ha_ndbcluster(TABLE_SHARE *table_arg):
handler(&ndbcluster_hton, table_arg),
@@ -5295,8 +5312,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE_SHARE *table_arg):
m_tabname[0]= '\0';
m_dbname[0]= '\0';
- records= ~(ha_rows)0; // uninitialized
- block_size= 1024;
+ stats.records= ~(ha_rows)0; // uninitialized
+ stats.block_size= 1024;
for (i= 0; i < MAX_KEY; i++)
ndb_init_index(m_index[i]);
@@ -5620,7 +5637,7 @@ int ndbcluster_table_exists_in_engine(THD* thd, const char *db,
{
Ndb* ndb;
DBUG_ENTER("ndbcluster_table_exists_in_engine");
- DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
+ DBUG_PRINT("enter", ("db: %s name: %s", db, name));
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
@@ -5629,14 +5646,13 @@ int ndbcluster_table_exists_in_engine(THD* thd, const char *db,
NdbDictionary::Dictionary::List list;
if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
ERR_RETURN(dict->getNdbError());
- for (int i= 0 ; i < list.count ; i++)
+ for (uint i= 0 ; i < list.count ; i++)
{
NdbDictionary::Dictionary::List::Element& elmt= list.elements[i];
if (my_strcasecmp(system_charset_info, elmt.database, db))
continue;
if (my_strcasecmp(system_charset_info, elmt.name, name))
continue;
- // table found
DBUG_PRINT("info", ("Found table"));
DBUG_RETURN(1);
}
@@ -5766,6 +5782,8 @@ int ndbcluster_find_all_files(THD *thd)
NDBDICT *dict= ndb->getDictionary();
int unhandled, retries= 5, skipped;
+ LINT_INIT(unhandled);
+ LINT_INIT(skipped);
do
{
NdbDictionary::Dictionary::List list;
@@ -6268,9 +6286,11 @@ void ha_ndbcluster::print_error(int error, myf errflag)
if (error == HA_ERR_NO_PARTITION_FOUND)
{
char buf[100];
+ my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
my_error(ER_NO_PARTITION_FOR_GIVEN_VALUE, MYF(0),
m_part_info->part_expr->null_value ? "NULL" :
llstr(m_part_info->part_expr->val_int(), buf));
+ dbug_tmp_restore_column_map(table->read_set, old_map);
}
else
handler::print_error(error, errflag);
@@ -6488,12 +6508,11 @@ ha_ndbcluster::records_in_range(uint inx, key_range *min_key,
DBUG_RETURN(10); /* Good guess when you don't know anything */
}
-ulong ha_ndbcluster::table_flags(void) const
+ulonglong ha_ndbcluster::table_flags(void) const
{
if (m_ha_not_exact_count)
- return m_table_flags | HA_NOT_EXACT_COUNT;
- else
- return m_table_flags;
+ return m_table_flags & ~HA_STATS_RECORDS_IS_EXACT;
+ return m_table_flags;
}
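
Besides widening the return type to ulonglong, the hunk above inverts the flag's polarity: instead of OR-ing in HA_NOT_EXACT_COUNT when counts are approximate, the base flag set now advertises HA_STATS_RECORDS_IS_EXACT and the handler masks it out. A sketch of the equivalence (the bit value is illustrative, not the real one from handler.h):

    typedef unsigned long long Table_flags;

    static const Table_flags STATS_RECORDS_IS_EXACT= 1ULL << 9; // illustrative

    static Table_flags table_flags_stub(Table_flags base_flags,
                                        bool not_exact_count)
    {
      // Base flags claim exact counts; strip the bit when the engine is
      // configured for approximate row counts.
      return not_exact_count ? (base_flags & ~STATS_RECORDS_IS_EXACT)
                             : base_flags;
    }
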
const char * ha_ndbcluster::table_type() const
{
@@ -6527,10 +6546,6 @@ bool ha_ndbcluster::low_byte_first() const
return TRUE;
#endif
}
-bool ha_ndbcluster::has_transactions()
-{
- return TRUE;
-}
const char* ha_ndbcluster::index_type(uint key_number)
{
switch (get_index_type(key_number)) {
@@ -9585,7 +9600,7 @@ uint ha_ndbcluster::set_up_partition_info(partition_info *part_info,
for (i= 0; i < part_info->part_field_list.elements; i++)
{
- NDBCOL *col= tab->getColumn(fields[i]->fieldnr - 1);
+ NDBCOL *col= tab->getColumn(fields[i]->field_index);
DBUG_PRINT("info",("setting dist key on %s", col->getName()));
col->setPartitionKey(TRUE);
}
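
This hunk, like the timestamp and read_set changes earlier, moves from Field::fieldnr, which counts from 1, to Field::field_index, which counts from 0, eliminating the scattered +1/-1 adjustments. The relationship in miniature:

    struct Field_stub
    {
      unsigned fieldnr;       // 1-based position (legacy numbering)
      unsigned field_index;   // 0-based position in table->field[]
    };

    // Old call sites compensated by hand; new ones use the index directly.
    static unsigned column_no_old(const Field_stub &f) { return f.fieldnr - 1; }
    static unsigned column_no_new(const Field_stub &f) { return f.field_index; }
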
@@ -9675,7 +9690,7 @@ bool ha_ndbcluster::check_if_incompatible_data(HA_CREATE_INFO *info,
{
Field *field= table->field[i];
const NDBCOL *col= tab->getColumn(field->field_name);
- if (field->add_index &&
+ if ((field->flags & FIELD_IN_ADD_INDEX) &&
col->getStorageType() == NdbDictionary::Column::StorageTypeDisk)
{
DBUG_PRINT("info", ("add/drop index not supported for disk stored column"));
@@ -9951,10 +9966,11 @@ bool ha_ndbcluster::get_no_parts(const char *name, uint *no_parts)
const NDBTAB *tab;
int err;
DBUG_ENTER("ha_ndbcluster::get_no_parts");
+ LINT_INIT(err);
set_dbname(name);
set_tabname(name);
- do
+ for (;;)
{
if (check_ndb_connection())
{
@@ -9968,22 +9984,21 @@ bool ha_ndbcluster::get_no_parts(const char *name, uint *no_parts)
ERR_BREAK(dict->getNdbError(), err);
*no_parts= ndbtab_g.get_table()->getFragmentCount();
DBUG_RETURN(FALSE);
- } while (1);
+ }
-end:
print_error(err, MYF(0));
DBUG_RETURN(TRUE);
}
-static int ndbcluster_fill_files_table(THD *thd, TABLE_LIST *tables, COND *cond)
+static int ndbcluster_fill_files_table(THD *thd, TABLE_LIST *tables,
+ COND *cond)
{
TABLE* table= tables->table;
Ndb *ndb= check_ndb_in_thd(thd);
NdbDictionary::Dictionary* dict= ndb->getDictionary();
NdbDictionary::Dictionary::List dflist;
NdbError ndberr;
- unsigned i;
-
+ uint i;
DBUG_ENTER("ndbcluster_fill_files_table");
dict->listObjects(dflist, NdbDictionary::Object::Datafile);
@@ -9995,12 +10010,13 @@ static int ndbcluster_fill_files_table(THD *thd, TABLE_LIST *tables, COND *cond)
{
NdbDictionary::Dictionary::List::Element& elt = dflist.elements[i];
Ndb_cluster_connection_node_iter iter;
- unsigned id;
-
+ uint id;
+
g_ndb_cluster_connection->init_get_next_node(iter);
while ((id= g_ndb_cluster_connection->get_next_node(iter)))
{
+ uint c= 0;
NdbDictionary::Datafile df= dict->getDatafile(id, elt.name);
ndberr= dict->getNdbError();
if(ndberr.classification != NdbError::NoError)
@@ -10018,7 +10034,6 @@ static int ndbcluster_fill_files_table(THD *thd, TABLE_LIST *tables, COND *cond)
ERR_RETURN(ndberr);
}
- int c= 0;
table->field[c++]->set_null(); // FILE_ID
table->field[c++]->store(elt.name, strlen(elt.name),
system_charset_info);