summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergei Golubchik <serg@mariadb.org>2022-05-29 13:31:20 +0200
committerNikita Malyavin <nikitamalyavin@gmail.com>2022-10-17 15:24:44 +0300
commit8ac16fa5ef70fdf023699e01d6787791bb53a3fa (patch)
treec74d578aaa6d3e31dcf80beb5c6fd7072aef219d
parentb8331bb1fecc05e42e428a974b3b73eb79985426 (diff)
downloadmariadb-git-8ac16fa5ef70fdf023699e01d6787791bb53a3fa.tar.gz
don't copy stmt IO_CACHE to trx IO_CACHE at the stmt end
Instead, use only one (trx) IO_CACHE and truncate it if the statement is rolled back. Don't use binlog_cache_mngr to accumulate the data; use binlog_cache_data instead (binlog_cache_data owns one IO_CACHE, while binlog_cache_mngr owns two binlog_cache_data objects, trx and stmt).
-rw-r--r--sql/handler.cc13
-rw-r--r--sql/log.cc111
-rw-r--r--sql/log.h2
-rw-r--r--sql/sql_class.h2
4 files changed, 57 insertions, 71 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index 443f8650174..f846f6687bd 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -7043,18 +7043,13 @@ bool handler::check_table_binlog_row_based_internal()
static int binlog_log_row_online_alter(TABLE* table,
const uchar *before_record,
const uchar *after_record,
- Log_func *log_func,
- bool has_trans)
+ Log_func *log_func)
{
THD *thd= table->in_use;
if (!table->online_alter_cache)
{
- auto *cache_mngr= online_alter_binlog_get_cache_mngr(thd, table);
- // Use transaction cache directly, if it is not multi-transaction mode
- table->online_alter_cache= binlog_get_cache_data(cache_mngr,
- !thd->in_multi_stmt_transaction_mode());
-
+ table->online_alter_cache= online_alter_binlog_get_cache_data(thd, table);
trans_register_ha(thd, false, binlog_hton, 0);
if (thd->in_multi_stmt_transaction_mode())
trans_register_ha(thd, true, binlog_hton, 0);
@@ -7066,7 +7061,7 @@ static int binlog_log_row_online_alter(TABLE* table,
table->rpl_write_set= &table->s->all_set;
int error= (*log_func)(thd, table, table->s->online_alter_binlog,
- table->online_alter_cache, has_trans,
+ table->online_alter_cache, true,
before_record, after_record);
table->rpl_write_set= old_rpl_write_set;
@@ -7124,7 +7119,7 @@ int handler::binlog_log_row(const uchar *before_record,
#ifdef HAVE_REPLICATION
if (unlikely(!error && table->s->online_alter_binlog))
error= binlog_log_row_online_alter(table, before_record, after_record,
- log_func, row_logging_has_trans);
+ log_func);
#endif // HAVE_REPLICATION
DBUG_RETURN(error);
diff --git a/sql/log.cc b/sql/log.cc
index 1d62599adae..a4b38d03592 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -276,10 +276,10 @@ void make_default_log_name(char **out, const char* log_ext, bool once)
Helper classes to store non-transactional and transactional data
before copying it to the binary log.
*/
-class binlog_cache_data
+class binlog_cache_data: public ilist_node<>
{
public:
- binlog_cache_data(): m_pending(0), status(0),
+ binlog_cache_data(): share(0), m_pending(0), status(0),
before_stmt_pos(MY_OFF_T_UNDEF),
incident(FALSE),
saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0),
@@ -362,6 +362,11 @@ public:
before_stmt_pos= pos;
}
+ void store_prev_position()
+ {
+ before_stmt_pos= my_b_write_tell(&cache_log);
+ }
+
void restore_prev_position()
{
truncate(before_stmt_pos);
@@ -414,6 +419,7 @@ public:
*/
IO_CACHE cache_log;
+ TABLE_SHARE *share; // for online alter table
private:
/*
Pending binrows event. This event is the event where the rows are currently
@@ -512,16 +518,15 @@ void Log_event_writer::set_incident()
}
-class binlog_cache_mngr: public ilist_node<> {
+class binlog_cache_mngr {
public:
binlog_cache_mngr(my_off_t param_max_binlog_stmt_cache_size,
my_off_t param_max_binlog_cache_size,
ulong *param_ptr_binlog_stmt_cache_use,
ulong *param_ptr_binlog_stmt_cache_disk_use,
ulong *param_ptr_binlog_cache_use,
- ulong *param_ptr_binlog_cache_disk_use,
- TABLE_SHARE *share)
- : last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0), share(share)
+ ulong *param_ptr_binlog_cache_disk_use)
+ : last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0)
{
stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size,
param_ptr_binlog_stmt_cache_use,
@@ -587,8 +592,6 @@ public:
//Will be reset when gtid is written into binlog
uchar gtid_flags3;
decltype (rpl_gtid::seq_no) sa_seq_no;
-
- TABLE_SHARE *share;
private:
binlog_cache_mngr& operator=(const binlog_cache_mngr& info);
@@ -2225,21 +2228,16 @@ static int binlog_commit_flush_xa_prepare(THD *thd, bool all,
#ifdef HAVE_REPLICATION
static void
-binlog_online_alter_cleanup(ilist<binlog_cache_mngr> &list,
- bool ending_trans)
+binlog_online_alter_cleanup(ilist<binlog_cache_data> &list, bool ending_trans)
{
- for (auto &cache: list)
- {
- cache.reset(true, ending_trans);
- }
if (ending_trans)
{
auto it= list.begin();
while (it != list.end())
{
auto &cache= *it++;
- cache.~binlog_cache_mngr();
- my_free(&cache);
+ cache.reset();
+ delete &cache;
}
list.clear();
DBUG_ASSERT(list.empty());
@@ -5911,7 +5909,7 @@ bool stmt_has_updated_non_trans_table(const THD* thd)
binlog_hton, which has internal linkage.
*/
-binlog_cache_mngr *binlog_setup_cache_mngr(TABLE_SHARE *share)
+binlog_cache_mngr *binlog_setup_cache_mngr()
{
auto *cache_mngr= (binlog_cache_mngr*) my_malloc(key_memory_binlog_cache_mngr,
sizeof(binlog_cache_mngr),
@@ -5932,8 +5930,7 @@ binlog_cache_mngr *binlog_setup_cache_mngr(TABLE_SHARE *share)
&binlog_stmt_cache_use,
&binlog_stmt_cache_disk_use,
&binlog_cache_use,
- &binlog_cache_disk_use,
- share);
+ &binlog_cache_disk_use);
return cache_mngr;
}
@@ -5946,7 +5943,7 @@ binlog_cache_mngr *THD::binlog_setup_trx_data()
if (!cache_mngr)
{
- cache_mngr= binlog_setup_cache_mngr(NULL);
+ cache_mngr= binlog_setup_cache_mngr();
thd_set_ha_data(this, binlog_hton, cache_mngr);
}
@@ -6325,9 +6322,27 @@ write_err:
}
-binlog_cache_mngr *online_alter_binlog_get_cache_mngr(THD *thd, TABLE *table)
+#ifdef HAVE_REPLICATION
+binlog_cache_data *binlog_setup_cache_data(TABLE_SHARE *share)
{
- ilist<binlog_cache_mngr> &list= thd->online_alter_cache_list;
+ auto cache= new binlog_cache_data();
+ if (!cache || open_cached_file(&cache->cache_log, mysql_tmpdir,
+ LOG_PREFIX, (size_t)binlog_cache_size, MYF(MY_WME)))
+ {
+ delete cache;
+ return NULL;
+ }
+
+ cache->share= share;
+ cache->set_binlog_cache_info(max_binlog_cache_size, &binlog_cache_use,
+ &binlog_cache_disk_use);
+ cache->store_prev_position();
+ return cache;
+}
+
+binlog_cache_data *online_alter_binlog_get_cache_data(THD *thd, TABLE *table)
+{
+ ilist<binlog_cache_data> &list= thd->online_alter_cache_list;
/* we assume it's very rare to have more than one online ALTER running */
for (auto &cache: list)
@@ -6336,11 +6351,12 @@ binlog_cache_mngr *online_alter_binlog_get_cache_mngr(THD *thd, TABLE *table)
return &cache;
}
- auto *new_cache_mngr= binlog_setup_cache_mngr(table->s);
- list.push_back(*new_cache_mngr);
+ auto *new_cache_data= binlog_setup_cache_data(table->s);
+ list.push_back(*new_cache_data);
- return new_cache_mngr;
+ return new_cache_data;
}
+#endif
binlog_cache_mngr *THD::binlog_get_cache_mngr() const
{
@@ -7598,28 +7614,6 @@ private:
bool first;
};
-#ifdef HAVE_REPLICATION
-static int cache_copy(IO_CACHE *to, IO_CACHE *from)
-{
- DBUG_ENTER("cache_copy");
- if (reinit_io_cache(from, READ_CACHE, 0, 0, 0))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- size_t bytes_in_cache= my_b_bytes_in_cache(from);
-
- do
- {
- my_b_write(to, from->read_pos, bytes_in_cache);
-
- from->read_pos += bytes_in_cache;
- bytes_in_cache= my_b_fill(from);
- if (from->error || to->error)
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- } while (bytes_in_cache);
-
- DBUG_RETURN(0);
-}
-#endif
-
static int binlog_online_alter_commit(THD *thd, bool all)
{
DBUG_ENTER("binlog_online_alter_commit");
@@ -7631,28 +7625,21 @@ static int binlog_online_alter_commit(THD *thd, bool all)
bool is_ending_transaction= ending_trans(thd, all);
- for (auto &cache_mngr: thd->online_alter_cache_list)
+ for (auto &cache: thd->online_alter_cache_list)
{
- auto *binlog= cache_mngr.share->online_alter_binlog;
+ auto *binlog= cache.share->online_alter_binlog;
DBUG_ASSERT(binlog);
// do not set STMT_END for last event to leave table open in altering thd
- error= binlog_flush_pending_rows_event(thd, false, true, binlog,
- is_ending_transaction
- ? &cache_mngr.trx_cache
- : &cache_mngr.stmt_cache);
+ error= binlog_flush_pending_rows_event(thd, false, true, binlog, &cache);
if (is_ending_transaction)
{
mysql_mutex_lock(binlog->get_log_lock());
- error= binlog->write_cache(thd, &cache_mngr.trx_cache.cache_log);
-
+ error= binlog->write_cache(thd, &cache.cache_log);
mysql_mutex_unlock(binlog->get_log_lock());
}
else
- {
- error= cache_copy(&cache_mngr.trx_cache.cache_log,
- &cache_mngr.stmt_cache.cache_log);
- }
+ cache.store_prev_position();
if (error)
{
@@ -7680,12 +7667,16 @@ static void binlog_online_alter_rollback(THD *thd, bool all)
#ifdef HAVE_REPLICATION
bool is_ending_trans= ending_trans(thd, all);
+ if (!is_ending_trans)
+ for (auto &cache: thd->online_alter_cache_list)
+ cache.restore_prev_position();
+
/*
This is a crucial moment that we are running through
thd->online_alter_cache_list, and not through thd->open_tables to cleanup
stmt cache, though both have it. The reason is that the tables can be closed
to that moment in case of an error.
- The same reason applies to the fact we don't store cache_mngr in the table
+ The same reason applies to the fact we don't store cache in the table
itself -- because it can happen to be not existing.
Still in case if tables are left opened
*/
diff --git a/sql/log.h b/sql/log.h
index c26727f2bf8..1bf17103d53 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -1320,7 +1320,7 @@ int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
binlog_cache_data *cache_data);
Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
-binlog_cache_mngr *online_alter_binlog_get_cache_mngr(THD *thd, TABLE *table);
+binlog_cache_data *online_alter_binlog_get_cache_data(THD *thd, TABLE *table);
binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr,
bool use_trans_cache);
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 355c32b159f..8151ade7042 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -5508,7 +5508,7 @@ public:
Item *sp_prepare_func_item(Item **it_addr, uint cols);
bool sp_eval_expr(Field *result_field, Item **expr_item_ptr);
- ilist<binlog_cache_mngr> online_alter_cache_list;
+ ilist<binlog_cache_data> online_alter_cache_list;
bool sql_parser(LEX *old_lex, LEX *lex,
char *str, uint str_len, bool stmt_prepare_mode);