diff options
author | Nikita Malyavin <nikitamalyavin@gmail.com> | 2019-12-16 13:45:02 +1000 |
---|---|---|
committer | Nikita Malyavin <nikitamalyavin@gmail.com> | 2020-01-21 21:11:10 +1000 |
commit | d6b24b7457bc106d2a8271193553ff0dfcb44113 (patch) | |
tree | 476adc36538e4a645b71c588d12e0fc37fa876d3 | |
parent | 18fad2d83e1d681f3508205d660a1eca16782d20 (diff) | |
download | mariadb-git-d6b24b7457bc106d2a8271193553ff0dfcb44113.tar.gz |
MDEV-16983 Application-time periods: foreign key PART 2/3
Add correct update check, which preserves referential integrity on not-updated part of period
-rw-r--r-- | mysql-test/suite/period/r/fk.result | 16 | ||||
-rw-r--r-- | mysql-test/suite/period/t/fk.test | 20 | ||||
-rw-r--r-- | sql/field.h | 5 | ||||
-rw-r--r-- | sql/handler.cc | 262 |
4 files changed, 206 insertions, 97 deletions
diff --git a/mysql-test/suite/period/r/fk.result b/mysql-test/suite/period/r/fk.result index 82c77c794ab..5214c4559ad 100644 --- a/mysql-test/suite/period/r/fk.result +++ b/mysql-test/suite/period/r/fk.result @@ -124,47 +124,31 @@ id x s e 1 2 2017-01-28 2017-01-29 update t set x= 2 where s='2017-01-03' and e='2017-01-20'; ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails -# -# False-positives -# # Expand left update t set s= '2017-01-01' where s = '2017-01-03' and e = '2017-01-20'; -ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails # Shrink left update t set s= '2017-01-05' where e = '2017-01-20'; -ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails # Expand right update t set e= '2017-02-10' where s = '2017-01-26' and e = '2017-01-30'; -ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails # Shrink right update t set e= '2017-01-29' where s = '2017-01-26'; ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails delete from s where s = '2017-01-27' and e = '2017-01-30'; update t set e= '2017-01-29' where s = '2017-01-26' and x = 1; # Shrink both -# Not a false-positive update t set s= '2017-01-27', e= '2017-01-28' where x = 2; ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails -# False-positive update t set s= '2017-01-28', e= '2017-01-29' where x = 2; -ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails # Expand both update t set s= '2017-01-20', e= '2017-02-05' where x = 2; -ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails # Move right -# Not a false-positive update t set s= '2017-02-02', e= '2017-02-25' where x = 2; ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails -# False-positive update t set s= '2017-01-28', e= '2017-02-25' where x = 2; -ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails # Move left -# Not a false-positive update t set s= '2017-01-20', e= '2017-01-27' where x = 2; ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails -# False-positive update t set s= '2017-01-20', e= '2017-01-29' where x = 2; -ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails drop table s; delete from t; create or replace table s (x int, y int, z char(200), pk int, diff --git a/mysql-test/suite/period/t/fk.test b/mysql-test/suite/period/t/fk.test index 1168a1b185b..4d312b97a1e 100644 --- a/mysql-test/suite/period/t/fk.test +++ b/mysql-test/suite/period/t/fk.test @@ -93,20 +93,13 @@ select * from s; update t set x= 2 where s='2017-01-03' and e='2017-01-20'; ---echo # ---echo # False-positives ---echo # - --echo # Expand left ---error ER_ROW_IS_REFERENCED_2 update t set s= '2017-01-01' where s = '2017-01-03' and e = '2017-01-20'; --echo # Shrink left ---error ER_ROW_IS_REFERENCED_2 update t set s= '2017-01-05' where e = '2017-01-20'; --echo # Expand right ---error ER_ROW_IS_REFERENCED_2 update t set e= '2017-02-10' where s = '2017-01-26' and e = '2017-01-30'; --echo # Shrink right @@ -117,31 +110,24 @@ delete from s where s = '2017-01-27' and e = '2017-01-30'; update t set e= '2017-01-29' where s = '2017-01-26' and x = 1; --echo # Shrink both ---echo # Not a false-positive --error ER_ROW_IS_REFERENCED_2 update t set s= '2017-01-27', e= '2017-01-28' where x = 2; ---echo # False-positive ---error ER_ROW_IS_REFERENCED_2 + update t set s= '2017-01-28', e= '2017-01-29' where x = 2; --echo # Expand both ---error ER_ROW_IS_REFERENCED_2 update t set s= '2017-01-20', e= '2017-02-05' where x = 2; --echo # Move right ---echo # Not a false-positive --error ER_ROW_IS_REFERENCED_2 update t set s= '2017-02-02', e= '2017-02-25' where x = 2; ---echo # False-positive ---error ER_ROW_IS_REFERENCED_2 + update t set s= '2017-01-28', e= '2017-02-25' where x = 2; --echo # Move left ---echo # Not a false-positive --error ER_ROW_IS_REFERENCED_2 update t set s= '2017-01-20', e= '2017-01-27' where x = 2; ---echo # False-positive ---error ER_ROW_IS_REFERENCED_2 + update t set s= '2017-01-20', e= '2017-01-29' where x = 2; # TODO remove when deadlock issue will be solved diff --git a/sql/field.h b/sql/field.h index 4f81bd5f87d..953a7cc17df 100644 --- a/sql/field.h +++ b/sql/field.h @@ -1117,6 +1117,11 @@ public: my_ptrdiff_t l_offset= (my_ptrdiff_t) (record - table->record[0]); return ptr + l_offset; } + uchar *ptr_in_record( uchar *record) const + { + my_ptrdiff_t l_offset= (my_ptrdiff_t) (record - table->record[0]); + return ptr + l_offset; + } virtual int set_default(); bool has_update_default_function() const diff --git a/sql/handler.cc b/sql/handler.cc index d6194fa5fb1..67dbc22732f 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -6841,6 +6841,43 @@ void set_bits_with_key(MY_BITMAP *map, const KEY *key, uint key_parts) bitmap_set_bit(map, key->key_part[i].fieldnr - 1); } +static int period_row_check_delete_for_key(const uchar *record, + const FOREIGN_KEY &fk) +{ + handler *foreign_handler= fk.foreign_key->table->file; + + /* We shouldn't save cursor here, since this handler is never used. + The foreign table is only opened for FK matches. */ + int error= foreign_handler->ha_index_init(fk.foreign_key_nr, false); + if(error) + return error; + + + set_bits_with_key(fk.foreign_key->table->read_set, + fk.foreign_key, fk.fields_num); + auto *record_buffer= foreign_handler->get_table()->record[0]; + auto *key_buffer= foreign_handler->get_table()->record[1]; + + key_copy(key_buffer, record, fk.referenced_key, 0); + error= period_find_overlapping_record(foreign_handler, + key_buffer, + record, + record_buffer, + *fk.referenced_key, + *fk.foreign_key); + + int end_error= foreign_handler->ha_index_end(); + + if (error == HA_ERR_KEY_NOT_FOUND) // no key matched + return end_error; + if (error) + return error; + if (end_error) + return end_error; + + return HA_ERR_ROW_IS_REFERENCED; +} + /** @param record record to update from, or a deleted record */ int handler::period_row_del_fk_check(const uchar *record) @@ -6852,47 +6889,17 @@ int handler::period_row_del_fk_check(const uchar *record) for(int k= 0; k < table->referenced_keys; k++) { - auto &fk= table->referenced[k]; + const auto &fk= table->referenced[k]; + if (!fk.has_period) continue; DBUG_ASSERT(fk.fields_num == fk.foreign_key->user_defined_key_parts); DBUG_ASSERT(fk.fields_num == fk.referenced_key->user_defined_key_parts); - handler *foreign_handler= fk.foreign_key->table->file; - - /* We shouldn't save cursor here, since this handler is never used. - The foreign table is only opened for FK matches. */ - int error= foreign_handler->ha_index_init(fk.foreign_key_nr, false); - if(error) - return error; - - - set_bits_with_key(fk.foreign_key->table->read_set, - fk.foreign_key, fk.fields_num); - auto *record_buffer= foreign_handler->get_table()->record[0]; - auto *key_buffer= foreign_handler->get_table()->record[1]; - - key_copy(key_buffer, record, fk.referenced_key, 0); - error= period_find_overlapping_record(foreign_handler, - key_buffer, - record, - record_buffer, - *fk.referenced_key, - *fk.foreign_key); - - int end_error= foreign_handler->ha_index_end(); - - if (error == HA_ERR_KEY_NOT_FOUND) // no key matched - return end_error; - if (error) - return error; - if (end_error) - return end_error; - if (key_period_compare_periods(*fk.referenced_key, - *fk.foreign_key, - record, record_buffer) == 0) - return HA_ERR_ROW_IS_REFERENCED; + int res= period_row_check_delete_for_key(record, fk); + if(res) + return res; } return 0; } @@ -6993,6 +7000,33 @@ static int period_check_row_references(handler *ref_handler, return 0; } +static int period_row_check_insert_for_key(const uchar *record, + const FOREIGN_KEY &fk) +{ + handler *ref_handler= fk.referenced_key->table->file; + + int error= ref_handler->ha_index_init(fk.referenced_key_nr, false); + if(error) + return error; + + set_bits_with_key(fk.referenced_key->table->read_set, + fk.referenced_key, fk.fields_num); + + auto *ref_record= ref_handler->get_table()->record[0]; + auto *key_buffer= ref_handler->get_table()->record[1]; + + key_copy(key_buffer, record, fk.foreign_key, 0); + bool row_references; + error= period_check_row_references(ref_handler, key_buffer, + record, ref_record, + *fk.foreign_key, *fk.referenced_key); + if (error == HA_ERR_KEY_NOT_FOUND) + error= HA_ERR_NO_REFERENCED_ROW; + + int end_error= ref_handler->ha_index_end(); + + return error ? error : end_error; +} int handler::period_row_ins_fk_check(const uchar *record) { @@ -7001,7 +7035,7 @@ int handler::period_row_ins_fk_check(const uchar *record) for(int k= 0; k < table->foreign_keys; k++) { - auto &fk= table->foreign[k]; + const auto &fk= table->foreign[k]; if (!fk.has_period) continue; if (table->versioned() && !table->vers_end_field()->is_max(record)) @@ -7010,43 +7044,143 @@ int handler::period_row_ins_fk_check(const uchar *record) DBUG_ASSERT(fk.fields_num == fk.foreign_key->user_defined_key_parts); DBUG_ASSERT(fk.fields_num == fk.referenced_key->user_defined_key_parts); - handler *ref_handler= fk.referenced_key->table->file; + int res= period_row_check_insert_for_key(record, fk); + if (res) + return res; + } + return 0; +} - int error= ref_handler->ha_index_init(fk.referenced_key_nr, false); - if(error) - return error; - set_bits_with_key(fk.referenced_key->table->read_set, - fk.referenced_key, fk.fields_num); +int handler::period_row_upd_fk_check(const uchar *old_data, const uchar *new_data) +{ + if (!table->s->period.name) + return 0; + + bool is_versioned_delete= table->versioned() && + !table->vers_end_field()->is_max(new_data); + bool is_history_update= table->versioned() && + !table->vers_end_field()->is_max(old_data); + + /* + 1. If base key parts changed, or old and new periods do not overlap, + emit delete + insert checks. + 2. If period key part is updated then there can be three operations: + grow, shrink or move. + 1. Move is enlarge + shrink. + 2. Each operation can be left or right. + 3. Move right is (enlarge right + shrink left) and vice-versa. + 4. Enlarge emits insert check on enlarged area. + Shrink emits delete check on shrinked area. + */ + if (!check_overlaps_buffer) + check_overlaps_buffer= (uchar*)alloc_root(&table_share->mem_root, + table_share->max_unique_length + + table_share->reclength); + auto *start_field= table->field[table->s->period.start_fieldno]; + auto *end_field= table->field[table->s->period.end_fieldno]; + auto field_len= start_field->pack_length(); + + auto record_dup= check_overlaps_buffer; + // note that these two store pointers to fields in record_dup + Binary_value dup_start(start_field->ptr_in_record(record_dup), field_len); + Binary_value dup_end(end_field->ptr_in_record(record_dup), field_len); + + uchar storage[4][Type_handler_datetime::hires_bytes(MAX_DATETIME_PRECISION)]; + Binary_value old_start(storage[0], field_len); + Binary_value old_end(storage[1], field_len); + Binary_value new_start(storage[2], field_len); + Binary_value new_end(storage[3], field_len); + old_start.fill(start_field->ptr_in_record(old_data)); + old_end.fill(end_field->ptr_in_record(old_data)); + new_start.fill(start_field->ptr_in_record(new_data)); + new_end.fill(end_field->ptr_in_record(new_data)); + + // we will use record_dup instead of old_data to modify period values + memcpy(record_dup, old_data, table->s->reclength); + + for(int k= 0; !is_history_update && k < table->referenced_keys; k++) + { + const auto &fk= table->referenced[k]; + if (!fk.has_period) + continue; + + DBUG_ASSERT(fk.fields_num == fk.foreign_key->user_defined_key_parts); + DBUG_ASSERT(fk.fields_num == fk.referenced_key->user_defined_key_parts); - auto *ref_record= ref_handler->get_table()->record[0]; - auto *key_buffer= ref_handler->get_table()->record[1]; + const auto &key= *fk.referenced_key; + DBUG_ASSERT(key.table->alias.eq(&table->alias, table->alias.charset())); - key_copy(key_buffer, record, fk.foreign_key, 0); - bool row_references; - error= period_check_row_references(ref_handler, key_buffer, - record, ref_record, - *fk.foreign_key, *fk.referenced_key); + int error= 0; + if (TABLE::check_period_overlaps(key, key, old_data, new_data) != 0) + { + error= period_row_check_delete_for_key(old_data, fk); + if (error) + return error; + continue; + } - int end_error= ref_handler->ha_index_end(); + if (old_start < new_start) + { + dup_end.fill(new_start); + error= period_row_check_delete_for_key(record_dup, fk); + if (error) + return error; + dup_end.fill(old_end); + } - if (error == HA_ERR_KEY_NOT_FOUND) - return HA_ERR_NO_REFERENCED_ROW; - else if (error) - return error; - else if (end_error) - return end_error; + if (old_end > new_end) + { + dup_start.fill(new_end); + error= period_row_check_delete_for_key(record_dup, fk); + if (error) + return error; + dup_start.fill(old_start); + } } - return 0; -} + memcpy(record_dup, new_data, table->s->reclength); -int handler::period_row_upd_fk_check(const uchar *old_data, const uchar *new_data) -{ - int error= period_row_del_fk_check(old_data); - if (!error) - error= period_row_ins_fk_check(new_data); - return error; + for(int k= 0; !is_versioned_delete && k < table->foreign_keys; k++) + { + const auto &fk= table->foreign[k]; + if (!fk.has_period) + continue; + + DBUG_ASSERT(fk.fields_num == fk.foreign_key->user_defined_key_parts); + DBUG_ASSERT(fk.fields_num == fk.referenced_key->user_defined_key_parts); + + const auto &key= *fk.foreign_key; + DBUG_ASSERT(key.table->alias.eq(&table->alias, table->alias.charset())); + + int error= 0; + if (TABLE::check_period_overlaps(key, key, old_data, new_data) != 0) + { + error= period_row_check_insert_for_key(new_data, fk); + if (error) + return error; + continue; + } + + if (old_start > new_start) + { + dup_end.fill(old_start); + error= period_row_check_insert_for_key(record_dup, fk); + if (error) + return error; + dup_end.fill(new_end); + } + + if (old_end < new_end) + { + dup_start.fill(old_end); + error= period_row_check_insert_for_key(record_dup, fk); + if (error) + return error; + dup_start.fill(new_start); + } + } + return 0; } int handler::ha_delete_row(const uchar *buf) |