summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNikita Malyavin <nikitamalyavin@gmail.com>2019-12-16 13:45:02 +1000
committerNikita Malyavin <nikitamalyavin@gmail.com>2020-01-21 21:11:10 +1000
commitd6b24b7457bc106d2a8271193553ff0dfcb44113 (patch)
tree476adc36538e4a645b71c588d12e0fc37fa876d3
parent18fad2d83e1d681f3508205d660a1eca16782d20 (diff)
downloadmariadb-git-d6b24b7457bc106d2a8271193553ff0dfcb44113.tar.gz
MDEV-16983 Application-time periods: foreign key PART 2/3
Add correct update check, which preserves referential integrity on not-updated part of period
-rw-r--r--mysql-test/suite/period/r/fk.result16
-rw-r--r--mysql-test/suite/period/t/fk.test20
-rw-r--r--sql/field.h5
-rw-r--r--sql/handler.cc262
4 files changed, 206 insertions, 97 deletions
diff --git a/mysql-test/suite/period/r/fk.result b/mysql-test/suite/period/r/fk.result
index 82c77c794ab..5214c4559ad 100644
--- a/mysql-test/suite/period/r/fk.result
+++ b/mysql-test/suite/period/r/fk.result
@@ -124,47 +124,31 @@ id x s e
1 2 2017-01-28 2017-01-29
update t set x= 2 where s='2017-01-03' and e='2017-01-20';
ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails
-#
-# False-positives
-#
# Expand left
update t set s= '2017-01-01' where s = '2017-01-03' and e = '2017-01-20';
-ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails
# Shrink left
update t set s= '2017-01-05' where e = '2017-01-20';
-ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails
# Expand right
update t set e= '2017-02-10' where s = '2017-01-26' and e = '2017-01-30';
-ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails
# Shrink right
update t set e= '2017-01-29' where s = '2017-01-26';
ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails
delete from s where s = '2017-01-27' and e = '2017-01-30';
update t set e= '2017-01-29' where s = '2017-01-26' and x = 1;
# Shrink both
-# Not a false-positive
update t set s= '2017-01-27', e= '2017-01-28' where x = 2;
ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails
-# False-positive
update t set s= '2017-01-28', e= '2017-01-29' where x = 2;
-ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails
# Expand both
update t set s= '2017-01-20', e= '2017-02-05' where x = 2;
-ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails
# Move right
-# Not a false-positive
update t set s= '2017-02-02', e= '2017-02-25' where x = 2;
ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails
-# False-positive
update t set s= '2017-01-28', e= '2017-02-25' where x = 2;
-ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails
# Move left
-# Not a false-positive
update t set s= '2017-01-20', e= '2017-01-27' where x = 2;
ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails
-# False-positive
update t set s= '2017-01-20', e= '2017-01-29' where x = 2;
-ERROR 23000: Cannot delete or update a parent row: a foreign key constraint fails
drop table s;
delete from t;
create or replace table s (x int, y int, z char(200), pk int,
diff --git a/mysql-test/suite/period/t/fk.test b/mysql-test/suite/period/t/fk.test
index 1168a1b185b..4d312b97a1e 100644
--- a/mysql-test/suite/period/t/fk.test
+++ b/mysql-test/suite/period/t/fk.test
@@ -93,20 +93,13 @@ select * from s;
update t set x= 2 where s='2017-01-03' and e='2017-01-20';
---echo #
---echo # False-positives
---echo #
-
--echo # Expand left
---error ER_ROW_IS_REFERENCED_2
update t set s= '2017-01-01' where s = '2017-01-03' and e = '2017-01-20';
--echo # Shrink left
---error ER_ROW_IS_REFERENCED_2
update t set s= '2017-01-05' where e = '2017-01-20';
--echo # Expand right
---error ER_ROW_IS_REFERENCED_2
update t set e= '2017-02-10' where s = '2017-01-26' and e = '2017-01-30';
--echo # Shrink right
@@ -117,31 +110,24 @@ delete from s where s = '2017-01-27' and e = '2017-01-30';
update t set e= '2017-01-29' where s = '2017-01-26' and x = 1;
--echo # Shrink both
---echo # Not a false-positive
--error ER_ROW_IS_REFERENCED_2
update t set s= '2017-01-27', e= '2017-01-28' where x = 2;
---echo # False-positive
---error ER_ROW_IS_REFERENCED_2
+
update t set s= '2017-01-28', e= '2017-01-29' where x = 2;
--echo # Expand both
---error ER_ROW_IS_REFERENCED_2
update t set s= '2017-01-20', e= '2017-02-05' where x = 2;
--echo # Move right
---echo # Not a false-positive
--error ER_ROW_IS_REFERENCED_2
update t set s= '2017-02-02', e= '2017-02-25' where x = 2;
---echo # False-positive
---error ER_ROW_IS_REFERENCED_2
+
update t set s= '2017-01-28', e= '2017-02-25' where x = 2;
--echo # Move left
---echo # Not a false-positive
--error ER_ROW_IS_REFERENCED_2
update t set s= '2017-01-20', e= '2017-01-27' where x = 2;
---echo # False-positive
---error ER_ROW_IS_REFERENCED_2
+
update t set s= '2017-01-20', e= '2017-01-29' where x = 2;
# TODO remove when deadlock issue will be solved
diff --git a/sql/field.h b/sql/field.h
index 4f81bd5f87d..953a7cc17df 100644
--- a/sql/field.h
+++ b/sql/field.h
@@ -1117,6 +1117,11 @@ public:
my_ptrdiff_t l_offset= (my_ptrdiff_t) (record - table->record[0]);
return ptr + l_offset;
}
+ uchar *ptr_in_record( uchar *record) const
+ {
+ my_ptrdiff_t l_offset= (my_ptrdiff_t) (record - table->record[0]);
+ return ptr + l_offset;
+ }
virtual int set_default();
bool has_update_default_function() const
diff --git a/sql/handler.cc b/sql/handler.cc
index d6194fa5fb1..67dbc22732f 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -6841,6 +6841,43 @@ void set_bits_with_key(MY_BITMAP *map, const KEY *key, uint key_parts)
bitmap_set_bit(map, key->key_part[i].fieldnr - 1);
}
+static int period_row_check_delete_for_key(const uchar *record,
+ const FOREIGN_KEY &fk)
+{
+ handler *foreign_handler= fk.foreign_key->table->file;
+
+ /* We shouldn't save cursor here, since this handler is never used.
+ The foreign table is only opened for FK matches. */
+ int error= foreign_handler->ha_index_init(fk.foreign_key_nr, false);
+ if(error)
+ return error;
+
+
+ set_bits_with_key(fk.foreign_key->table->read_set,
+ fk.foreign_key, fk.fields_num);
+ auto *record_buffer= foreign_handler->get_table()->record[0];
+ auto *key_buffer= foreign_handler->get_table()->record[1];
+
+ key_copy(key_buffer, record, fk.referenced_key, 0);
+ error= period_find_overlapping_record(foreign_handler,
+ key_buffer,
+ record,
+ record_buffer,
+ *fk.referenced_key,
+ *fk.foreign_key);
+
+ int end_error= foreign_handler->ha_index_end();
+
+ if (error == HA_ERR_KEY_NOT_FOUND) // no key matched
+ return end_error;
+ if (error)
+ return error;
+ if (end_error)
+ return end_error;
+
+ return HA_ERR_ROW_IS_REFERENCED;
+}
+
/**
@param record record to update from, or a deleted record */
int handler::period_row_del_fk_check(const uchar *record)
@@ -6852,47 +6889,17 @@ int handler::period_row_del_fk_check(const uchar *record)
for(int k= 0; k < table->referenced_keys; k++)
{
- auto &fk= table->referenced[k];
+ const auto &fk= table->referenced[k];
+
if (!fk.has_period)
continue;
DBUG_ASSERT(fk.fields_num == fk.foreign_key->user_defined_key_parts);
DBUG_ASSERT(fk.fields_num == fk.referenced_key->user_defined_key_parts);
- handler *foreign_handler= fk.foreign_key->table->file;
-
- /* We shouldn't save cursor here, since this handler is never used.
- The foreign table is only opened for FK matches. */
- int error= foreign_handler->ha_index_init(fk.foreign_key_nr, false);
- if(error)
- return error;
-
-
- set_bits_with_key(fk.foreign_key->table->read_set,
- fk.foreign_key, fk.fields_num);
- auto *record_buffer= foreign_handler->get_table()->record[0];
- auto *key_buffer= foreign_handler->get_table()->record[1];
-
- key_copy(key_buffer, record, fk.referenced_key, 0);
- error= period_find_overlapping_record(foreign_handler,
- key_buffer,
- record,
- record_buffer,
- *fk.referenced_key,
- *fk.foreign_key);
-
- int end_error= foreign_handler->ha_index_end();
-
- if (error == HA_ERR_KEY_NOT_FOUND) // no key matched
- return end_error;
- if (error)
- return error;
- if (end_error)
- return end_error;
- if (key_period_compare_periods(*fk.referenced_key,
- *fk.foreign_key,
- record, record_buffer) == 0)
- return HA_ERR_ROW_IS_REFERENCED;
+ int res= period_row_check_delete_for_key(record, fk);
+ if(res)
+ return res;
}
return 0;
}
@@ -6993,6 +7000,33 @@ static int period_check_row_references(handler *ref_handler,
return 0;
}
+static int period_row_check_insert_for_key(const uchar *record,
+ const FOREIGN_KEY &fk)
+{
+ handler *ref_handler= fk.referenced_key->table->file;
+
+ int error= ref_handler->ha_index_init(fk.referenced_key_nr, false);
+ if(error)
+ return error;
+
+ set_bits_with_key(fk.referenced_key->table->read_set,
+ fk.referenced_key, fk.fields_num);
+
+ auto *ref_record= ref_handler->get_table()->record[0];
+ auto *key_buffer= ref_handler->get_table()->record[1];
+
+ key_copy(key_buffer, record, fk.foreign_key, 0);
+ bool row_references;
+ error= period_check_row_references(ref_handler, key_buffer,
+ record, ref_record,
+ *fk.foreign_key, *fk.referenced_key);
+ if (error == HA_ERR_KEY_NOT_FOUND)
+ error= HA_ERR_NO_REFERENCED_ROW;
+
+ int end_error= ref_handler->ha_index_end();
+
+ return error ? error : end_error;
+}
int handler::period_row_ins_fk_check(const uchar *record)
{
@@ -7001,7 +7035,7 @@ int handler::period_row_ins_fk_check(const uchar *record)
for(int k= 0; k < table->foreign_keys; k++)
{
- auto &fk= table->foreign[k];
+ const auto &fk= table->foreign[k];
if (!fk.has_period)
continue;
if (table->versioned() && !table->vers_end_field()->is_max(record))
@@ -7010,43 +7044,143 @@ int handler::period_row_ins_fk_check(const uchar *record)
DBUG_ASSERT(fk.fields_num == fk.foreign_key->user_defined_key_parts);
DBUG_ASSERT(fk.fields_num == fk.referenced_key->user_defined_key_parts);
- handler *ref_handler= fk.referenced_key->table->file;
+ int res= period_row_check_insert_for_key(record, fk);
+ if (res)
+ return res;
+ }
+ return 0;
+}
- int error= ref_handler->ha_index_init(fk.referenced_key_nr, false);
- if(error)
- return error;
- set_bits_with_key(fk.referenced_key->table->read_set,
- fk.referenced_key, fk.fields_num);
+int handler::period_row_upd_fk_check(const uchar *old_data, const uchar *new_data)
+{
+ if (!table->s->period.name)
+ return 0;
+
+ bool is_versioned_delete= table->versioned() &&
+ !table->vers_end_field()->is_max(new_data);
+ bool is_history_update= table->versioned() &&
+ !table->vers_end_field()->is_max(old_data);
+
+ /*
+ 1. If base key parts changed, or old and new periods do not overlap,
+ emit delete + insert checks.
+ 2. If period key part is updated then there can be three operations:
+ grow, shrink or move.
+ 1. Move is enlarge + shrink.
+ 2. Each operation can be left or right.
+ 3. Move right is (enlarge right + shrink left) and vice-versa.
+ 4. Enlarge emits insert check on enlarged area.
+ Shrink emits delete check on shrinked area.
+ */
+ if (!check_overlaps_buffer)
+ check_overlaps_buffer= (uchar*)alloc_root(&table_share->mem_root,
+ table_share->max_unique_length
+ + table_share->reclength);
+ auto *start_field= table->field[table->s->period.start_fieldno];
+ auto *end_field= table->field[table->s->period.end_fieldno];
+ auto field_len= start_field->pack_length();
+
+ auto record_dup= check_overlaps_buffer;
+ // note that these two store pointers to fields in record_dup
+ Binary_value dup_start(start_field->ptr_in_record(record_dup), field_len);
+ Binary_value dup_end(end_field->ptr_in_record(record_dup), field_len);
+
+ uchar storage[4][Type_handler_datetime::hires_bytes(MAX_DATETIME_PRECISION)];
+ Binary_value old_start(storage[0], field_len);
+ Binary_value old_end(storage[1], field_len);
+ Binary_value new_start(storage[2], field_len);
+ Binary_value new_end(storage[3], field_len);
+ old_start.fill(start_field->ptr_in_record(old_data));
+ old_end.fill(end_field->ptr_in_record(old_data));
+ new_start.fill(start_field->ptr_in_record(new_data));
+ new_end.fill(end_field->ptr_in_record(new_data));
+
+ // we will use record_dup instead of old_data to modify period values
+ memcpy(record_dup, old_data, table->s->reclength);
+
+ for(int k= 0; !is_history_update && k < table->referenced_keys; k++)
+ {
+ const auto &fk= table->referenced[k];
+ if (!fk.has_period)
+ continue;
+
+ DBUG_ASSERT(fk.fields_num == fk.foreign_key->user_defined_key_parts);
+ DBUG_ASSERT(fk.fields_num == fk.referenced_key->user_defined_key_parts);
- auto *ref_record= ref_handler->get_table()->record[0];
- auto *key_buffer= ref_handler->get_table()->record[1];
+ const auto &key= *fk.referenced_key;
+ DBUG_ASSERT(key.table->alias.eq(&table->alias, table->alias.charset()));
- key_copy(key_buffer, record, fk.foreign_key, 0);
- bool row_references;
- error= period_check_row_references(ref_handler, key_buffer,
- record, ref_record,
- *fk.foreign_key, *fk.referenced_key);
+ int error= 0;
+ if (TABLE::check_period_overlaps(key, key, old_data, new_data) != 0)
+ {
+ error= period_row_check_delete_for_key(old_data, fk);
+ if (error)
+ return error;
+ continue;
+ }
- int end_error= ref_handler->ha_index_end();
+ if (old_start < new_start)
+ {
+ dup_end.fill(new_start);
+ error= period_row_check_delete_for_key(record_dup, fk);
+ if (error)
+ return error;
+ dup_end.fill(old_end);
+ }
- if (error == HA_ERR_KEY_NOT_FOUND)
- return HA_ERR_NO_REFERENCED_ROW;
- else if (error)
- return error;
- else if (end_error)
- return end_error;
+ if (old_end > new_end)
+ {
+ dup_start.fill(new_end);
+ error= period_row_check_delete_for_key(record_dup, fk);
+ if (error)
+ return error;
+ dup_start.fill(old_start);
+ }
}
- return 0;
-}
+ memcpy(record_dup, new_data, table->s->reclength);
-int handler::period_row_upd_fk_check(const uchar *old_data, const uchar *new_data)
-{
- int error= period_row_del_fk_check(old_data);
- if (!error)
- error= period_row_ins_fk_check(new_data);
- return error;
+ for(int k= 0; !is_versioned_delete && k < table->foreign_keys; k++)
+ {
+ const auto &fk= table->foreign[k];
+ if (!fk.has_period)
+ continue;
+
+ DBUG_ASSERT(fk.fields_num == fk.foreign_key->user_defined_key_parts);
+ DBUG_ASSERT(fk.fields_num == fk.referenced_key->user_defined_key_parts);
+
+ const auto &key= *fk.foreign_key;
+ DBUG_ASSERT(key.table->alias.eq(&table->alias, table->alias.charset()));
+
+ int error= 0;
+ if (TABLE::check_period_overlaps(key, key, old_data, new_data) != 0)
+ {
+ error= period_row_check_insert_for_key(new_data, fk);
+ if (error)
+ return error;
+ continue;
+ }
+
+ if (old_start > new_start)
+ {
+ dup_end.fill(old_start);
+ error= period_row_check_insert_for_key(record_dup, fk);
+ if (error)
+ return error;
+ dup_end.fill(new_end);
+ }
+
+ if (old_end < new_end)
+ {
+ dup_start.fill(old_end);
+ error= period_row_check_insert_for_key(record_dup, fk);
+ if (error)
+ return error;
+ dup_start.fill(new_start);
+ }
+ }
+ return 0;
}
int handler::ha_delete_row(const uchar *buf)