diff options
-rw-r--r-- | mysql-test/r/ndb_lock.result | 80 | ||||
-rw-r--r-- | mysql-test/r/rpl_ndb_basic.result | 15 | ||||
-rw-r--r-- | mysql-test/t/rpl_ndb_basic.test | 9 | ||||
-rw-r--r-- | sql/ha_ndbcluster.cc | 88 | ||||
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 4 | ||||
-rw-r--r-- | sql/ha_ndbcluster_binlog.h | 3 | ||||
-rw-r--r-- | sql/sql_delete.cc | 2 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp | 2 |
8 files changed, 192 insertions, 11 deletions
diff --git a/mysql-test/r/ndb_lock.result b/mysql-test/r/ndb_lock.result index ac93f15dac3..b4c98132ad3 100644 --- a/mysql-test/r/ndb_lock.result +++ b/mysql-test/r/ndb_lock.result @@ -63,6 +63,86 @@ pk u o 5 5 5 insert into t1 values (1,1,1); drop table t1; +create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb; +insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3); +begin; +select * from t1 where x = 1 for update; +x y z +1 one 1 +begin; +select * from t1 where x = 2 for update; +x y z +2 two 2 +select * from t1 where x = 1 for update; +ERROR HY000: Lock wait timeout exceeded; try restarting transaction +rollback; +commit; +begin; +select * from t1 where y = 'one' or y = 'three' for update; +x y z +3 three 3 +1 one 1 +begin; +select * from t1 where x = 1 for update; +ERROR HY000: Lock wait timeout exceeded; try restarting transaction +rollback; +commit; +begin; +select * from t1 where z > 1 and z < 3 for update; +x y z +2 two 2 +begin; +select * from t1 where x = 1 for update; +x y z +1 one 1 +select * from t1 where x = 2 for update; +ERROR HY000: Lock wait timeout exceeded; try restarting transaction +rollback; +commit; +begin; +select * from t1 where x = 1 lock in share mode; +x y z +1 one 1 +begin; +select * from t1 where x = 1 lock in share mode; +x y z +1 one 1 +select * from t1 where x = 2 for update; +x y z +2 two 2 +select * from t1 where x = 1 for update; +ERROR HY000: Lock wait timeout exceeded; try restarting transaction +rollback; +commit; +begin; +select * from t1 where y = 'one' or y = 'three' lock in share mode; +x y z +3 three 3 +1 one 1 +begin; +select * from t1 where y = 'one' lock in share mode; +x y z +1 one 1 +select * from t1 where x = 1 for update; +ERROR HY000: Lock wait timeout exceeded; try restarting transaction +rollback; +commit; +begin; +select * from t1 where z > 1 and z < 3 lock in share mode; +x y z +2 two 2 +begin; +select * from t1 where z = 1 lock in share mode; +x y z +1 one 1 +select * from t1 where x = 1 for update; +x y z +1 one 1 +select * from t1 where x = 2 for update; +ERROR HY000: Lock wait timeout exceeded; try restarting transaction +rollback; +commit; +drop table t1; create table t3 (id2 int) engine=ndb; lock tables t3 write; unlock tables; diff --git a/mysql-test/r/rpl_ndb_basic.result b/mysql-test/r/rpl_ndb_basic.result index b23e5f03f27..32a1c790c99 100644 --- a/mysql-test/r/rpl_ndb_basic.result +++ b/mysql-test/r/rpl_ndb_basic.result @@ -146,4 +146,19 @@ c1 c2 3 NULL 4 NULL 5 NULL +TRUNCATE t1; +SELECT count(*) FROM t1; +count(*) +0 +INSERT INTO t1 VALUES (101,NULL),(102,NULL),(103,NULL),(104,NULL),(105,NULL),(106,NULL),(107,NULL),(108,NULL),(109,NULL),(1010,NULL); +SELECT count(*) FROM t1; +count(*) +10 +SELECT c1 FROM t1 ORDER BY c1 LIMIT 5; +c1 +101 +102 +103 +104 +105 DROP TABLE t1; diff --git a/mysql-test/t/rpl_ndb_basic.test b/mysql-test/t/rpl_ndb_basic.test index c702908ed68..5290dc377c2 100644 --- a/mysql-test/t/rpl_ndb_basic.test +++ b/mysql-test/t/rpl_ndb_basic.test @@ -172,7 +172,14 @@ connection slave; # here we would get error 1412 prior to bug SELECT * FROM t1 ORDER BY c1 LIMIT 5; - +--connection master +TRUNCATE t1; +SELECT count(*) FROM t1; +INSERT INTO t1 VALUES (101,NULL),(102,NULL),(103,NULL),(104,NULL),(105,NULL),(106,NULL),(107,NULL),(108,NULL),(109,NULL),(1010,NULL); +--sync_slave_with_master +connection slave; +SELECT count(*) FROM t1; +SELECT c1 FROM t1 ORDER BY c1 LIMIT 5; # cleanup --connection master diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 8c4d6d18b9b..94fb7e6eb5c 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -1402,7 +1402,8 @@ int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type) { if (type >= TL_WRITE_ALLOW_WRITE) return NdbOperation::LM_Exclusive; - if (uses_blob_value()) + if (type == TL_READ_WITH_SHARED_LOCKS || + uses_blob_value()) return NdbOperation::LM_Read; return NdbOperation::LM_CommittedRead; } @@ -1947,7 +1948,30 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) int check; NdbTransaction *trans= m_active_trans; - bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE; + if (m_lock_tuple) + { + /* + Lock level m_lock.type either TL_WRITE_ALLOW_WRITE + (SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT + LOCK WITH SHARE MODE) and row was not explictly unlocked + with unlock_row() call + */ + NdbConnection *trans= m_active_trans; + NdbOperation *op; + // Lock row + DBUG_PRINT("info", ("Keeping lock on scanned row")); + + if (!(op= m_active_cursor->lockCurrentTuple())) + { + m_lock_tuple= false; + ERR_RETURN(trans->getNdbError()); + } + m_ops_pending++; + } + m_lock_tuple= false; + + bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE && + m_lock.type != TL_READ_WITH_SHARED_LOCKS;; do { DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb)); /* @@ -1963,6 +1987,13 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) if ((check= cursor->nextResult(contact_ndb, m_force_send)) == 0) { + /* + Explicitly lock tuple if "select for update" or + "select lock in share mode" + */ + m_lock_tuple= (m_lock.type == TL_WRITE_ALLOW_WRITE + || + m_lock.type == TL_READ_WITH_SHARED_LOCKS); DBUG_RETURN(0); } else if (check == 1 || check == 2) @@ -2258,9 +2289,10 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, restart= FALSE; NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); + bool need_pk = (lm == NdbOperation::LM_Read); if (!(op= trans->getNdbIndexScanOperation(m_index[active_index].index, m_table)) || - op->readTuples(lm, 0, parallelism, sorted, descending)) + op->readTuples(lm, 0, parallelism, sorted, descending, false, need_pk)) ERR_RETURN(trans->getNdbError()); if (m_use_partition_function && part_spec != NULL && part_spec->start_part == part_spec->end_part) @@ -2329,8 +2361,11 @@ int ha_ndbcluster::full_table_scan(byte *buf) NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); + bool need_pk = (lm == NdbOperation::LM_Read); if (!(op=trans->getNdbScanOperation(m_table)) || - op->readTuples(lm, 0, parallelism)) + op->readTuples(lm, + (need_pk)?NdbScanOperation::SF_KeyInfo:0, + parallelism)) ERR_RETURN(trans->getNdbError()); m_active_cursor= op; @@ -2706,6 +2741,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) DBUG_PRINT("info", ("Calling updateTuple on cursor")); if (!(op= cursor->updateCurrentTuple())) ERR_RETURN(trans->getNdbError()); + m_lock_tuple= false; m_ops_pending++; if (uses_blob_value()) m_blobs_pending= TRUE; @@ -2814,6 +2850,7 @@ int ha_ndbcluster::delete_row(const byte *record) DBUG_PRINT("info", ("Calling deleteTuple on cursor")); if (cursor->deleteCurrentTuple() != 0) ERR_RETURN(trans->getNdbError()); + m_lock_tuple= false; m_ops_pending++; if (m_use_partition_function) @@ -3073,6 +3110,13 @@ int ha_ndbcluster::index_init(uint index, bool sorted) DBUG_PRINT("enter", ("index: %u sorted: %d", index, sorted)); active_index= index; m_sorted= sorted; + /* + Locks are are explicitly released in scan + unless m_lock.type == TL_READ_HIGH_PRIORITY + and no sub-sequent call to unlock_row() + */ + m_lock_tuple= false; + m_lock_tuple= false; DBUG_RETURN(0); } @@ -3997,6 +4041,22 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) } /* + Unlock the last row read in an open scan. + Rows are unlocked by default in ndb, but + for SELECT FOR UPDATE and SELECT LOCK WIT SHARE MODE + locks are kept if unlock_row() is not called. +*/ + +void ha_ndbcluster::unlock_row() +{ + DBUG_ENTER("unlock_row"); + + DBUG_PRINT("info", ("Unlocking row")); + m_lock_tuple= false; + DBUG_VOID_RETURN; +} + +/* Start a transaction for running a statement if one is not already running in a transaction. This will be the case in a BEGIN; COMMIT; block @@ -4413,6 +4473,7 @@ int ha_ndbcluster::create(const char *name, uint pack_length, length, i, pk_length= 0; const void *data, *pack_data; bool create_from_engine= (info->table_options & HA_OPTION_CREATE_FROM_ENGINE); + bool is_truncate= (current_thd->lex->sql_command == SQLCOM_TRUNCATE); DBUG_ENTER("ha_ndbcluster::create"); DBUG_PRINT("enter", ("name: %s", name)); @@ -4421,6 +4482,12 @@ int ha_ndbcluster::create(const char *name, set_dbname(name); set_tabname(name); + if (is_truncate) + { + DBUG_PRINT("info", ("Dropping and re-creating table for TRUNCATE")); + if ((my_errno= delete_table(name))) + DBUG_RETURN(my_errno); + } table= form; if (create_from_engine) { @@ -4665,7 +4732,9 @@ int ha_ndbcluster::create(const char *name, share->db, share->table_name, m_table->getObjectId(), m_table->getObjectVersion(), - SOT_CREATE_TABLE, 0, 0, 1); + (is_truncate) ? + SOT_TRUNCATE_TABLE : SOT_CREATE_TABLE, + 0, 0, 1); break; } } @@ -5200,7 +5269,8 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, */ int table_dropped= dict->getNdbError().code != 709; - if (!IS_TMP_PREFIX(table_name) && share) + if (!IS_TMP_PREFIX(table_name) && share && + current_thd->lex->sql_command != SQLCOM_TRUNCATE) { ndbcluster_log_schema_op(thd, share, thd->query, thd->query_length, @@ -6183,7 +6253,7 @@ static int ndbcluster_init() #ifdef HAVE_NDB_BINLOG ndbcluster_binlog_init_handlerton(); #endif - h.flags= HTON_TEMPORARY_NOT_SUPPORTED; + h.flags= HTON_CAN_RECREATE | HTON_TEMPORARY_NOT_SUPPORTED; } if (have_ndbcluster != SHOW_OPTION_YES) @@ -7430,6 +7500,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, byte *end_of_buffer= (byte*)buffer->buffer_end; NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); + bool need_pk = (lm == NdbOperation::LM_Read); const NDBTAB *tab= m_table; const NDBINDEX *unique_idx= m_index[active_index].unique_index; const NDBINDEX *idx= m_index[active_index].index; @@ -7520,7 +7591,8 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, end_of_buffer -= reclength; } else if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab)) - &&!scanOp->readTuples(lm, 0, parallelism, sorted, FALSE, TRUE) + &&!scanOp->readTuples(lm, 0, parallelism, sorted, + FALSE, TRUE, need_pk) &&!generate_scan_filter(m_cond_stack, scanOp) &&!define_read_attrs(end_of_buffer-reclength, scanOp)) { diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index 3b93eb4b4c9..af6eb74a236 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -1232,6 +1232,9 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, case SOT_LOGFILE_GROUP: type_str= "logfile group"; break; + case SOT_TRUNCATE_TABLE: + type_str= "truncate table"; + break; default: abort(); /* should not happen, programming error */ } @@ -1765,6 +1768,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, post_epoch_unlock= 1; break; case SOT_CREATE_TABLE: + case SOT_TRUNCATE_TABLE: pthread_mutex_lock(&LOCK_open); if (ndb_create_table_from_engine(thd, schema->db, schema->name)) { diff --git a/sql/ha_ndbcluster_binlog.h b/sql/ha_ndbcluster_binlog.h index 7c45dee59d0..58bf7517df5 100644 --- a/sql/ha_ndbcluster_binlog.h +++ b/sql/ha_ndbcluster_binlog.h @@ -49,7 +49,8 @@ enum SCHEMA_OP_TYPE SOT_CLEAR_SLOCK= 7, SOT_TABLESPACE= 8, SOT_LOGFILE_GROUP= 9, - SOT_RENAME_TABLE= 10 + SOT_RENAME_TABLE= 10, + SOT_TRUNCATE_TABLE= 11 }; const uint max_ndb_nodes= 64; /* multiple of 32 */ diff --git a/sql/sql_delete.cc b/sql/sql_delete.cc index a7a50611a02..dd4748bc15c 100644 --- a/sql/sql_delete.cc +++ b/sql/sql_delete.cc @@ -954,8 +954,10 @@ bool mysql_truncate(THD *thd, TABLE_LIST *table_list, bool dont_send_ok) // crashes, replacement works. *(path + path_length - reg_ext_length)= // '\0'; path[path_length - reg_ext_length] = 0; + VOID(pthread_mutex_lock(&LOCK_open)); error= ha_create_table(thd, path, table_list->db, table_list->table_name, &create_info, 1); + VOID(pthread_mutex_unlock(&LOCK_open)); query_cache_invalidate3(thd, table_list, 0); end: diff --git a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp index 3c72c68ddb9..b6961edd019 100644 --- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp +++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp @@ -581,7 +581,7 @@ public: int createIndex(NdbIndexImpl &ix, NdbTableImpl & tab); int dropIndex(const char * indexName, const char * tableName); - int dropIndex(NdbIndexImpl &); + int dropIndex(NdbIndexImpl &, const char * tableName); NdbTableImpl * getIndexTable(NdbIndexImpl * index, NdbTableImpl * table); |