diff options
-rw-r--r-- | sql/log_event.cc | 116 | ||||
-rw-r--r-- | sql/log_event.h | 10 | ||||
-rw-r--r-- | sql/slave.cc | 23 |
3 files changed, 96 insertions, 53 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index fdf8007283d..fbc9db1f5b7 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -789,7 +789,8 @@ int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen) */ int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, - const char *src, char **dst, ulong *newlen) + const char *src, char* buf, ulong buf_size, bool* is_malloc, + char **dst, ulong *newlen) { ulong len = uint4korr(src + EVENT_LEN_OFFSET); const char *tmp = src; @@ -810,37 +811,54 @@ int query_event_uncompress(const Format_description_log_event *description_event *newlen = (tmp - src) + un_len; if(contain_checksum) *newlen += BINLOG_CHECKSUM_LEN; - - *dst = (char *)my_malloc(ALIGN_SIZE(*newlen), MYF(MY_FAE)); - if (!*dst) - { - return 1; - } + + uint32 alloc_size = ALIGN_SIZE(*newlen); + char *new_dst = NULL; - /* copy the head*/ - memcpy(*dst, src , tmp - src); - if (binlog_buf_uncompress(tmp, *dst + (tmp - src), len - (tmp - src), &un_len)) + *is_malloc = false; + if (alloc_size <= buf_size) + { + new_dst = buf; + } + else + { + new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME)); + if (!new_dst) + return 1; + + *is_malloc = true; + } + + /* copy the head*/ + memcpy(new_dst, src , tmp - src); + if (binlog_buf_uncompress(tmp, new_dst + (tmp - src), len - (tmp - src), &un_len)) { - my_free(*dst); + if (*is_malloc) + my_free(new_dst); + + *is_malloc = false; + return 1; } - (*dst)[EVENT_TYPE_OFFSET] = QUERY_EVENT; - int4store(*dst + EVENT_LEN_OFFSET, *newlen); + new_dst[EVENT_TYPE_OFFSET] = QUERY_EVENT; + int4store(new_dst + EVENT_LEN_OFFSET, *newlen); if(contain_checksum){ ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN; - int4store(*dst + clear_len, my_checksum(0L, (uchar *)*dst, clear_len)); + int4store(new_dst + clear_len, my_checksum(0L, (uchar *)new_dst, clear_len)); } + *dst = new_dst; return 0; } -int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, - const char *src, char **dst, ulong *newlen) +int row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, + const char *src, char* buf, ulong buf_size, bool* is_malloc, + char **dst, ulong *newlen) { Log_event_type type = (Log_event_type)(uchar)src[EVENT_TYPE_OFFSET]; ulong len = uint4korr(src + EVENT_LEN_OFFSET); const char *tmp = src; - char *buf = NULL; + char *new_dst = NULL; DBUG_ASSERT(LOG_EVENT_IS_ROW_COMPRESSED(type)); @@ -884,28 +902,39 @@ int Row_log_event_uncompress(const Format_description_log_event *description_eve *newlen += BINLOG_CHECKSUM_LEN; uint32 alloc_size = ALIGN_SIZE(*newlen); - buf = (char *)my_malloc(alloc_size , MYF(MY_FAE)); - if (!buf) - { - return 1; - } + + *is_malloc = false; + if (alloc_size <= buf_size) + { + new_dst = buf; + } + else + { + new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME)); + if (!new_dst) + return 1; + + *is_malloc = true; + } /* copy the head*/ - memcpy(buf, src , tmp - src); + memcpy(new_dst, src , tmp - src); /* uncompress the body */ - if (binlog_buf_uncompress(tmp, buf + (tmp - src), len - (tmp - src), &un_len)) + if (binlog_buf_uncompress(tmp, new_dst + (tmp - src), len - (tmp - src), &un_len)) { - my_free(buf); + if (*is_malloc) + my_free(new_dst); + return 1; } - buf[EVENT_TYPE_OFFSET] = type; - int4store(buf + EVENT_LEN_OFFSET, *newlen); + new_dst[EVENT_TYPE_OFFSET] = type; + int4store(new_dst + EVENT_LEN_OFFSET, *newlen); if(contain_checksum){ ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN; - int4store(buf + clear_len, my_checksum(0L, (uchar *)buf, clear_len)); + int4store(new_dst + clear_len, my_checksum(0L, (uchar *)new_dst, clear_len)); } - *dst = buf; + *dst = new_dst; return 0; } @@ -3504,14 +3533,15 @@ bool Query_compressed_log_event::write() { const char *query_tmp = query; uint32 q_len_tmp = q_len; + uint32 alloc_size; bool ret = true; - q_len = binlog_get_compress_len(q_len); - query = (char *)my_malloc(q_len, MYF(MY_FAE)); + q_len = alloc_size = binlog_get_compress_len(q_len); + query = (char *)my_safe_alloca(alloc_size); if(query && !binlog_buf_compress(query_tmp, (char *)query, q_len_tmp, &q_len)) { ret = Query_log_event::write(); } - my_free((void *)query); + my_safe_afree((void *)query, alloc_size); query = query_tmp; q_len = q_len_tmp; return ret; @@ -10786,14 +10816,15 @@ bool Rows_log_event::write_compressed() uchar *m_rows_buf_tmp = m_rows_buf; uchar *m_rows_cur_tmp = m_rows_cur; bool ret = true; - uint32 comlen = binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp); - m_rows_buf = (uchar *)my_malloc(comlen, MYF(MY_FAE)); + uint32 comlen, alloc_size; + comlen = alloc_size = binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp); + m_rows_buf = (uchar *)my_safe_alloca(alloc_size); if(m_rows_buf && !binlog_buf_compress((const char *)m_rows_buf_tmp, (char *)m_rows_buf, m_rows_cur_tmp - m_rows_buf_tmp, &comlen)) { m_rows_cur = comlen + m_rows_buf; ret = Log_event::write(); } - my_free(m_rows_buf); + my_safe_afree(m_rows_buf, alloc_size); m_rows_buf = m_rows_buf_tmp; m_rows_cur = m_rows_cur_tmp; return ret; @@ -12242,8 +12273,9 @@ void Write_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO* print_ { char *new_buf; ulong len; - if(!Row_log_event_uncompress(glob_description_event, checksum_alg, - temp_buf, &new_buf, &len)) + bool is_malloc = false; + if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + temp_buf, NULL, 0, &is_malloc, &new_buf, &len)) { free_temp_buf(); register_temp_buf(new_buf, true); @@ -12911,8 +12943,9 @@ void Delete_rows_compressed_log_event::print(FILE *file, { char *new_buf; ulong len; - if(!Row_log_event_uncompress(glob_description_event, checksum_alg, - temp_buf, &new_buf, &len)) + bool is_malloc = false; + if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + temp_buf, NULL, 0, &is_malloc, &new_buf, &len)) { free_temp_buf(); register_temp_buf(new_buf, true); @@ -13167,8 +13200,9 @@ void Update_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO *print { char *new_buf; ulong len; - if(!Row_log_event_uncompress(glob_description_event, checksum_alg, - temp_buf, &new_buf, &len)) + bool is_malloc = false; + if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + temp_buf, NULL, 0, &is_malloc, &new_buf, &len)) { free_temp_buf(); register_temp_buf(new_buf, true); diff --git a/sql/log_event.h b/sql/log_event.h index 14bfc54aed3..59e4dcd8527 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -5074,11 +5074,13 @@ int binlog_buf_uncompress(const char *src, char *dst, uint32 len, uint32 *newlen uint32 binlog_get_compress_len(uint32 len); uint32 binlog_get_uncompress_len(const char *buf); -int query_event_uncompress(const Format_description_log_event *description_event, - bool contain_checksum, const char *src, char **dst, ulong *newlen); +int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, + const char *src, char* buf, ulong buf_size, bool* is_malloc, + char **dst, ulong *newlen); -int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, - const char *src, char **dst, ulong *newlen); +int row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, + const char *src, char* buf, ulong buf_size, bool* is_malloc, + char **dst, ulong *newlen); #endif /* _log_event_h */ diff --git a/sql/slave.cc b/sql/slave.cc index f7efb97e44d..62eb24da14c 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -5664,7 +5664,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) bool gtid_skip_enqueue= false; bool got_gtid_event= false; rpl_gtid event_gtid; - bool compressed_event = FALSE; + bool is_compress_event = false; + char* new_buf = NULL; + char new_buf_arr[4096]; + bool is_malloc = false; /* FD_q must have been prepared for the first R_a event @@ -6150,7 +6153,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) */ case QUERY_COMPRESSED_EVENT: inc_pos= event_len; - if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len)) + if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + buf, new_buf_arr, sizeof(new_buf_arr), &is_malloc, (char **)&new_buf, &event_len)) { char llbuf[22]; error = ER_BINLOG_UNCOMPRESS_ERROR; @@ -6159,7 +6163,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) error_msg.append(llbuf, strlen(llbuf)); goto err; } - compressed_event = true; + buf = new_buf; + is_compress_event = true; goto default_action; case WRITE_ROWS_COMPRESSED_EVENT: @@ -6170,7 +6175,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) case DELETE_ROWS_COMPRESSED_EVENT_V1: inc_pos = event_len; { - if (Row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len)) + if (row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, + buf, new_buf_arr, sizeof(new_buf_arr), &is_malloc, (char **)&new_buf, &event_len)) { char llbuf[22]; error = ER_BINLOG_UNCOMPRESS_ERROR; @@ -6180,7 +6186,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) goto err; } } - compressed_event = true; + buf = new_buf; + is_compress_event = true; goto default_action; #ifndef DBUG_OFF @@ -6233,7 +6240,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ++mi->events_queued_since_last_gtid; } - if (!compressed_event) + if (!is_compress_event) inc_pos= event_len; break; @@ -6429,8 +6436,8 @@ err: mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error), error_msg.ptr()); - if(compressed_event) - my_free((void *)buf); + if(is_malloc) + my_free((void *)new_buf); DBUG_RETURN(error); } |