diff options
Diffstat (limited to 'storage/rocksdb/rdb_datadic.cc')
-rw-r--r-- | storage/rocksdb/rdb_datadic.cc | 386 |
1 files changed, 328 insertions, 58 deletions
diff --git a/storage/rocksdb/rdb_datadic.cc b/storage/rocksdb/rdb_datadic.cc index 618771c5410..22d117f74ad 100644 --- a/storage/rocksdb/rdb_datadic.cc +++ b/storage/rocksdb/rdb_datadic.cc @@ -36,9 +36,12 @@ #include <vector> /* MySQL header files */ +#include "./field.h" #include "./key.h" #include "./m_ctype.h" #include "./my_bit.h" +#include "./my_bitmap.h" +#include "./sql_table.h" /* MyRocks header files */ #include "./ha_rocksdb_proto.h" @@ -53,20 +56,16 @@ void get_mem_comparable_space(const CHARSET_INFO *cs, size_t *mb_len); /* - MariaDB's stand-in for Field::check_field_name_match that facebook/mysql-5.6 - uses. - - They have that function because of their JSON support, where "a.b.c" is a - sub-field of "a.b". + MariaDB's replacement for FB/MySQL Field::check_field_name_match : */ - -static bool check_field_name_match(Field *field, const char *name) +inline bool field_check_field_name_match(Field *field, const char *name) { return (0 == my_strcasecmp(system_charset_info, field->field_name.str, name)); } + /* Rdb_key_def class implementation */ @@ -92,6 +91,14 @@ Rdb_key_def::Rdb_key_def(uint indexnr_arg, uint keyno_arg, { mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST); rdb_netbuf_store_index(m_index_number_storage_form, m_index_number); + m_total_index_flags_length = + calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG); + DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY && + m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2, + m_total_index_flags_length == 0); + DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY && + m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2, + m_total_index_flags_length == 0); DBUG_ASSERT(m_cf_handle != nullptr); } @@ -109,6 +116,14 @@ Rdb_key_def::Rdb_key_def(const Rdb_key_def &k) m_maxlength(k.m_maxlength) { mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST); rdb_netbuf_store_index(m_index_number_storage_form, m_index_number); + m_total_index_flags_length = + calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG); + DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY && + m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2, + m_total_index_flags_length == 0); + DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY && + m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2, + m_total_index_flags_length == 0); if (k.m_pack_info) { const size_t size = sizeof(Rdb_field_packing) * k.m_key_parts; m_pack_info = @@ -285,7 +300,7 @@ void Rdb_key_def::setup(const TABLE *const tbl, the offset of the TTL key part here. */ if (!m_ttl_column.empty() && - check_field_name_match(field, m_ttl_column.c_str())) { + field_check_field_name_match(field, m_ttl_column.c_str())) { DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG); DBUG_ASSERT(field->key_type() == HA_KEYTYPE_ULONGLONG); DBUG_ASSERT(!field->real_maybe_null()); @@ -405,7 +420,7 @@ uint Rdb_key_def::extract_ttl_col(const TABLE *const table_arg, if (skip_checks) { for (uint i = 0; i < table_arg->s->fields; i++) { Field *const field = table_arg->field[i]; - if (check_field_name_match(field, ttl_col_str.c_str())) { + if (field_check_field_name_match(field, ttl_col_str.c_str())) { *ttl_column = ttl_col_str; *ttl_field_offset = i; } @@ -418,7 +433,7 @@ uint Rdb_key_def::extract_ttl_col(const TABLE *const table_arg, bool found = false; for (uint i = 0; i < table_arg->s->fields; i++) { Field *const field = table_arg->field[i]; - if (check_field_name_match(field, ttl_col_str.c_str()) && + if (field_check_field_name_match(field, ttl_col_str.c_str()) && field->real_type() == MYSQL_TYPE_LONGLONG && field->key_type() == HA_KEYTYPE_ULONGLONG && !field->real_maybe_null()) { @@ -774,11 +789,14 @@ uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer, */ bool Rdb_key_def::unpack_info_has_checksum(const rocksdb::Slice &unpack_info) { - const uchar *ptr = (const uchar *)unpack_info.data(); size_t size = unpack_info.size(); + if (size == 0) { + return false; + } + const uchar *ptr = (const uchar *)unpack_info.data(); // Skip unpack info if present. - if (size >= RDB_UNPACK_HEADER_SIZE && ptr[0] == RDB_UNPACK_DATA_TAG) { + if (is_unpack_data_tag(ptr[0]) && size >= get_unpack_header_size(ptr[0])) { const uint16 skip_len = rdb_netbuf_to_uint16(ptr + 1); SHIP_ASSERT(size >= skip_len); @@ -808,6 +826,120 @@ int Rdb_key_def::successor(uchar *const packed_tuple, const uint &len) { return changed; } +static const std::map<char, size_t> UNPACK_HEADER_SIZES = { + {RDB_UNPACK_DATA_TAG, RDB_UNPACK_HEADER_SIZE}, + {RDB_UNPACK_COVERED_DATA_TAG, RDB_UNPACK_COVERED_HEADER_SIZE}}; + +/* + @return The length in bytes of the header specified by the given tag +*/ +size_t Rdb_key_def::get_unpack_header_size(char tag) { + DBUG_ASSERT(is_unpack_data_tag(tag)); + return UNPACK_HEADER_SIZES.at(tag); +} + +/* + Get a bitmap indicating which varchar columns must be covered for this + lookup to be covered. If the bitmap is a subset of the covered bitmap, then + the lookup is covered. If it can already be determined that the lookup is + not covered, map->bitmap will be set to null. + */ +void Rdb_key_def::get_lookup_bitmap(const TABLE *table, MY_BITMAP *map) const { + DBUG_ASSERT(map->bitmap == nullptr); + bitmap_init(map, nullptr, MAX_REF_PARTS, false); + uint curr_bitmap_pos = 0; + + // Indicates which columns in the read set might be covered. + MY_BITMAP maybe_covered_bitmap; + bitmap_init(&maybe_covered_bitmap, nullptr, table->read_set->n_bits, false); + + for (uint i = 0; i < m_key_parts; i++) { + if (table_has_hidden_pk(table) && i + 1 == m_key_parts) { + continue; + } + + Field *const field = m_pack_info[i].get_field_in_table(table); + + // Columns which are always covered are not stored in the covered bitmap so + // we can ignore them here too. + if (m_pack_info[i].m_covered && + bitmap_is_set(table->read_set, field->field_index)) { + bitmap_set_bit(&maybe_covered_bitmap, field->field_index); + continue; + } + + switch (field->real_type()) { + // This type may be covered depending on the record. If it was requested, + // we require the covered bitmap to have this bit set. + case MYSQL_TYPE_VARCHAR: + if (curr_bitmap_pos < MAX_REF_PARTS) { + if (bitmap_is_set(table->read_set, field->field_index)) { + bitmap_set_bit(map, curr_bitmap_pos); + bitmap_set_bit(&maybe_covered_bitmap, field->field_index); + } + curr_bitmap_pos++; + } else { + bitmap_free(&maybe_covered_bitmap); + bitmap_free(map); + return; + } + break; + // This column is a type which is never covered. If it was requested, we + // know this lookup will never be covered. + default: + if (bitmap_is_set(table->read_set, field->field_index)) { + bitmap_free(&maybe_covered_bitmap); + bitmap_free(map); + return; + } + break; + } + } + + // If there are columns which are not covered in the read set, the lookup + // can't be covered. + if (!bitmap_cmp(table->read_set, &maybe_covered_bitmap)) { + bitmap_free(map); + } + bitmap_free(&maybe_covered_bitmap); +} + +/* + Return true if for this secondary index + - All of the requested columns are in the index + - All values for columns that are prefix-only indexes are shorter or equal + in length to the prefix + */ +bool Rdb_key_def::covers_lookup(TABLE *const table, + const rocksdb::Slice *const unpack_info, + const MY_BITMAP *const lookup_bitmap) const { + DBUG_ASSERT(lookup_bitmap != nullptr); + if (!use_covered_bitmap_format() || lookup_bitmap->bitmap == nullptr) { + return false; + } + + Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info); + + // Check if this unpack_info has a covered_bitmap + const char *unpack_header = unp_reader.get_current_ptr(); + const bool has_covered_unpack_info = + unp_reader.remaining_bytes() && + unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG; + if (!has_covered_unpack_info || + !unp_reader.read(RDB_UNPACK_COVERED_HEADER_SIZE)) { + return false; + } + + MY_BITMAP covered_bitmap; + my_bitmap_map covered_bits; + bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false); + covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header + + sizeof(RDB_UNPACK_COVERED_DATA_TAG) + + RDB_UNPACK_COVERED_DATA_LEN_SIZE); + + return bitmap_is_subset(lookup_bitmap, &covered_bitmap); +} + uchar *Rdb_key_def::pack_field(Field *const field, Rdb_field_packing *pack_info, uchar *tuple, uchar *const packed_tuple, uchar *const pack_buffer, @@ -872,14 +1004,12 @@ uchar *Rdb_key_def::pack_field(Field *const field, Rdb_field_packing *pack_info, Length of the packed tuple */ -uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer, - const uchar *const record, - uchar *const packed_tuple, - Rdb_string_writer *const unpack_info, - const bool &should_store_row_debug_checksums, - const longlong &hidden_pk_id, uint n_key_parts, - uint *const n_null_fields, - uint *const ttl_pk_offset) const { +uint Rdb_key_def::pack_record( + const TABLE *const tbl, uchar *const pack_buffer, const uchar *const record, + uchar *const packed_tuple, Rdb_string_writer *const unpack_info, + const bool &should_store_row_debug_checksums, const longlong &hidden_pk_id, + uint n_key_parts, uint *const n_null_fields, uint *const ttl_pk_offset, + const char *const ttl_bytes) const { DBUG_ASSERT(tbl != nullptr); DBUG_ASSERT(pack_buffer != nullptr); DBUG_ASSERT(record != nullptr); @@ -890,7 +1020,9 @@ uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer, (m_index_type == INDEX_TYPE_SECONDARY)); uchar *tuple = packed_tuple; + size_t unpack_start_pos = size_t(-1); size_t unpack_len_pos = size_t(-1); + size_t covered_bitmap_pos = size_t(-1); const bool hidden_pk_exists = table_has_hidden_pk(tbl); rdb_netbuf_store_index(tuple, m_index_number); @@ -912,14 +1044,57 @@ uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer, if (n_null_fields) *n_null_fields = 0; + // Check if we need a covered bitmap. If it is certain that all key parts are + // covering, we don't need one. + bool store_covered_bitmap = false; + if (unpack_info && use_covered_bitmap_format()) { + for (uint i = 0; i < n_key_parts; i++) { + if (!m_pack_info[i].m_covered) { + store_covered_bitmap = true; + break; + } + } + } + + const char tag = + store_covered_bitmap ? RDB_UNPACK_COVERED_DATA_TAG : RDB_UNPACK_DATA_TAG; + if (unpack_info) { unpack_info->clear(); - unpack_info->write_uint8(RDB_UNPACK_DATA_TAG); + + if (m_index_type == INDEX_TYPE_SECONDARY && + m_total_index_flags_length > 0) { + // Reserve space for index flag fields + unpack_info->allocate(m_total_index_flags_length); + + // Insert TTL timestamp + if (has_ttl() && ttl_bytes) { + write_index_flag_field(unpack_info, + reinterpret_cast<const uchar *const>(ttl_bytes), + Rdb_key_def::TTL_FLAG); + } + } + + unpack_start_pos = unpack_info->get_current_pos(); + unpack_info->write_uint8(tag); unpack_len_pos = unpack_info->get_current_pos(); // we don't know the total length yet, so write a zero unpack_info->write_uint16(0); + + if (store_covered_bitmap) { + // Reserve two bytes for the covered bitmap. This will store, for key + // parts which are not always covering, whether or not it is covering + // for this record. + covered_bitmap_pos = unpack_info->get_current_pos(); + unpack_info->write_uint16(0); + } } + MY_BITMAP covered_bitmap; + my_bitmap_map covered_bits; + uint curr_bitmap_pos = 0; + bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false); + for (uint i = 0; i < n_key_parts; i++) { // Fill hidden pk id into the last key part for secondary keys for tables // with no pk @@ -938,7 +1113,7 @@ uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer, // Save the ttl duration offset in the key so we can store it in front of // the record later. if (ttl_pk_offset && m_ttl_duration > 0 && i == m_ttl_pk_key_part_offset) { - DBUG_ASSERT(check_field_name_match(field, m_ttl_column.c_str())); + DBUG_ASSERT(field_check_field_name_match(field, m_ttl_column.c_str())); DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG); DBUG_ASSERT(field->key_type() == HA_KEYTYPE_ULONGLONG); DBUG_ASSERT(!field->real_maybe_null()); @@ -953,6 +1128,25 @@ uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer, tuple = pack_field(field, &m_pack_info[i], tuple, packed_tuple, pack_buffer, unpack_info, n_null_fields); + // If this key part is a prefix of a VARCHAR field, check if it's covered. + if (store_covered_bitmap && field->real_type() == MYSQL_TYPE_VARCHAR && + !m_pack_info[i].m_covered && curr_bitmap_pos < MAX_REF_PARTS) { + size_t data_length = field->data_length(); + uint16 key_length; + if (m_pk_part_no[i] == (uint)-1) { + key_length = tbl->key_info[get_keyno()].key_part[i].length; + } else { + key_length = + tbl->key_info[tbl->s->primary_key].key_part[m_pk_part_no[i]].length; + } + + if (m_pack_info[i].m_unpack_func != nullptr && + data_length <= key_length) { + bitmap_set_bit(&covered_bitmap, curr_bitmap_pos); + } + curr_bitmap_pos++; + } + // Restore field->ptr and field->null_ptr field->move_field(tbl->record[0] + field_offset, maybe_null ? tbl->record[0] + null_offset : nullptr, @@ -960,7 +1154,7 @@ uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer, } if (unpack_info) { - const size_t len = unpack_info->get_current_pos(); + const size_t len = unpack_info->get_current_pos() - unpack_start_pos; DBUG_ASSERT(len <= std::numeric_limits<uint16_t>::max()); // Don't store the unpack_info if it has only the header (that is, there's @@ -968,9 +1162,12 @@ uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer, // Primary Keys are special: for them, store the unpack_info even if it's // empty (provided m_maybe_unpack_info==true, see // ha_rocksdb::convert_record_to_storage_format) - if (len == RDB_UNPACK_HEADER_SIZE && - m_index_type != Rdb_key_def::INDEX_TYPE_PRIMARY) { - unpack_info->clear(); + if (m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY) { + if (len == get_unpack_header_size(tag) && !covered_bits) { + unpack_info->truncate(unpack_start_pos); + } else if (store_covered_bitmap) { + unpack_info->write_uint16_at(covered_bitmap_pos, covered_bits); + } } else { unpack_info->write_uint16_at(unpack_len_pos, len); } @@ -1216,11 +1413,30 @@ int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf, // For secondary keys, we expect the value field to contain unpack data and // checksum data in that order. One or both can be missing, but they cannot // be reordered. + const char *unpack_header = unp_reader.get_current_ptr(); const bool has_unpack_info = - unp_reader.remaining_bytes() && - *unp_reader.get_current_ptr() == RDB_UNPACK_DATA_TAG; - if (has_unpack_info && !unp_reader.read(RDB_UNPACK_HEADER_SIZE)) { - return HA_ERR_ROCKSDB_CORRUPT_DATA; + unp_reader.remaining_bytes() && is_unpack_data_tag(unpack_header[0]); + if (has_unpack_info) { + if ((m_index_type == INDEX_TYPE_SECONDARY && + m_total_index_flags_length > 0 && + !unp_reader.read(m_total_index_flags_length)) || + !unp_reader.read(get_unpack_header_size(unpack_header[0]))) { + return HA_ERR_ROCKSDB_CORRUPT_DATA; + } + } + + // Read the covered bitmap + MY_BITMAP covered_bitmap; + my_bitmap_map covered_bits; + uint curr_bitmap_pos = 0; + bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false); + + const bool has_covered_bitmap = + has_unpack_info && (unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG); + if (has_covered_bitmap) { + covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header + + sizeof(RDB_UNPACK_COVERED_DATA_TAG) + + RDB_UNPACK_COVERED_DATA_LEN_SIZE); } for (uint i = 0; i < m_key_parts; i++) { @@ -1241,7 +1457,13 @@ int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf, Field *const field = fpi->get_field_in_table(table); - if (fpi->m_unpack_func) { + bool covered_column = true; + if (has_covered_bitmap && field->real_type() == MYSQL_TYPE_VARCHAR && + !m_pack_info[i].m_covered) { + covered_column = curr_bitmap_pos < MAX_REF_PARTS && + bitmap_is_set(&covered_bitmap, curr_bitmap_pos++); + } + if (fpi->m_unpack_func && covered_column) { /* It is possible to unpack this column. Do it. */ uint field_offset = field->ptr - table->record[0]; @@ -2835,6 +3057,8 @@ bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr, m_skip_func = &Rdb_key_def::skip_max_length; m_pack_func = &Rdb_key_def::pack_with_make_sort_key; + m_covered = false; + switch (type) { case MYSQL_TYPE_LONGLONG: case MYSQL_TYPE_LONG: @@ -2842,14 +3066,17 @@ bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr, case MYSQL_TYPE_SHORT: case MYSQL_TYPE_TINY: m_unpack_func = &Rdb_key_def::unpack_integer; + m_covered = true; return true; case MYSQL_TYPE_DOUBLE: m_unpack_func = &Rdb_key_def::unpack_double; + m_covered = true; return true; case MYSQL_TYPE_FLOAT: m_unpack_func = &Rdb_key_def::unpack_float; + m_covered = true; return true; case MYSQL_TYPE_NEWDECIMAL: @@ -2867,6 +3094,7 @@ bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr, case MYSQL_TYPE_YEAR: /* YEAR is packed with Field_tiny::make_sort_key */ /* Everything that comes here is packed with just a memcpy(). */ m_unpack_func = &Rdb_key_def::unpack_binary_str; + m_covered = true; return true; case MYSQL_TYPE_NEWDATE: @@ -2876,6 +3104,7 @@ bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr, and little-endian) */ m_unpack_func = &Rdb_key_def::unpack_newdate; + m_covered = true; return true; case MYSQL_TYPE_TINY_BLOB: case MYSQL_TYPE_MEDIUM_BLOB: @@ -3044,27 +3273,36 @@ bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr, } } - // Make an adjustment: unpacking partially covered columns is not - // possible. field->table is populated when called through - // Rdb_key_def::setup, but not during ha_rocksdb::index_flags. + // Make an adjustment: if this column is partially covered, tell the SQL + // layer we can't do index-only scans. Later when we perform an index read, + // we'll check on a record-by-record basis if we can do an index-only scan + // or not. + uint field_length; if (field->table) { - // Get the original Field object and compare lengths. If this key part is - // a prefix of a column, then we can't do index-only scans. - if (field->table->field[field->field_index]->field_length != key_length) { - m_unpack_func = nullptr; - m_make_unpack_info_func = nullptr; - m_unpack_info_stores_value = true; - res = false; - } + field_length = field->table->field[field->field_index]->field_length; } else { - if (field->field_length != key_length) { + field_length = field->field_length; + } + + if (field_length != key_length) { + res = false; + // If this index doesn't support covered bitmaps, then we won't know + // during a read if the column is actually covered or not. If so, we need + // to assume the column isn't covered and skip it during unpacking. + // + // If key_descr == NULL, then this is a dummy field and we probably don't + // need to perform this step. However, to preserve the behavior before + // this change, we'll only skip this step if we have an index which + // supports covered bitmaps. + if (!key_descr || !key_descr->use_covered_bitmap_format()) { m_unpack_func = nullptr; m_make_unpack_info_func = nullptr; m_unpack_info_stores_value = true; - res = false; } } } + + m_covered = res; return res; } @@ -3180,18 +3418,19 @@ bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict, // Length that each index flag takes inside the record. // Each index in the array maps to the enum INDEX_FLAG -static const std::array<int, 1> index_flag_lengths = { +static const std::array<uint, 1> index_flag_lengths = { {ROCKSDB_SIZEOF_TTL_RECORD}}; - bool Rdb_key_def::has_index_flag(uint32 index_flags, enum INDEX_FLAG flag) { return flag & index_flags; } uint32 Rdb_key_def::calculate_index_flag_offset(uint32 index_flags, - enum INDEX_FLAG flag) { + enum INDEX_FLAG flag, + uint *const length) { - DBUG_ASSERT(Rdb_key_def::has_index_flag(index_flags, flag)); + DBUG_ASSERT_IMP(flag != MAX_FLAG, + Rdb_key_def::has_index_flag(index_flags, flag)); uint offset = 0; for (size_t bit = 0; bit < sizeof(index_flags) * CHAR_BIT; ++bit) { @@ -3199,6 +3438,9 @@ uint32 Rdb_key_def::calculate_index_flag_offset(uint32 index_flags, /* Exit once we've reached the proper flag */ if (flag & mask) { + if (length != nullptr) { + *length = index_flag_lengths[bit]; + } break; } @@ -3210,6 +3452,15 @@ uint32 Rdb_key_def::calculate_index_flag_offset(uint32 index_flags, return offset; } +void Rdb_key_def::write_index_flag_field(Rdb_string_writer *const buf, + const uchar *const val, + enum INDEX_FLAG flag) const { + uint len; + uint offset = calculate_index_flag_offset(m_index_flags_bitmap, flag, &len); + DBUG_ASSERT(offset + len <= buf->get_current_pos()); + memcpy(buf->ptr() + offset, val, len); +} + void Rdb_tbl_def::check_if_is_mysql_system_table() { static const char *const system_dbs[] = { "mysql", "performance_schema", "information_schema", @@ -3308,10 +3559,12 @@ struct Rdb_validate_tbls : public Rdb_tables_scanner { int Rdb_validate_tbls::add_table(Rdb_tbl_def *tdef) { DBUG_ASSERT(tdef != nullptr); - /* Add the database/table into the list */ - bool is_partition = tdef->base_partition().size() != 0; - m_list[tdef->base_dbname()].insert( - tbl_info_t(tdef->base_tablename(), is_partition)); + /* Add the database/table into the list that are not temp table */ + if (tdef->base_tablename().find(tmp_file_prefix) == std::string::npos) { + bool is_partition = tdef->base_partition().size() != 0; + m_list[tdef->base_dbname()].insert( + tbl_info_t(tdef->base_tablename(), is_partition)); + } return HA_EXIT_SUCCESS; } @@ -3394,9 +3647,9 @@ bool Rdb_validate_tbls::scan_for_frms(const std::string &datadir, /* Scan through the files in the directory */ struct fileinfo *file_info = dir_info->dir_entry; for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) { - /* Find .frm files that are not temp files (those that start with '#') */ + /* Find .frm files that are not temp files (those that contain '#sql') */ const char *ext = strrchr(file_info->name, '.'); - if (ext != nullptr && !is_prefix(file_info->name, tmp_file_prefix) && + if (ext != nullptr && strstr(file_info->name, tmp_file_prefix) == nullptr && strcmp(ext, ".frm") == 0) { std::string tablename = std::string(file_info->name, ext - file_info->name); @@ -3723,6 +3976,20 @@ Rdb_ddl_manager::find(GL_INDEX_ID gl_index_id) { return empty; } +// this method returns the name of the table based on an index id. It acquires +// a read lock on m_rwlock. +const std::string +Rdb_ddl_manager::safe_get_table_name(const GL_INDEX_ID &gl_index_id) { + std::string ret; + mysql_rwlock_rdlock(&m_rwlock); + auto it = m_index_num_to_keydef.find(gl_index_id); + if (it != m_index_num_to_keydef.end()) { + ret = it->second.first; + } + mysql_rwlock_unlock(&m_rwlock); + return ret; +} + void Rdb_ddl_manager::set_stats( const std::unordered_map<GL_INDEX_ID, Rdb_index_stats> &stats) { mysql_rwlock_wrlock(&m_rwlock); @@ -3961,12 +4228,10 @@ void Rdb_binlog_manager::cleanup() {} write succeeded or not is not possible here. @param binlog_name Binlog name @param binlog_pos Binlog pos - @param binlog_gtid Binlog max GTID @param batch WriteBatch */ void Rdb_binlog_manager::update(const char *const binlog_name, const my_off_t binlog_pos, - const char *const binlog_max_gtid, rocksdb::WriteBatchBase *const batch) { if (binlog_name && binlog_pos) { // max binlog length (512) + binlog pos (4) + binlog gtid (57) < 1024 @@ -3974,7 +4239,7 @@ void Rdb_binlog_manager::update(const char *const binlog_name, uchar value_buf[RDB_MAX_BINLOG_INFO_LEN]; m_dict->put_key( batch, m_key_slice, - pack_value(value_buf, binlog_name, binlog_pos, binlog_max_gtid)); + pack_value(value_buf, binlog_name, binlog_pos, NULL)); } } @@ -4009,7 +4274,6 @@ bool Rdb_binlog_manager::read(char *const binlog_name, @param buf Preallocated buffer to set binlog info. @param binlog_name Binlog name @param binlog_pos Binlog pos - @param binlog_gtid Binlog GTID @return rocksdb::Slice converted from buf and its length */ rocksdb::Slice @@ -4038,15 +4302,21 @@ Rdb_binlog_manager::pack_value(uchar *const buf, const char *const binlog_name, // store binlog gtid length. // If gtid was not set, store 0 instead +#ifdef MARIAROCKS_NOT_YET const uint16_t binlog_gtid_len = binlog_gtid ? (uint16_t)strlen(binlog_gtid) : 0; rdb_netbuf_store_uint16(buf + pack_len, binlog_gtid_len); +#endif pack_len += sizeof(uint16); + // MariaDB: + rdb_netbuf_store_uint16(buf + pack_len, 0); +#ifdef MARIAROCKS_NOT_YET if (binlog_gtid_len > 0) { // store binlog gtid memcpy(buf + pack_len, binlog_gtid, binlog_gtid_len); pack_len += binlog_gtid_len; } +#endif return rocksdb::Slice((char *)buf, pack_len); } |