Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--  sql/ha_ndbcluster.cc | 343 ++++++++++++++++++++++++++---------
1 file changed, 263 insertions(+), 80 deletions(-)
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 739fae79565..30be53f1ddb 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -2,8 +2,7 @@
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
+ the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -822,8 +821,8 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob,
{
char *buf= m_blobs_buffer + offset;
uint32 len= 0xffffffff; // Max uint32
- DBUG_PRINT("value", ("read blob ptr=%x len=%u",
- (UintPtr)buf, (uint)blob_len));
+ DBUG_PRINT("value", ("read blob ptr: 0x%lx len: %u",
+ (long)buf, (uint)blob_len));
if (ndb_blob->readData(buf, len) != 0)
DBUG_RETURN(-1);
DBUG_ASSERT(len == blob_len);
@@ -1059,6 +1058,7 @@ int ha_ndbcluster::build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase)
int error= 0;
const char *index_name;
char unique_index_name[FN_LEN];
+ bool null_in_unique_index= false;
static const char* unique_suffix= "$unique";
KEY* key_info= tab->key_info;
const char **key_name= tab->s->keynames.type_names;
@@ -1096,10 +1096,26 @@ int ha_ndbcluster::build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase)
error= create_unique_index(unique_index_name, key_info);
break;
case UNIQUE_INDEX:
- if (!(error= check_index_fields_not_null(i)))
- error= create_unique_index(unique_index_name, key_info);
+ if (check_index_fields_not_null(i))
+ {
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_NULL_COLUMN_IN_INDEX,
+ "Ndb does not support unique index on NULL valued attributes, index access with NULL value will become full table scan");
+ null_in_unique_index= true;
+ }
+ error= create_unique_index(unique_index_name, key_info);
break;
case ORDERED_INDEX:
+ if (key_info->algorithm == HA_KEY_ALG_HASH)
+ {
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_UNSUPPORTED_EXTENSION,
+ ER(ER_UNSUPPORTED_EXTENSION),
+ "Ndb does not support non-unique "
+ "hash based indexes");
+ error= HA_ERR_UNSUPPORTED;
+ break;
+ }
error= create_ordered_index(index_name, key_info);
break;
default:
@@ -1129,6 +1145,11 @@ int ha_ndbcluster::build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase)
m_index[i].unique_index= (void *) index;
error= fix_unique_index_attr_order(m_index[i], index, key_info);
}
+ if (idx_type == UNIQUE_INDEX &&
+ phase != ILBP_CREATE &&
+ check_index_fields_not_null(i))
+ null_in_unique_index= true;
+ m_index[i].null_in_unique_index= null_in_unique_index;
}
DBUG_RETURN(error);
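
The two new cases above change index DDL behavior: a unique key that includes a nullable column is still created, but warned about and flagged, while a non-unique hash index is now rejected. A minimal standalone sketch of that dispatch (simplified hypothetical types, not the patch's actual code):

    // Sketch of the index-type dispatch added above; the warning text and
    // return behavior mirror the hunk, everything else is simplified.
    #include <cstdio>

    enum IndexKind { UNIQUE_IDX, ORDERED_IDX };

    static int classify_index(IndexKind kind, bool hash_algorithm,
                              bool has_nullable_column,
                              bool *null_in_unique_index)
    {
      *null_in_unique_index= false;
      switch (kind) {
      case UNIQUE_IDX:
        if (has_nullable_column) {
          // NDB's unique hash index cannot hold NULLs: warn, remember the
          // fact, and let NULL-key lookups fall back to a filtered scan.
          std::fprintf(stderr, "warning: unique index on NULL valued "
                               "attributes -> full table scan on NULL\n");
          *null_in_unique_index= true;
        }
        return 0;                  // the index is still created
      case ORDERED_IDX:
        if (hash_algorithm)
          return 1;                // non-unique hash indexes: unsupported
        return 0;
      }
      return 0;
    }
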
@@ -1150,7 +1171,7 @@ NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint inx) const
ORDERED_INDEX);
}
-int ha_ndbcluster::check_index_fields_not_null(uint inx)
+bool ha_ndbcluster::check_index_fields_not_null(uint inx)
{
KEY* key_info= table->key_info + inx;
KEY_PART_INFO* key_part= key_info->key_part;
@@ -1161,14 +1182,10 @@ int ha_ndbcluster::check_index_fields_not_null(uint inx)
{
Field* field= key_part->field;
if (field->maybe_null())
- {
- my_printf_error(ER_NULL_COLUMN_IN_INDEX,ER(ER_NULL_COLUMN_IN_INDEX),
- MYF(0),field->field_name);
- DBUG_RETURN(ER_NULL_COLUMN_IN_INDEX);
- }
+ DBUG_RETURN(true);
}
- DBUG_RETURN(0);
+ DBUG_RETURN(false);
}
void ha_ndbcluster::release_metadata()
@@ -1261,6 +1278,12 @@ inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
return m_index[idx_no].type;
}
+inline bool ha_ndbcluster::has_null_in_unique_index(uint idx_no) const
+{
+ DBUG_ASSERT(idx_no < MAX_KEY);
+ return m_index[idx_no].null_in_unique_index;
+}
+
/*
Get the flags for an index
@@ -1596,7 +1619,7 @@ bool ha_ndbcluster::check_all_operations_for_error(NdbTransaction *trans,
* primary key or unique index values
*/
-int ha_ndbcluster::peek_indexed_rows(const byte *record)
+int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk)
{
NdbTransaction *trans= m_active_trans;
NdbOperation *op;
@@ -1609,7 +1632,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record)
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
first= NULL;
- if (table->s->primary_key != MAX_KEY)
+ if (check_pk && table->s->primary_key != MAX_KEY)
{
/*
* Fetch any row with colliding primary key
@@ -2090,6 +2113,42 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
}
/*
+ Unique index scan in NDB (full table scan with scan filter)
+ */
+
+int ha_ndbcluster::unique_index_scan(const KEY* key_info,
+ const byte *key,
+ uint key_len,
+ byte *buf)
+{
+ int res;
+ NdbScanOperation *op;
+ NdbTransaction *trans= m_active_trans;
+
+ DBUG_ENTER("unique_index_scan");
+ DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
+
+ NdbOperation::LockMode lm=
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+ bool need_pk = (lm == NdbOperation::LM_Read);
+ if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
+ op->readTuples(lm,
+ (need_pk)?NdbScanOperation::SF_KeyInfo:0,
+ parallelism))
+ ERR_RETURN(trans->getNdbError());
+ m_active_cursor= op;
+ if (generate_scan_filter_from_key(op, key_info, key, key_len, buf))
+ DBUG_RETURN(ndb_err(trans));
+ if ((res= define_read_attrs(buf, op)))
+ DBUG_RETURN(res);
+
+ if (execute_no_commit(this,trans,false) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_PRINT("exit", ("Scan started successfully"));
+ DBUG_RETURN(next_result(buf));
+}
+
+/*
Start full table scan in NDB
*/
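
unique_index_scan() above implements a unique-index lookup as a full table scan with a scan filter. For reference, the NdbScanFilter calls it relies on (shown in full later in this diff) compose like this; a minimal equality filter on column 0, with error handling trimmed, assuming `op` is an NdbScanOperation after readTuples() and the NDB API headers (NdbApi.hpp) are available:

    NdbScanFilter filter(op);                  // attach a filter to the scan
    Uint32 val= 42;
    if (filter.begin(NdbScanFilter::AND) == -1 ||   // open an AND group
        filter.cmp(NdbScanFilter::COND_EQ, 0,       // column no 0 == val
                   &val, sizeof(val)) == -1 ||
        filter.end() == -1)                         // close the group
      return 1;                                     // filter build failed
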
@@ -2166,7 +2225,7 @@ int ha_ndbcluster::write_row(byte *record)
start_bulk_insert will set parameters to ensure that each
write_row is committed individually
*/
- int peek_res= peek_indexed_rows(record);
+ int peek_res= peek_indexed_rows(record, true);
if (!peek_res)
{
@@ -2335,8 +2394,26 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
NdbScanOperation* cursor= m_active_cursor;
NdbOperation *op;
uint i;
+ bool pk_update= (table->s->primary_key != MAX_KEY &&
+ key_cmp(table->s->primary_key, old_data, new_data));
DBUG_ENTER("update_row");
+ /*
+   * If IGNORE is in effect, ignore constraint violations on primary and
+   * unique keys, but check that the statement is not part of
+   * INSERT ... ON DUPLICATE KEY UPDATE
+ */
+ if (m_ignore_dup_key && thd->lex->sql_command == SQLCOM_UPDATE)
+ {
+ int peek_res= peek_indexed_rows(new_data, pk_update);
+
+ if (!peek_res)
+ {
+ DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
+ }
+ if (peek_res != HA_ERR_KEY_NOT_FOUND)
+ DBUG_RETURN(peek_res);
+ }
+
statistic_increment(thd->status_var.ha_update_count, &LOCK_status);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
{
@@ -2346,8 +2423,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
}
/* Check for update of primary key for special handling */
- if ((table->s->primary_key != MAX_KEY) &&
- (key_cmp(table->s->primary_key, old_data, new_data)))
+ if (pk_update)
{
int read_res, insert_res, delete_res, undo_res;
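
Note the inverted return convention in the new IGNORE block above: peek_indexed_rows() returning 0 means it *found* a colliding row.

    /* Return handling in the IGNORE block, spelled out (illustration only):
         peek_res == 0                     a row with the new key values
                                           exists -> HA_ERR_FOUND_DUPP_KEY,
                                           which UPDATE IGNORE discards
         peek_res == HA_ERR_KEY_NOT_FOUND  no collision -> proceed with the
                                           normal update path
         any other value                   a real error -> propagate it   */
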
@@ -2763,7 +2839,7 @@ int ha_ndbcluster::index_read(byte *buf,
}
else if (type == UNIQUE_INDEX)
{
- DBUG_RETURN(1);
+ DBUG_RETURN(unique_index_scan(key_info, key, key_len, buf));
}
break;
case ORDERED_INDEX:
@@ -2856,12 +2932,13 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
bool eq_r, bool sorted,
byte* buf)
{
- KEY* key_info;
+ ndb_index_type type= get_index_type(active_index);
+  KEY* key_info;
int error= 1;
DBUG_ENTER("ha_ndbcluster::read_range_first_to_buf");
DBUG_PRINT("info", ("eq_r: %d, sorted: %d", eq_r, sorted));
- switch (get_index_type(active_index)){
+ switch (type){
case PRIMARY_KEY_ORDERED_INDEX:
case PRIMARY_KEY_INDEX:
key_info= table->key_info + active_index;
@@ -2887,6 +2964,11 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
error= unique_index_read(start_key->key, start_key->length, buf);
DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
}
+ else if (type == UNIQUE_INDEX)
+ DBUG_RETURN(unique_index_scan(key_info,
+ start_key->key,
+ start_key->length,
+ buf));
break;
default:
break;
@@ -3062,20 +3144,26 @@ void ha_ndbcluster::position(const byte *record)
size_t len = key_part->length;
const byte * ptr = record + key_part->offset;
Field *field = key_part->field;
- if ((field->type() == MYSQL_TYPE_VARCHAR) &&
- ((Field_varstring*)field)->length_bytes == 1)
+ if (field->type() == MYSQL_TYPE_VARCHAR)
{
- /**
- * Keys always use 2 bytes length
- */
- buff[0] = ptr[0];
- buff[1] = 0;
- memcpy(buff+2, ptr + 1, len);
- len += 2;
+ if (((Field_varstring*)field)->length_bytes == 1)
+ {
+ /**
+ * Keys always use 2 bytes length
+ */
+ buff[0] = ptr[0];
+ buff[1] = 0;
+ memcpy(buff+2, ptr + 1, len);
+ }
+ else
+ {
+ memcpy(buff, ptr, len + 2);
+ }
+ len += 2;
}
else
{
- memcpy(buff, ptr, len);
+ memcpy(buff, ptr, len);
}
buff += len;
}
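
For a VARCHAR key part whose value is 'ab', the two branches above produce the same 2-byte-length layout, which is what the comment "Keys always use 2 bytes length" means (byte values are illustrative, little-endian length):

    // 1-byte length column (length_bytes == 1), row format [0x02]['a']['b']
    //   -> buff: [0x02][0x00]['a']['b']   (length byte widened to 2 bytes)
    // 2-byte length column (length_bytes == 2), row [0x02][0x00]['a']['b']
    //   -> buff: [0x02][0x00]['a']['b']   (copied verbatim, len + 2 bytes)
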
@@ -4097,19 +4185,29 @@ static int create_ndb_column(NDBCOL &col,
col.setType(NDBCOL::Text);
col.setCharset(cs);
}
- // Use "<=" even if "<" is the exact condition
- if (field->max_length() <= (1 << 8))
- goto mysql_type_tiny_blob;
- else if (field->max_length() <= (1 << 16))
{
- col.setInlineSize(256);
- col.setPartSize(2000);
- col.setStripeSize(16);
+ Field_blob *field_blob= (Field_blob *)field;
+ /*
+     * max_data_length is 2^8-1, 2^16-1, 2^24-1 for TINYBLOB, BLOB, MEDIUMBLOB.
+ * Tinyblob gets no blob parts. The other cases are just a crude
+ * way to control part size and striping.
+ *
+ * In mysql blob(256) is promoted to blob(65535) so it does not
+ * in fact fit "inline" in NDB.
+ */
+ if (field_blob->max_data_length() < (1 << 8))
+ goto mysql_type_tiny_blob;
+ else if (field_blob->max_data_length() < (1 << 16))
+ {
+ col.setInlineSize(256);
+ col.setPartSize(2000);
+ col.setStripeSize(16);
+ }
+ else if (field_blob->max_data_length() < (1 << 24))
+ goto mysql_type_medium_blob;
+ else
+ goto mysql_type_long_blob;
}
- else if (field->max_length() <= (1 << 24))
- goto mysql_type_medium_blob;
- else
- goto mysql_type_long_blob;
break;
mysql_type_medium_blob:
case MYSQL_TYPE_MEDIUM_BLOB:
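
Worked through with MySQL's blob types, the strict "<" comparisons above map as follows (values are each type's max_data_length):

    // TINYBLOB         255 < 2^8   -> tiny blob path (no blob parts)
    // BLOB           65535 < 2^16  -> inline 256, part 2000, stripe 16
    // MEDIUMBLOB  16777215 < 2^24  -> medium blob path
    // LONGBLOB  4294967295         -> long blob path
    // A column declared blob(256) is promoted to blob(65535) by MySQL and
    // therefore takes the BLOB branch, not the tiny one.
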
@@ -6036,7 +6134,7 @@ ndb_get_table_statistics(ha_ndbcluster* file, bool report_error, Ndb* ndb,
retry:
if(report_error)
{
- if (file)
+ if (file && pTrans)
{
reterr= file->ndb_err(pTrans);
}
@@ -6117,6 +6215,30 @@ ha_ndbcluster::release_completed_operations(NdbTransaction *trans,
trans->releaseCompletedOperations();
}
+bool
+ha_ndbcluster::null_value_index_search(KEY_MULTI_RANGE *ranges,
+ KEY_MULTI_RANGE *end_range,
+ HANDLER_BUFFER *buffer)
+{
+ DBUG_ENTER("null_value_index_search");
+ KEY* key_info= table->key_info + active_index;
+ KEY_MULTI_RANGE *range= ranges;
+ ulong reclength= table->s->reclength;
+ byte *curr= (byte*)buffer->buffer;
+ byte *end_of_buffer= (byte*)buffer->buffer_end;
+
+ for (; range<end_range && curr+reclength <= end_of_buffer;
+ range++)
+ {
+ const byte *key= range->start_key.key;
+ uint key_len= range->start_key.length;
+ if (check_null_in_key(key_info, key, key_len))
+ DBUG_RETURN(true);
+ curr += reclength;
+ }
+ DBUG_RETURN(false);
+}
+
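
check_null_in_key(), used above, is not part of this diff; a plausible sketch of it, assuming MySQL's key-buffer convention that each nullable key part is prefixed by a one-byte NULL indicator (non-zero meaning SQL NULL):

    static bool check_null_in_key_sketch(const KEY *key_info,
                                         const byte *key, uint key_len)
    {
      const KEY_PART_INFO *part= key_info->key_part;
      const byte *end= key + key_len;
      for (uint i= 0; i < key_info->key_parts && key < end; i++, part++)
      {
        if (part->null_bit && *key)    // indicator byte set -> NULL value
          return true;
        key+= part->store_length;      // store_length covers the indicator
      }
      return false;
    }
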
int
ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
KEY_MULTI_RANGE *ranges,
@@ -6133,11 +6255,14 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
NdbOperation* op;
Thd_ndb *thd_ndb= get_thd_ndb(current_thd);
- if (uses_blob_value(m_retrieve_all_fields))
+ /**
+   * Blobs, and unique hash index reads with NULL in the key, can't be batched currently
+ */
+ if (uses_blob_value(m_retrieve_all_fields) ||
+ (index_type == UNIQUE_INDEX &&
+ has_null_in_unique_index(active_index) &&
+ null_value_index_search(ranges, ranges+range_count, buffer)))
{
- /**
- * blobs can't be batched currently
- */
m_disable_multi_read= TRUE;
DBUG_RETURN(handler::read_multi_range_first(found_range_p,
ranges,
@@ -6193,7 +6318,6 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
goto range;
/* fall through */
case PRIMARY_KEY_INDEX:
- {
multi_range_curr->range_flag |= UNIQUE_RANGE;
if ((op= m_active_trans->getNdbOperation(tab)) &&
!op->readTuple(lm) &&
@@ -6204,8 +6328,6 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
else
ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
break;
- }
- break;
case UNIQUE_ORDERED_INDEX:
if (!(multi_range_curr->start_key.length == key_info->key_length &&
multi_range_curr->start_key.flag == HA_READ_KEY_EXACT &&
@@ -6214,18 +6336,16 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
goto range;
/* fall through */
case UNIQUE_INDEX:
- {
multi_range_curr->range_flag |= UNIQUE_RANGE;
if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) &&
- !op->readTuple(lm) &&
- !set_index_key(op, key_info, multi_range_curr->start_key.key) &&
- !define_read_attrs(curr, op) &&
- (op->setAbortOption(AO_IgnoreError), TRUE))
- curr += reclength;
+ !op->readTuple(lm) &&
+ !set_index_key(op, key_info, multi_range_curr->start_key.key) &&
+ !define_read_attrs(curr, op) &&
+ (op->setAbortOption(AO_IgnoreError), TRUE))
+ curr += reclength;
else
- ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
+ ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
break;
- }
case ORDERED_INDEX:
{
range:
@@ -7968,31 +8088,12 @@ ha_ndbcluster::generate_scan_filter(Ndb_cond_stack *ndb_cond_stack,
NdbScanOperation *op)
{
DBUG_ENTER("generate_scan_filter");
+
if (ndb_cond_stack)
{
- DBUG_PRINT("info", ("Generating scan filter"));
NdbScanFilter filter(op);
- bool multiple_cond= FALSE;
- // Wrap an AND group around multiple conditions
- if (ndb_cond_stack->next) {
- multiple_cond= TRUE;
- if (filter.begin() == -1)
- DBUG_RETURN(1);
- }
- for (Ndb_cond_stack *stack= ndb_cond_stack;
- (stack);
- stack= stack->next)
- {
- Ndb_cond *cond= stack->ndb_cond;
-
- if (build_scan_filter(cond, &filter))
- {
- DBUG_PRINT("info", ("build_scan_filter failed"));
- DBUG_RETURN(1);
- }
- }
- if (multiple_cond && filter.end() == -1)
- DBUG_RETURN(1);
+
+ DBUG_RETURN(generate_scan_filter_from_cond(ndb_cond_stack, filter));
}
else
{
@@ -8003,6 +8104,88 @@ ha_ndbcluster::generate_scan_filter(Ndb_cond_stack *ndb_cond_stack,
}
int
+ha_ndbcluster::generate_scan_filter_from_cond(Ndb_cond_stack *ndb_cond_stack,
+ NdbScanFilter& filter)
+{
+ DBUG_ENTER("generate_scan_filter_from_cond");
+ bool multiple_cond= FALSE;
+
+ DBUG_PRINT("info", ("Generating scan filter"));
+ // Wrap an AND group around multiple conditions
+ if (ndb_cond_stack->next)
+ {
+ multiple_cond= TRUE;
+ if (filter.begin() == -1)
+ DBUG_RETURN(1);
+ }
+ for (Ndb_cond_stack *stack= ndb_cond_stack;
+ (stack);
+ stack= stack->next)
+ {
+ Ndb_cond *cond= stack->ndb_cond;
+
+ if (build_scan_filter(cond, &filter))
+ {
+ DBUG_PRINT("info", ("build_scan_filter failed"));
+ DBUG_RETURN(1);
+ }
+ }
+ if (multiple_cond && filter.end() == -1)
+ DBUG_RETURN(1);
+
+ DBUG_RETURN(0);
+}
+
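
The AND-wrapping rule carried over into generate_scan_filter_from_cond() only emits a group when more than one pushed condition sits on the stack:

    // one Ndb_cond_stack entry:  build_scan_filter(cond)          (no group)
    // two or more entries:       begin(AND); build each; end()    (grouped)
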
+int ha_ndbcluster::generate_scan_filter_from_key(NdbScanOperation *op,
+ const KEY* key_info,
+ const byte *key,
+ uint key_len,
+ byte *buf)
+{
+ KEY_PART_INFO* key_part= key_info->key_part;
+ KEY_PART_INFO* end= key_part+key_info->key_parts;
+ NdbScanFilter filter(op);
+ int res;
+
+ DBUG_ENTER("generate_scan_filter_from_key");
+ filter.begin(NdbScanFilter::AND);
+ for (; key_part != end; key_part++)
+ {
+ Field* field= key_part->field;
+ uint32 pack_len= field->pack_length();
+ const byte* ptr= key;
+ char buf[256];
+ DBUG_PRINT("info", ("Filtering value for %s", field->field_name));
+ DBUG_DUMP("key", (char*)ptr, pack_len);
+ if (key_part->null_bit)
+ {
+ DBUG_PRINT("info", ("Generating ISNULL filter"));
+ if (filter.isnull(key_part->fieldnr-1) == -1)
+ DBUG_RETURN(1);
+ }
+ else
+ {
+ DBUG_PRINT("info", ("Generating EQ filter"));
+ if (filter.cmp(NdbScanFilter::COND_EQ,
+ key_part->fieldnr-1,
+ ptr,
+ pack_len) == -1)
+ DBUG_RETURN(1);
+ }
+ key += key_part->store_length;
+ }
+ // Add any pushed condition
+ if (m_cond_stack &&
+ (res= generate_scan_filter_from_cond(m_cond_stack, filter)))
+ DBUG_RETURN(res);
+
+ if (filter.end() == -1)
+ DBUG_RETURN(1);
+
+ DBUG_RETURN(0);
+}
+
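
Semantically, the filter built by generate_scan_filter_from_key() is an AND over the key parts: a nullable part contributes "column IS NULL" and a NOT NULL part contributes byte-wise equality with the key value. A standalone sketch of that predicate (simplified key layout, not the NDB API):

    #include <cstring>

    struct KeyPartSketch {
      bool nullable;                   // stand-in for key_part->null_bit
      unsigned pack_len;               // stand-in for field->pack_length()
      bool row_is_null;                // the column's value in a scanned row
      const unsigned char *row_value;  // the column's bytes in that row
    };

    static bool row_matches(const KeyPartSketch *parts, unsigned n,
                            const unsigned char *key)
    {
      for (unsigned i= 0; i < n; i++) {
        const KeyPartSketch &p= parts[i];
        if (p.nullable) {
          if (!p.row_is_null)
            return false;              // ISNULL condition fails
        } else if (std::memcmp(p.row_value, key, p.pack_len) != 0) {
          return false;                // EQ condition fails
        }
        key+= p.pack_len;              // the real code advances store_length
      }
      return true;                     // every AND'ed condition held
    }
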
+int
ndbcluster_show_status(THD* thd)
{
Protocol *protocol= thd->protocol;