diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 216 |
1 files changed, 99 insertions, 117 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index d5ae1c954ff..f7c6d09f98f 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -2465,95 +2465,62 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, else time_zone_len= 0; + LEX *lex= thd->lex; /* - In what follows, we decide whether to write to the binary log or to use a - cache. + TRUE defines that either a trx-cache or stmt-cache must be used + and wrapped by a BEGIN...COMMIT. Otherwise, the statement will + be written directly to the binary log without being wrapped by + a BEGIN...COMMIT. + + Note that a cache will not be used if the parameter direct is + TRUE. */ - LEX *lex= thd->lex; - bool implicit_commit= FALSE; - bool force_trans= FALSE; + bool use_cache= FALSE; + /* + TRUE defines that the trx-cache must be used and by consequence + the use_cache is TRUE. + + Note that a cache will not be used if the parameter direct is + TRUE. + */ + bool trx_cache= FALSE; cache_type= Log_event::EVENT_INVALID_CACHE; + switch (lex->sql_command) { - case SQLCOM_ALTER_DB: - case SQLCOM_CREATE_FUNCTION: - case SQLCOM_DROP_FUNCTION: - case SQLCOM_DROP_PROCEDURE: - case SQLCOM_INSTALL_PLUGIN: - case SQLCOM_UNINSTALL_PLUGIN: - case SQLCOM_ALTER_TABLESPACE: - implicit_commit= TRUE; - break; case SQLCOM_DROP_TABLE: - force_trans= lex->drop_temporary && thd->in_multi_stmt_transaction_mode(); - implicit_commit= !force_trans; - break; - case SQLCOM_ALTER_TABLE: + use_cache= trx_cache= (lex->drop_temporary && + thd->in_multi_stmt_transaction_mode()); + break; + case SQLCOM_CREATE_TABLE: - force_trans= (lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) && - thd->in_multi_stmt_transaction_mode(); - implicit_commit= !force_trans && - !(lex->select_lex.item_list.elements && - thd->is_current_stmt_binlog_format_row()); + use_cache= trx_cache= + ((lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) && + thd->in_multi_stmt_transaction_mode()) || + (lex->select_lex.item_list.elements && + thd->is_current_stmt_binlog_format_row()); break; case SQLCOM_SET_OPTION: - implicit_commit= (lex->autocommit ? TRUE : FALSE); + use_cache= trx_cache= (lex->autocommit ? FALSE : TRUE); break; - /* - Replace what follows after CF_AUTO_COMMIT_TRANS is backported by: - - default: - implicit_commit= ((sql_command_flags[lex->sql_command] & - CF_AUTO_COMMIT_TRANS)); - break; - */ - case SQLCOM_CREATE_INDEX: - case SQLCOM_TRUNCATE: - case SQLCOM_CREATE_DB: - case SQLCOM_DROP_DB: - case SQLCOM_ALTER_DB_UPGRADE: - case SQLCOM_RENAME_TABLE: - case SQLCOM_DROP_INDEX: - case SQLCOM_CREATE_VIEW: - case SQLCOM_DROP_VIEW: - case SQLCOM_CREATE_TRIGGER: - case SQLCOM_DROP_TRIGGER: - case SQLCOM_CREATE_EVENT: - case SQLCOM_ALTER_EVENT: - case SQLCOM_DROP_EVENT: - case SQLCOM_REPAIR: - case SQLCOM_OPTIMIZE: - case SQLCOM_ANALYZE: - case SQLCOM_CREATE_USER: - case SQLCOM_DROP_USER: - case SQLCOM_RENAME_USER: - case SQLCOM_REVOKE_ALL: - case SQLCOM_REVOKE: - case SQLCOM_GRANT: - case SQLCOM_CREATE_PROCEDURE: - case SQLCOM_CREATE_SPFUNCTION: - case SQLCOM_ALTER_PROCEDURE: - case SQLCOM_ALTER_FUNCTION: - case SQLCOM_ASSIGN_TO_KEYCACHE: - case SQLCOM_PRELOAD_KEYS: - case SQLCOM_FLUSH: - case SQLCOM_RESET: - case SQLCOM_CHECK: - implicit_commit= TRUE; + case SQLCOM_RELEASE_SAVEPOINT: + case SQLCOM_ROLLBACK_TO_SAVEPOINT: + case SQLCOM_SAVEPOINT: + use_cache= trx_cache= TRUE; break; default: - implicit_commit= FALSE; + use_cache= sqlcom_can_generate_row_events(thd); break; } - if (implicit_commit || direct) + if (!use_cache || direct) { cache_type= Log_event::EVENT_NO_CACHE; } else { - cache_type= ((using_trans || stmt_has_updated_trans_table(thd) || - force_trans || thd->thread_temporary_used) + cache_type= ((using_trans || stmt_has_updated_trans_table(thd) + || trx_cache || thd->thread_temporary_used) ? Log_event::EVENT_TRANSACTIONAL_CACHE : Log_event::EVENT_STMT_CACHE); } @@ -3189,10 +3156,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, ::do_apply_event(), then the companion SET also have so we don't need to reset_one_shot_variables(). */ - if (!strncmp(query_arg, "BEGIN", q_len_arg) || - !strncmp(query_arg, "COMMIT", q_len_arg) || - !strncmp(query_arg, "ROLLBACK", q_len_arg) || - rpl_filter->db_ok(thd->db)) + if (is_trans_keyword() || rpl_filter->db_ok(thd->db)) { thd->set_time((time_t)when); thd->set_query_and_id((char*)query_arg, q_len_arg, next_query_id()); @@ -3354,11 +3318,12 @@ compare_errors: */ actual_error= thd->is_error() ? thd->stmt_da->sql_errno() : 0; DBUG_PRINT("info",("expected_error: %d sql_errno: %d", - expected_error, actual_error)); + expected_error, actual_error)); + if ((expected_error && expected_error != actual_error && !concurrency_error_code(expected_error)) && - !ignored_error_code(actual_error) && - !ignored_error_code(expected_error)) + !ignored_error_code(actual_error) && + !ignored_error_code(expected_error)) { rli->report(ERROR_LEVEL, 0, "\ @@ -3376,9 +3341,9 @@ Default database: '%s'. Query: '%s'", If we get the same error code as expected and it is not a concurrency issue, or should be ignored. */ - else if ((expected_error == actual_error && + else if ((expected_error == actual_error && !concurrency_error_code(expected_error)) || - ignored_error_code(actual_error)) + ignored_error_code(actual_error)) { DBUG_PRINT("info",("error ignored")); clear_all_errors(thd, const_cast<Relay_log_info*>(rli)); @@ -3397,7 +3362,7 @@ Default database: '%s'. Query: '%s'", If we expected a non-zero error code and get nothing and, it is a concurrency issue or should be ignored. */ - else if (expected_error && !actual_error && + else if (expected_error && !actual_error && (concurrency_error_code(expected_error) || ignored_error_code(expected_error))) trans_rollback_stmt(thd); @@ -3438,12 +3403,13 @@ Default database: '%s'. Query: '%s'", */ } /* End of if (db_ok(... */ - {/** - The following failure injecion works in cooperation with tests + { + /** + The following failure injecion works in cooperation with tests setting @@global.debug= 'd,stop_slave_middle_group'. - The sql thread receives the killed status and will proceed + The sql thread receives the killed status and will proceed to shutdown trying to finish incomplete events group. - */ + */ DBUG_EXECUTE_IF("stop_slave_middle_group", if (strcmp("COMMIT", query) != 0 && strcmp("BEGIN", query) != 0) @@ -3458,7 +3424,7 @@ end: Probably we have set thd->query, thd->db, thd->catalog to point to places in the data_buf of this event. Now the event is going to be deleted probably, so data_buf will be freed, so the thd->... listed above will be - pointers to freed memory. + pointers to freed memory. So we must set them to 0, so that those bad pointers values are not later used. Note that "cleanup" queries like automatic DROP TEMPORARY TABLE don't suffer from these assignments to 0 as DROP TEMPORARY @@ -3468,7 +3434,7 @@ end: thd->set_db(NULL, 0); /* will free the current database */ thd->set_query(NULL, 0); DBUG_PRINT("info", ("end: query= 0")); - close_thread_tables(thd); + close_thread_tables(thd); /* As a disk space optimization, future masters will not log an event for LAST_INSERT_ID() if that function returned 0 (and thus they will be able @@ -3770,7 +3736,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) */ if (post_header_len) { -#ifndef DBUG_OFF +#ifndef DBUG_OFF // Allows us to sanity-check that all events initialized their // events (see the end of this 'if' block). memset(post_header_len, 255, number_of_event_types*sizeof(uint8)); @@ -4624,9 +4590,9 @@ void Load_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info, for (i = 0; i < num_fields; i++) { if (i) - my_b_printf(&cache, ","); + my_b_printf(&cache, ","); my_b_printf(&cache, "%s", field); - + field += field_lens[i] + 1; } my_b_printf(&cache, ")"); @@ -4796,9 +4762,7 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, thd->set_query(load_data_query, (uint) (end - load_data_query)); if (sql_ex.opt_flags & REPLACE_FLAG) - { - handle_dup= DUP_REPLACE; - } + handle_dup= DUP_REPLACE; else if (sql_ex.opt_flags & IGNORE_FLAG) { ignore= 1; @@ -4807,14 +4771,14 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, else { /* - When replication is running fine, if it was DUP_ERROR on the + When replication is running fine, if it was DUP_ERROR on the master then we could choose IGNORE here, because if DUP_ERROR suceeded on master, and data is identical on the master and slave, then there should be no uniqueness errors on slave, so IGNORE is the same as DUP_ERROR. But in the unlikely case of uniqueness errors (because the data on the master and slave happen to be different - (user error or bug), we want LOAD DATA to print an error message on - the slave to discover the problem. + (user error or bug), we want LOAD DATA to print an error message on + the slave to discover the problem. If reading from net (a 3.23 master), mysql_load() will change this to IGNORE. @@ -4846,7 +4810,7 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG); if (sql_ex.empty_flags & FIELD_TERM_EMPTY) - ex.field_term->length(0); + ex.field_term->length(0); ex.skip_lines = skip_lines; List<Item> field_list; @@ -4855,12 +4819,10 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, thd->variables.pseudo_thread_id= thread_id; if (net) { - // mysql_load will use thd->net to read the file - thd->net.vio = net->vio; - /* - Make sure the client does not get confused about the packet sequence - */ - thd->net.pkt_nr = net->pkt_nr; + // mysql_load will use thd->net to read the file + thd->net.vio = net->vio; + // Make sure the client does not get confused about the packet sequence + thd->net.pkt_nr = net->pkt_nr; } /* It is safe to use tmp_list twice because we are not going to @@ -4872,7 +4834,7 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, thd->is_slave_error= 1; if (thd->cuted_fields) { - /* log_pos is the position of the LOAD event in the master log */ + /* log_pos is the position of the LOAD event in the master log */ sql_print_warning("Slave: load data infile on table '%s' at " "log position %s in log '%s' produced %ld " "warning(s). Default database: '%s'", @@ -5620,10 +5582,10 @@ User_var_log_event(const char* buf, { type= (Item_result) buf[UV_VAL_IS_NULL]; charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE); - val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + - UV_CHARSET_NUMBER_SIZE); + val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + + UV_CHARSET_NUMBER_SIZE); val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + - UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE); + UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE); /** We need to check if this is from an old server @@ -5707,9 +5669,9 @@ bool User_var_log_event::write(IO_CACHE* file) return (write_header(file, event_length) || my_b_safe_write(file, (uchar*) buf, sizeof(buf)) || - my_b_safe_write(file, (uchar*) name, name_len) || - my_b_safe_write(file, (uchar*) buf1, buf1_length) || - my_b_safe_write(file, pos, val_len) || + my_b_safe_write(file, (uchar*) name, name_len) || + my_b_safe_write(file, (uchar*) buf1, buf1_length) || + my_b_safe_write(file, pos, val_len) || my_b_safe_write(file, &flags, unsigned_len)); } #endif @@ -5983,7 +5945,7 @@ Slave_log_event::Slave_log_event(THD* thd_arg, master_log_len = strlen(rli->group_master_log_name); // on OOM, just do not initialize the structure and print the error if ((mem_pool = (char*)my_malloc(get_data_size() + 1, - MYF(MY_WME)))) + MYF(MY_WME)))) { master_host = mem_pool + SL_MASTER_HOST_OFFSET ; memcpy(master_host, mi->host, master_host_len + 1); @@ -5992,7 +5954,7 @@ Slave_log_event::Slave_log_event(THD* thd_arg, master_port = mi->port; master_pos = rli->group_master_log_pos; DBUG_PRINT("info", ("master_log: %s pos: %lu", master_log, - (ulong) master_pos)); + (ulong) master_pos)); } else sql_print_error("Out of memory while recording slave event"); @@ -8941,11 +8903,28 @@ static bool record_compare(TABLE *table) { for (int i = 0 ; i < 2 ; ++i) { - saved_x[i]= table->record[i][0]; - saved_filler[i]= table->record[i][table->s->null_bytes - 1]; - table->record[i][0]|= 1U; - table->record[i][table->s->null_bytes - 1]|= - 256U - (1U << table->s->last_null_bit_pos); + /* + If we have an X bit then we need to take care of it. + */ + if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD)) + { + saved_x[i]= table->record[i][0]; + table->record[i][0]|= 1U; + } + + /* + If (last_null_bit_pos == 0 && null_bytes > 1), then: + + X bit (if any) + N nullable fields + M Field_bit fields = 8 bits + + Ie, the entire byte is used. + */ + if (table->s->last_null_bit_pos > 0) + { + saved_filler[i]= table->record[i][table->s->null_bytes - 1]; + table->record[i][table->s->null_bytes - 1]|= + 256U - (1U << table->s->last_null_bit_pos); + } } } @@ -8985,8 +8964,11 @@ record_compare_exit: { for (int i = 0 ; i < 2 ; ++i) { - table->record[i][0]= saved_x[i]; - table->record[i][table->s->null_bytes - 1]= saved_filler[i]; + if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD)) + table->record[i][0]= saved_x[i]; + + if (table->s->last_null_bit_pos) + table->record[i][table->s->null_bytes - 1]= saved_filler[i]; } } |