summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/log_event.cc116
-rw-r--r--sql/log_event.h10
-rw-r--r--sql/slave.cc23
3 files changed, 96 insertions, 53 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index fdf8007283d..fbc9db1f5b7 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -789,7 +789,8 @@ int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen)
*/
int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
- const char *src, char **dst, ulong *newlen)
+ const char *src, char* buf, ulong buf_size, bool* is_malloc,
+ char **dst, ulong *newlen)
{
ulong len = uint4korr(src + EVENT_LEN_OFFSET);
const char *tmp = src;
@@ -810,37 +811,54 @@ int query_event_uncompress(const Format_description_log_event *description_event
*newlen = (tmp - src) + un_len;
if(contain_checksum)
*newlen += BINLOG_CHECKSUM_LEN;
-
- *dst = (char *)my_malloc(ALIGN_SIZE(*newlen), MYF(MY_FAE));
- if (!*dst)
- {
- return 1;
- }
+
+ uint32 alloc_size = ALIGN_SIZE(*newlen);
+ char *new_dst = NULL;
- /* copy the head*/
- memcpy(*dst, src , tmp - src);
- if (binlog_buf_uncompress(tmp, *dst + (tmp - src), len - (tmp - src), &un_len))
+ *is_malloc = false;
+ if (alloc_size <= buf_size)
+ {
+ new_dst = buf;
+ }
+ else
+ {
+ new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME));
+ if (!new_dst)
+ return 1;
+
+ *is_malloc = true;
+ }
+
+ /* copy the head*/
+ memcpy(new_dst, src , tmp - src);
+ if (binlog_buf_uncompress(tmp, new_dst + (tmp - src), len - (tmp - src), &un_len))
{
- my_free(*dst);
+ if (*is_malloc)
+ my_free(new_dst);
+
+ *is_malloc = false;
+
return 1;
}
- (*dst)[EVENT_TYPE_OFFSET] = QUERY_EVENT;
- int4store(*dst + EVENT_LEN_OFFSET, *newlen);
+ new_dst[EVENT_TYPE_OFFSET] = QUERY_EVENT;
+ int4store(new_dst + EVENT_LEN_OFFSET, *newlen);
if(contain_checksum){
ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
- int4store(*dst + clear_len, my_checksum(0L, (uchar *)*dst, clear_len));
+ int4store(new_dst + clear_len, my_checksum(0L, (uchar *)new_dst, clear_len));
}
+ *dst = new_dst;
return 0;
}
-int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
- const char *src, char **dst, ulong *newlen)
+int row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
+ const char *src, char* buf, ulong buf_size, bool* is_malloc,
+ char **dst, ulong *newlen)
{
Log_event_type type = (Log_event_type)(uchar)src[EVENT_TYPE_OFFSET];
ulong len = uint4korr(src + EVENT_LEN_OFFSET);
const char *tmp = src;
- char *buf = NULL;
+ char *new_dst = NULL;
DBUG_ASSERT(LOG_EVENT_IS_ROW_COMPRESSED(type));
@@ -884,28 +902,39 @@ int Row_log_event_uncompress(const Format_description_log_event *description_eve
*newlen += BINLOG_CHECKSUM_LEN;
uint32 alloc_size = ALIGN_SIZE(*newlen);
- buf = (char *)my_malloc(alloc_size , MYF(MY_FAE));
- if (!buf)
- {
- return 1;
- }
+
+ *is_malloc = false;
+ if (alloc_size <= buf_size)
+ {
+ new_dst = buf;
+ }
+ else
+ {
+ new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME));
+ if (!new_dst)
+ return 1;
+
+ *is_malloc = true;
+ }
/* copy the head*/
- memcpy(buf, src , tmp - src);
+ memcpy(new_dst, src , tmp - src);
/* uncompress the body */
- if (binlog_buf_uncompress(tmp, buf + (tmp - src), len - (tmp - src), &un_len))
+ if (binlog_buf_uncompress(tmp, new_dst + (tmp - src), len - (tmp - src), &un_len))
{
- my_free(buf);
+ if (*is_malloc)
+ my_free(new_dst);
+
return 1;
}
- buf[EVENT_TYPE_OFFSET] = type;
- int4store(buf + EVENT_LEN_OFFSET, *newlen);
+ new_dst[EVENT_TYPE_OFFSET] = type;
+ int4store(new_dst + EVENT_LEN_OFFSET, *newlen);
if(contain_checksum){
ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
- int4store(buf + clear_len, my_checksum(0L, (uchar *)buf, clear_len));
+ int4store(new_dst + clear_len, my_checksum(0L, (uchar *)new_dst, clear_len));
}
- *dst = buf;
+ *dst = new_dst;
return 0;
}
@@ -3504,14 +3533,15 @@ bool Query_compressed_log_event::write()
{
const char *query_tmp = query;
uint32 q_len_tmp = q_len;
+ uint32 alloc_size;
bool ret = true;
- q_len = binlog_get_compress_len(q_len);
- query = (char *)my_malloc(q_len, MYF(MY_FAE));
+ q_len = alloc_size = binlog_get_compress_len(q_len);
+ query = (char *)my_safe_alloca(alloc_size);
if(query && !binlog_buf_compress(query_tmp, (char *)query, q_len_tmp, &q_len))
{
ret = Query_log_event::write();
}
- my_free((void *)query);
+ my_safe_afree((void *)query, alloc_size);
query = query_tmp;
q_len = q_len_tmp;
return ret;
@@ -10786,14 +10816,15 @@ bool Rows_log_event::write_compressed()
uchar *m_rows_buf_tmp = m_rows_buf;
uchar *m_rows_cur_tmp = m_rows_cur;
bool ret = true;
- uint32 comlen = binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp);
- m_rows_buf = (uchar *)my_malloc(comlen, MYF(MY_FAE));
+ uint32 comlen, alloc_size;
+ comlen = alloc_size = binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp);
+ m_rows_buf = (uchar *)my_safe_alloca(alloc_size);
if(m_rows_buf && !binlog_buf_compress((const char *)m_rows_buf_tmp, (char *)m_rows_buf, m_rows_cur_tmp - m_rows_buf_tmp, &comlen))
{
m_rows_cur = comlen + m_rows_buf;
ret = Log_event::write();
}
- my_free(m_rows_buf);
+ my_safe_afree(m_rows_buf, alloc_size);
m_rows_buf = m_rows_buf_tmp;
m_rows_cur = m_rows_cur_tmp;
return ret;
@@ -12242,8 +12273,9 @@ void Write_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO* print_
{
char *new_buf;
ulong len;
- if(!Row_log_event_uncompress(glob_description_event, checksum_alg,
- temp_buf, &new_buf, &len))
+ bool is_malloc = false;
+ if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+ temp_buf, NULL, 0, &is_malloc, &new_buf, &len))
{
free_temp_buf();
register_temp_buf(new_buf, true);
@@ -12911,8 +12943,9 @@ void Delete_rows_compressed_log_event::print(FILE *file,
{
char *new_buf;
ulong len;
- if(!Row_log_event_uncompress(glob_description_event, checksum_alg,
- temp_buf, &new_buf, &len))
+ bool is_malloc = false;
+ if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+ temp_buf, NULL, 0, &is_malloc, &new_buf, &len))
{
free_temp_buf();
register_temp_buf(new_buf, true);
@@ -13167,8 +13200,9 @@ void Update_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO *print
{
char *new_buf;
ulong len;
- if(!Row_log_event_uncompress(glob_description_event, checksum_alg,
- temp_buf, &new_buf, &len))
+ bool is_malloc = false;
+ if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+ temp_buf, NULL, 0, &is_malloc, &new_buf, &len))
{
free_temp_buf();
register_temp_buf(new_buf, true);
diff --git a/sql/log_event.h b/sql/log_event.h
index 14bfc54aed3..59e4dcd8527 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -5074,11 +5074,13 @@ int binlog_buf_uncompress(const char *src, char *dst, uint32 len, uint32 *newlen
uint32 binlog_get_compress_len(uint32 len);
uint32 binlog_get_uncompress_len(const char *buf);
-int query_event_uncompress(const Format_description_log_event *description_event,
- bool contain_checksum, const char *src, char **dst, ulong *newlen);
+int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
+ const char *src, char* buf, ulong buf_size, bool* is_malloc,
+ char **dst, ulong *newlen);
-int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
- const char *src, char **dst, ulong *newlen);
+int row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
+ const char *src, char* buf, ulong buf_size, bool* is_malloc,
+ char **dst, ulong *newlen);
#endif /* _log_event_h */
diff --git a/sql/slave.cc b/sql/slave.cc
index f7efb97e44d..62eb24da14c 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -5664,7 +5664,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
bool gtid_skip_enqueue= false;
bool got_gtid_event= false;
rpl_gtid event_gtid;
- bool compressed_event = FALSE;
+ bool is_compress_event = false;
+ char* new_buf = NULL;
+ char new_buf_arr[4096];
+ bool is_malloc = false;
/*
FD_q must have been prepared for the first R_a event
@@ -6150,7 +6153,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
*/
case QUERY_COMPRESSED_EVENT:
inc_pos= event_len;
- if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len))
+ if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+ buf, new_buf_arr, sizeof(new_buf_arr), &is_malloc, (char **)&new_buf, &event_len))
{
char llbuf[22];
error = ER_BINLOG_UNCOMPRESS_ERROR;
@@ -6159,7 +6163,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
error_msg.append(llbuf, strlen(llbuf));
goto err;
}
- compressed_event = true;
+ buf = new_buf;
+ is_compress_event = true;
goto default_action;
case WRITE_ROWS_COMPRESSED_EVENT:
@@ -6170,7 +6175,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
case DELETE_ROWS_COMPRESSED_EVENT_V1:
inc_pos = event_len;
{
- if (Row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len))
+ if (row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+ buf, new_buf_arr, sizeof(new_buf_arr), &is_malloc, (char **)&new_buf, &event_len))
{
char llbuf[22];
error = ER_BINLOG_UNCOMPRESS_ERROR;
@@ -6180,7 +6186,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
goto err;
}
}
- compressed_event = true;
+ buf = new_buf;
+ is_compress_event = true;
goto default_action;
#ifndef DBUG_OFF
@@ -6233,7 +6240,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
++mi->events_queued_since_last_gtid;
}
- if (!compressed_event)
+ if (!is_compress_event)
inc_pos= event_len;
break;
@@ -6429,8 +6436,8 @@ err:
mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error),
error_msg.ptr());
- if(compressed_event)
- my_free((void *)buf);
+ if(is_malloc)
+ my_free((void *)new_buf);
DBUG_RETURN(error);
}