diff options
-rw-r--r-- | client/mysqlbinlog.cc | 7 | ||||
-rw-r--r-- | sql/log.cc | 2 | ||||
-rw-r--r-- | sql/log_event.cc | 550 | ||||
-rw-r--r-- | sql/log_event.h | 123 | ||||
-rw-r--r-- | sql/mysqld.cc | 2 | ||||
-rw-r--r-- | sql/mysqld.h | 3 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 6 | ||||
-rw-r--r-- | sql/share/errmsg-utf8.txt | 2 | ||||
-rw-r--r-- | sql/slave.cc | 67 | ||||
-rw-r--r-- | sql/sql_class.cc | 52 | ||||
-rw-r--r-- | sql/sql_class.h | 6 | ||||
-rw-r--r-- | sql/sql_repl.cc | 4 | ||||
-rw-r--r-- | sql/sys_vars.cc | 12 |
13 files changed, 797 insertions, 39 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index 6a52c9fe29a..94443791441 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -1002,6 +1002,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, switch (ev_type) { case QUERY_EVENT: + case QUERY_COMPRESSED_EVENT: { Query_log_event *qe= (Query_log_event*)ev; if (!qe->is_trans_keyword()) @@ -1243,6 +1244,12 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, case WRITE_ROWS_EVENT_V1: case UPDATE_ROWS_EVENT_V1: case DELETE_ROWS_EVENT_V1: + case WRITE_ROWS_COMPRESSED_EVENT: + case DELETE_ROWS_COMPRESSED_EVENT: + case UPDATE_ROWS_COMPRESSED_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT_V1: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + case DELETE_ROWS_COMPRESSED_EVENT_V1: { Rows_log_event *e= (Rows_log_event*) ev; if (print_row_event(print_event_info, ev, e->get_table_id(), diff --git a/sql/log.cc b/sql/log.cc index 2c290715741..4d903154d98 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -9838,7 +9838,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, ((last_gtid_standalone && !ev->is_part_of_group(typ)) || (!last_gtid_standalone && (typ == XID_EVENT || - (typ == QUERY_EVENT && + (LOG_EVENT_IS_QUERY(typ) && (((Query_log_event *)ev)->is_commit() || ((Query_log_event *)ev)->is_rollback())))))) { diff --git a/sql/log_event.cc b/sql/log_event.cc index 01bb5e7a561..fdf8007283d 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -51,6 +51,7 @@ #include "rpl_utility.h" #include "rpl_constants.h" #include "sql_digest.h" +#include "zlib.h" #define my_b_write_string(A, B) my_b_write((A), (uchar*)(B), (uint) (sizeof(B) - 1)) @@ -702,6 +703,271 @@ char *str_to_hex(char *to, const char *from, uint len) return to; // pointer to end 0 of 'to' } +#define BINLOG_COMPRESSED_HEADER_LEN 1 +#define BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES 4 +/** + Compressed Record + Record Header: 1 Byte + 0 Bit: Always 1, mean compressed; + 1-3 Bit: Reversed, compressed algorithmAlways 0, means zlib + 4-7 Bit: Bytes of "Record Original Length" + Record Original Length: 1-4 Bytes + Compressed Buf: +*/ + +/** + Get the length of compress content. +*/ + +uint32 binlog_get_compress_len(uint32 len) +{ + /* 5 for the begin content, 1 reserved for a '\0'*/ + return ALIGN_SIZE((BINLOG_COMPRESSED_HEADER_LEN + BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES) + + compressBound(len) + 1); +} + +/** + Compress buf from 'src' to 'dst'. + + Note: 1) Then the caller should guarantee the length of 'dst', which + can be got by binlog_get_uncompress_len, is enough to hold + the content uncompressed. + 2) The 'comlen' should stored the length of 'dst', and it will + be set as the size of compressed content after return. + + return zero if successful, others otherwise. +*/ +int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen) +{ + uchar lenlen; + if (len & 0xFF000000) + { + dst[1] = uchar(len >> 24); + dst[2] = uchar(len >> 16); + dst[3] = uchar(len >> 8); + dst[4] = uchar(len); + lenlen = 4; + } + else if (len & 0x00FF0000) + { + dst[1] = uchar(len >> 16); + dst[2] = uchar(len >> 8); + dst[3] = uchar(len); + lenlen = 3; + } + else if (len & 0x0000FF00) + { + dst[1] = uchar(len >> 8); + dst[2] = uchar(len); + lenlen = 2; + } + else + { + dst[1] = uchar(len); + lenlen = 1; + } + dst[0] = 0x80 | (lenlen & 0x07); + + uLongf tmplen = (uLongf)*comlen - BINLOG_COMPRESSED_HEADER_LEN - lenlen - 1; + if (compress((Bytef *)dst + BINLOG_COMPRESSED_HEADER_LEN + lenlen, &tmplen, (const Bytef *)src, (uLongf)len) != Z_OK) + { + return 1; + } + *comlen = (uint32)tmplen + BINLOG_COMPRESSED_HEADER_LEN + lenlen; + return 0; +} + +/** + Convert a query_compressed_log_event to query_log_event + from 'src' to 'dst'(malloced inside), the size after compress + stored in 'newlen'. + + @Warning: + 1)The caller should call my_free to release 'dst'. + + return zero if successful, others otherwise. +*/ + +int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, + const char *src, char **dst, ulong *newlen) +{ + ulong len = uint4korr(src + EVENT_LEN_OFFSET); + const char *tmp = src; + + DBUG_ASSERT((uchar)src[EVENT_TYPE_OFFSET] == QUERY_COMPRESSED_EVENT); + + uint8 common_header_len= description_event->common_header_len; + uint8 post_header_len= description_event->post_header_len[QUERY_COMPRESSED_EVENT-1]; + + tmp += common_header_len; + + uint db_len = (uint)tmp[Q_DB_LEN_OFFSET]; + uint16 status_vars_len= uint2korr(tmp + Q_STATUS_VARS_LEN_OFFSET); + + tmp += post_header_len + status_vars_len + db_len + 1; + + uint32 un_len = binlog_get_uncompress_len(tmp); + *newlen = (tmp - src) + un_len; + if(contain_checksum) + *newlen += BINLOG_CHECKSUM_LEN; + + *dst = (char *)my_malloc(ALIGN_SIZE(*newlen), MYF(MY_FAE)); + if (!*dst) + { + return 1; + } + + /* copy the head*/ + memcpy(*dst, src , tmp - src); + if (binlog_buf_uncompress(tmp, *dst + (tmp - src), len - (tmp - src), &un_len)) + { + my_free(*dst); + return 1; + } + + (*dst)[EVENT_TYPE_OFFSET] = QUERY_EVENT; + int4store(*dst + EVENT_LEN_OFFSET, *newlen); + if(contain_checksum){ + ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN; + int4store(*dst + clear_len, my_checksum(0L, (uchar *)*dst, clear_len)); + } + return 0; +} + +int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, + const char *src, char **dst, ulong *newlen) +{ + Log_event_type type = (Log_event_type)(uchar)src[EVENT_TYPE_OFFSET]; + ulong len = uint4korr(src + EVENT_LEN_OFFSET); + const char *tmp = src; + char *buf = NULL; + + DBUG_ASSERT(LOG_EVENT_IS_ROW_COMPRESSED(type)); + + uint8 common_header_len= description_event->common_header_len; + uint8 post_header_len= description_event->post_header_len[type-1]; + + tmp += common_header_len + ROWS_HEADER_LEN_V1; + if (post_header_len == ROWS_HEADER_LEN_V2) + { + /* + Have variable length header, check length, + which includes length bytes + */ + uint16 var_header_len= uint2korr(tmp); + assert(var_header_len >= 2); + + /* skip over var-len header, extracting 'chunks' */ + tmp += var_header_len; + + /* get the uncompressed event type */ + type = (Log_event_type)(type - WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT); + } + else + { + /* get the uncompressed event type */ + type = (Log_event_type)(type - WRITE_ROWS_COMPRESSED_EVENT_V1 + WRITE_ROWS_EVENT_V1); + } + + + ulong m_width = net_field_length((uchar **)&tmp); + tmp += (m_width + 7) / 8; + + if (type == UPDATE_ROWS_EVENT_V1 || type == UPDATE_ROWS_EVENT) + { + tmp += (m_width + 7) / 8; + } + + uint32 un_len = binlog_get_uncompress_len(tmp); + *newlen = (tmp - src) + un_len; + if(contain_checksum) + *newlen += BINLOG_CHECKSUM_LEN; + + uint32 alloc_size = ALIGN_SIZE(*newlen); + buf = (char *)my_malloc(alloc_size , MYF(MY_FAE)); + if (!buf) + { + return 1; + } + + /* copy the head*/ + memcpy(buf, src , tmp - src); + /* uncompress the body */ + if (binlog_buf_uncompress(tmp, buf + (tmp - src), len - (tmp - src), &un_len)) + { + my_free(buf); + return 1; + } + + buf[EVENT_TYPE_OFFSET] = type; + int4store(buf + EVENT_LEN_OFFSET, *newlen); + if(contain_checksum){ + ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN; + int4store(buf + clear_len, my_checksum(0L, (uchar *)buf, clear_len)); + } + *dst = buf; + return 0; +} + +/** + Get the length of uncompress content. +*/ + +uint32 binlog_get_uncompress_len(const char *buf) +{ + DBUG_ASSERT((buf[0] & 0xe0) == 0x80); + uint32 lenlen = buf[0] & 0x07; + uint32 len = 0; + switch(lenlen) + { + case 1: + len = uchar(buf[1]); + break; + case 2: + len = uchar(buf[1]) << 8 | uchar(buf[2]); + break; + case 3: + len = uchar(buf[1]) << 16 | uchar(buf[2]) << 8 | uchar(buf[3]); + break; + case 4: + len = uchar(buf[1]) << 24 | uchar(buf[2]) << 16 | uchar(buf[3]) << 8 | uchar(buf[4]); + break; + default: + DBUG_ASSERT(lenlen >= 1 && lenlen <= 4); + break; + } + return len; +} + +/** + Uncompress the content in 'src' with length of 'len' to 'dst'. + + Note: 1) Then the caller should guarantee the length of 'dst' (which + can be got by statement_get_uncompress_len) is enough to hold + the content uncompressed. + 2) The 'newlen' should stored the length of 'dst', and it will + be set as the size of uncompressed content after return. + + return zero if successful, others otherwise. +*/ +int binlog_buf_uncompress(const char *src, char *dst, uint32 len, uint32 *newlen) +{ + if((src[0] & 0x80) == 0) + { + return 1; + } + + uint32 lenlen = src[0] & 0x07; + uLongf buflen = *newlen; + if(uncompress((Bytef *)dst, &buflen, (const Bytef*)src + 1 + lenlen, len) != Z_OK) + { + return 1; + } + + *newlen = (uint32)buflen; + return 0; +} + #ifndef MYSQL_CLIENT /** @@ -828,6 +1094,13 @@ const char* Log_event::get_type_str(Log_event_type type) case TRANSACTION_CONTEXT_EVENT: return "Transaction_context"; case VIEW_CHANGE_EVENT: return "View_change"; case XA_PREPARE_LOG_EVENT: return "XA_prepare"; + case QUERY_COMPRESSED_EVENT: return "Query_compressed"; + case WRITE_ROWS_COMPRESSED_EVENT: return "Write_rows_compressed"; + case UPDATE_ROWS_COMPRESSED_EVENT: return "Update_rows_compressed"; + case DELETE_ROWS_COMPRESSED_EVENT: return "Delete_rows_compressed"; + case WRITE_ROWS_COMPRESSED_EVENT_V1: return "Write_rows_compressed_v1"; + case UPDATE_ROWS_COMPRESSED_EVENT_V1: return "Update_rows_compressed_v1"; + case DELETE_ROWS_COMPRESSED_EVENT_V1: return "Delete_rows_compressed_v1"; default: return "Unknown"; /* impossible */ } @@ -1661,6 +1934,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case QUERY_EVENT: ev = new Query_log_event(buf, event_len, fdle, QUERY_EVENT); break; + case QUERY_COMPRESSED_EVENT: + ev = new Query_compressed_log_event(buf, event_len, fdle, QUERY_COMPRESSED_EVENT); + break; case LOAD_EVENT: ev = new Load_log_event(buf, event_len, fdle); break; @@ -1735,6 +2011,19 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, ev = new Delete_rows_log_event(buf, event_len, fdle); break; + case WRITE_ROWS_COMPRESSED_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT_V1: + ev = new Write_rows_compressed_log_event(buf, event_len, fdle); + break; + case UPDATE_ROWS_COMPRESSED_EVENT: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + ev = new Update_rows_compressed_log_event(buf, event_len, fdle); + break; + case DELETE_ROWS_COMPRESSED_EVENT: + case DELETE_ROWS_COMPRESSED_EVENT_V1: + ev = new Delete_rows_compressed_log_event(buf, event_len, fdle); + break; + /* MySQL GTID events are ignored */ case GTID_LOG_EVENT: case ANONYMOUS_GTID_LOG_EVENT: @@ -1778,7 +2067,7 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, else { DBUG_PRINT("error",("Unknown event code: %d", - (int) buf[EVENT_TYPE_OFFSET])); + (uchar) buf[EVENT_TYPE_OFFSET])); ev= NULL; break; } @@ -2831,11 +3120,32 @@ void Log_event::print_base64(IO_CACHE* file, } case UPDATE_ROWS_EVENT: case UPDATE_ROWS_EVENT_V1: - { - ev= new Update_rows_log_event((const char*) ptr, size, - glob_description_event); - break; - } + { + ev= new Update_rows_log_event((const char*) ptr, size, + glob_description_event); + break; + } + case WRITE_ROWS_COMPRESSED_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT_V1: + { + ev= new Write_rows_compressed_log_event((const char*) ptr, size, + glob_description_event); + break; + } + case UPDATE_ROWS_COMPRESSED_EVENT: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + { + ev= new Update_rows_compressed_log_event((const char*) ptr, size, + glob_description_event); + break; + } + case DELETE_ROWS_COMPRESSED_EVENT: + case DELETE_ROWS_COMPRESSED_EVENT_V1: + { + ev= new Delete_rows_compressed_log_event((const char*) ptr, size, + glob_description_event); + break; + } default: break; } @@ -3190,6 +3500,23 @@ bool Query_log_event::write() write_footer(); } +bool Query_compressed_log_event::write() +{ + const char *query_tmp = query; + uint32 q_len_tmp = q_len; + bool ret = true; + q_len = binlog_get_compress_len(q_len); + query = (char *)my_malloc(q_len, MYF(MY_FAE)); + if(query && !binlog_buf_compress(query_tmp, (char *)query, q_len_tmp, &q_len)) + { + ret = Query_log_event::write(); + } + my_free((void *)query); + query = query_tmp; + q_len = q_len_tmp; + return ret; +} + /** The simplest constructor that could possibly work. This is used for creating static objects that have a special meaning and are invisible @@ -3382,6 +3709,15 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %llu cache_tye: %d", (ulong) flags2, sql_mode, cache_type)); } + +Query_compressed_log_event::Query_compressed_log_event(THD* thd_arg, const char* query_arg, + ulong query_length, bool using_trans, + bool direct, bool suppress_use, int errcode) + :Query_log_event(thd_arg, query_arg, query_length, using_trans, direct, suppress_use, errcode), + query_buf(0) +{ + +} #endif /* MYSQL_CLIENT */ @@ -3788,6 +4124,28 @@ Query_log_event::Query_log_event(const char* buf, uint event_len, DBUG_VOID_RETURN; } +Query_compressed_log_event::Query_compressed_log_event(const char *buf, uint event_len, + const Format_description_log_event + *description_event, + Log_event_type event_type) + :Query_log_event(buf, event_len, description_event, event_type),query_buf(NULL) +{ + if(query) + { + uint32 un_len=binlog_get_uncompress_len(query); + query_buf = (Log_event::Byte*)my_malloc(ALIGN_SIZE(un_len + 1), MYF(MY_WME)); //reserve one byte for '\0' + if(query_buf && !binlog_buf_uncompress(query, (char *)query_buf, q_len, &un_len)) + { + query_buf[un_len] = 0; + query = (const char *)query_buf; + q_len = un_len; + } + else + { + query= 0; + } + } +} /* Replace a binlog event read into a packet with a dummy event. Either a @@ -5056,6 +5414,15 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[GTID_LIST_EVENT-1]= GTID_LIST_HEADER_LEN; post_header_len[START_ENCRYPTION_EVENT-1]= START_ENCRYPTION_HEADER_LEN; + //compressed event + post_header_len[QUERY_COMPRESSED_EVENT-1]= QUERY_HEADER_LEN; + post_header_len[WRITE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2; + post_header_len[UPDATE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2; + post_header_len[DELETE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2; + post_header_len[WRITE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1; + post_header_len[UPDATE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1; + post_header_len[DELETE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1; + // Sanity-check that all post header lengths are initialized. int i; for (i=0; i<number_of_event_types; i++) @@ -9404,7 +9771,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, { DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)"); uint8 const common_header_len= description_event->common_header_len; - Log_event_type event_type= (Log_event_type) buf[EVENT_TYPE_OFFSET]; + Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]; m_type= event_type; uint8 const post_header_len= description_event->post_header_len[event_type-1]; @@ -9503,8 +9870,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */ - if ((event_type == UPDATE_ROWS_EVENT) || - (event_type == UPDATE_ROWS_EVENT_V1)) + if (LOG_EVENT_IS_UPDATE_ROW(event_type)) { DBUG_PRINT("debug", ("Reading from %p", ptr_after_width)); @@ -9551,6 +9917,31 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, DBUG_VOID_RETURN; } +void Rows_log_event::uncompress_buf() +{ + uint32 un_len = binlog_get_uncompress_len((char *)m_rows_buf); + uchar *new_buf= (uchar*) my_malloc(ALIGN_SIZE(un_len), MYF(MY_WME)); + if (new_buf) + { + if(!binlog_buf_uncompress((char *)m_rows_buf, (char *)new_buf, m_rows_cur - m_rows_buf, &un_len)) + { + my_free(m_rows_buf); + m_rows_buf = new_buf; +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + m_curr_row= m_rows_buf; +#endif + m_rows_end= m_rows_buf + un_len; + m_rows_cur= m_rows_end; + return; + } + else + { + my_free(new_buf); + } + } + m_cols.bitmap= 0; // catch it in is_valid +} + Rows_log_event::~Rows_log_event() { if (m_cols.bitmap == m_bitbuf) // no my_malloc happened @@ -9573,7 +9964,8 @@ int Rows_log_event::get_data_size() (m_rows_cur - m_rows_buf);); int data_size= 0; - bool is_v2_event= get_type_code() > DELETE_ROWS_EVENT_V1; + Log_event_type type = get_type_code(); + bool is_v2_event= LOG_EVENT_IS_ROW_V2(type); if (is_v2_event) { data_size= ROWS_HEADER_LEN_V2 + @@ -10388,6 +10780,24 @@ bool Rows_log_event::write_data_body() return res; } + +bool Rows_log_event::write_compressed() +{ + uchar *m_rows_buf_tmp = m_rows_buf; + uchar *m_rows_cur_tmp = m_rows_cur; + bool ret = true; + uint32 comlen = binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp); + m_rows_buf = (uchar *)my_malloc(comlen, MYF(MY_FAE)); + if(m_rows_buf && !binlog_buf_compress((const char *)m_rows_buf_tmp, (char *)m_rows_buf, m_rows_cur_tmp - m_rows_buf_tmp, &comlen)) + { + m_rows_cur = comlen + m_rows_buf; + ret = Log_event::write(); + } + my_free(m_rows_buf); + m_rows_buf = m_rows_buf_tmp; + m_rows_cur = m_rows_cur_tmp; + return ret; +} #endif #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) @@ -11302,6 +11712,20 @@ Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg, is_transactional, WRITE_ROWS_EVENT_V1) { } + +Write_rows_compressed_log_event::Write_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg, + ulong tid_arg, + bool is_transactional) + : Write_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional) +{ + //m_type = log_bin_use_v1_row_events ? WRITE_ROWS_COMPRESSED_EVENT_V1 : WRITE_ROWS_COMPRESSED_EVENT; + m_type = WRITE_ROWS_COMPRESSED_EVENT_V1; +} + +bool Write_rows_compressed_log_event::write() +{ + return Rows_log_event::write_compressed(); +} #endif /* @@ -11314,6 +11738,14 @@ Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len, : Rows_log_event(buf, event_len, description_event) { } + +Write_rows_compressed_log_event::Write_rows_compressed_log_event(const char *buf, uint event_len, + const Format_description_log_event + *description_event) +: Write_rows_log_event(buf, event_len, description_event) +{ + uncompress_buf(); +} #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) @@ -11805,6 +12237,23 @@ void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) {DBUG_SET("+d,simulate_my_b_fill_error");}); Rows_log_event::print_helper(file, print_event_info, "Write_rows"); } + +void Write_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) +{ + char *new_buf; + ulong len; + if(!Row_log_event_uncompress(glob_description_event, checksum_alg, + temp_buf, &new_buf, &len)) + { + free_temp_buf(); + register_temp_buf(new_buf, true); + Rows_log_event::print_helper(file, print_event_info, "Write_compressed_rows"); + } + else + { + my_b_printf(&print_event_info->head_cache, "ERROR: uncompress write_compressed_rows failed\n"); + } +} #endif @@ -12325,6 +12774,20 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg, DELETE_ROWS_EVENT_V1) { } + +Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg, + ulong tid_arg, + bool is_transactional) + : Delete_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional) +{ + //m_type = log_bin_use_v1_row_events ? DELETE_ROWS_COMPRESSED_EVENT_V1 : DELETE_ROWS_COMPRESSED_EVENT; + m_type = DELETE_ROWS_COMPRESSED_EVENT_V1; +} + +bool Delete_rows_compressed_log_event::write() +{ + return Rows_log_event::write_compressed(); +} #endif /* #if !defined(MYSQL_CLIENT) */ /* @@ -12337,6 +12800,14 @@ Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len, : Rows_log_event(buf, event_len, description_event) { } + +Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(const char *buf, uint event_len, + const Format_description_log_event + *description_event) + : Delete_rows_log_event(buf, event_len, description_event) +{ + uncompress_buf(); +} #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) @@ -12434,6 +12905,24 @@ void Delete_rows_log_event::print(FILE *file, { Rows_log_event::print_helper(file, print_event_info, "Delete_rows"); } + +void Delete_rows_compressed_log_event::print(FILE *file, + PRINT_EVENT_INFO* print_event_info) +{ + char *new_buf; + ulong len; + if(!Row_log_event_uncompress(glob_description_event, checksum_alg, + temp_buf, &new_buf, &len)) + { + free_temp_buf(); + register_temp_buf(new_buf, true); + Rows_log_event::print_helper(file, print_event_info, "Delete_compressed_rows"); + } + else + { + my_b_printf(&print_event_info->head_cache, "ERROR: uncompress delete_compressed_rows failed\n"); + } +} #endif @@ -12461,6 +12950,20 @@ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg, init(tbl_arg->rpl_write_set); } +Update_rows_compressed_log_event::Update_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg, + ulong tid, + bool is_transactional) +: Update_rows_log_event(thd_arg, tbl_arg, tid, is_transactional) +{ + //m_type = log_bin_use_v1_row_events ? UPDATE_ROWS_COMPRESSED_EVENT_V1 : UPDATE_ROWS_COMPRESSED_EVENT; + m_type = UPDATE_ROWS_COMPRESSED_EVENT_V1; +} + +bool Update_rows_compressed_log_event::write() +{ + return Rows_log_event::write_compressed(); +} + void Update_rows_log_event::init(MY_BITMAP const *cols) { /* if my_bitmap_init fails, caught in is_valid() */ @@ -12499,6 +13002,14 @@ Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len, : Rows_log_event(buf, event_len, description_event) { } + +Update_rows_compressed_log_event::Update_rows_compressed_log_event(const char *buf, uint event_len, + const Format_description_log_event + *description_event) + : Update_rows_log_event(buf, event_len, description_event) +{ + uncompress_buf(); +} #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) @@ -12651,6 +13162,23 @@ void Update_rows_log_event::print(FILE *file, { Rows_log_event::print_helper(file, print_event_info, "Update_rows"); } + +void Update_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) +{ + char *new_buf; + ulong len; + if(!Row_log_event_uncompress(glob_description_event, checksum_alg, + temp_buf, &new_buf, &len)) + { + free_temp_buf(); + register_temp_buf(new_buf, true); + Rows_log_event::print_helper(file, print_event_info, "Update_compressed_rows"); + } + else + { + my_b_printf(&print_event_info->head_cache, "ERROR: uncompress update_compressed_rows failed\n"); + } +} #endif #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) @@ -12778,7 +13306,7 @@ err: DBUG_ASSERT(error != 0); sql_print_error("Error in Log_event::read_log_event(): " "'%s', data_len: %d, event_type: %d", - error,data_len,head[EVENT_TYPE_OFFSET]); + error,data_len,(uchar)head[EVENT_TYPE_OFFSET]); } (*arg_buf)+= data_len; (*arg_buf_len)-= data_len; diff --git a/sql/log_event.h b/sql/log_event.h index 306f78ca4c9..14bfc54aed3 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -692,11 +692,33 @@ enum Log_event_type START_ENCRYPTION_EVENT= 164, + /* + Compressed binlog event. + */ + QUERY_COMPRESSED_EVENT = 165, + WRITE_ROWS_COMPRESSED_EVENT_V1 = 166, + UPDATE_ROWS_COMPRESSED_EVENT_V1 = 167, + DELETE_ROWS_COMPRESSED_EVENT_V1 = 168, + WRITE_ROWS_COMPRESSED_EVENT = 169, + UPDATE_ROWS_COMPRESSED_EVENT = 170, + DELETE_ROWS_COMPRESSED_EVENT = 171, + /* Add new MariaDB events here - right above this comment! */ ENUM_END_EVENT /* end marker */ }; +#define LOG_EVENT_IS_QUERY(type) (type == QUERY_EVENT || type == QUERY_COMPRESSED_EVENT) +#define LOG_EVENT_IS_WRITE_ROW(type) (type == WRITE_ROWS_EVENT || type == WRITE_ROWS_EVENT_V1 || type == WRITE_ROWS_COMPRESSED_EVENT || type == WRITE_ROWS_COMPRESSED_EVENT_V1) +#define LOG_EVENT_IS_UPDATE_ROW(type) (type == UPDATE_ROWS_EVENT || type == UPDATE_ROWS_EVENT_V1 || type == UPDATE_ROWS_COMPRESSED_EVENT || type == UPDATE_ROWS_COMPRESSED_EVENT_V1) +#define LOG_EVENT_IS_DELETE_ROW(type) (type == DELETE_ROWS_EVENT || type == DELETE_ROWS_EVENT_V1 || type == DELETE_ROWS_COMPRESSED_EVENT || type == DELETE_ROWS_COMPRESSED_EVENT_V1) +#define LOG_EVENT_IS_ROW_COMPRESSED(type) (type == WRITE_ROWS_COMPRESSED_EVENT || type == WRITE_ROWS_COMPRESSED_EVENT_V1 ||\ + type == UPDATE_ROWS_COMPRESSED_EVENT || type == UPDATE_ROWS_COMPRESSED_EVENT_V1 ||\ + type == DELETE_ROWS_COMPRESSED_EVENT || type == DELETE_ROWS_COMPRESSED_EVENT_V1) +#define LOG_EVENT_IS_ROW_V2(type) (type >= WRITE_ROWS_EVENT && type <= DELETE_ROWS_EVENT || \ + type >= WRITE_ROWS_COMPRESSED_EVENT && type <= DELETE_ROWS_COMPRESSED_EVENT ) + + /* The number of types we handle in Format_description_log_event (UNKNOWN_EVENT is not to be handled, it does not exist in binlogs, it does not have a @@ -2045,9 +2067,37 @@ public: /* !!! Public in this patch to allow old usage */ !strncasecmp(query, "SAVEPOINT", 9) || !strncasecmp(query, "ROLLBACK", 8); } - bool is_begin() { return !strcmp(query, "BEGIN"); } - bool is_commit() { return !strcmp(query, "COMMIT"); } - bool is_rollback() { return !strcmp(query, "ROLLBACK"); } + virtual bool is_begin() { return !strcmp(query, "BEGIN"); } + virtual bool is_commit() { return !strcmp(query, "COMMIT"); } + virtual bool is_rollback() { return !strcmp(query, "ROLLBACK"); } +}; + +class Query_compressed_log_event:public Query_log_event{ +protected: + Log_event::Byte* query_buf; // point to the uncompressed query +public: + Query_compressed_log_event(const char* buf, uint event_len, + const Format_description_log_event *description_event, + Log_event_type event_type); + ~Query_compressed_log_event() + { + if (query_buf) + my_free(query_buf); + } + Log_event_type get_type_code() { return QUERY_COMPRESSED_EVENT; } + + /* + the min length of log_bin_compress_min_len is 10, + means that Begin/Commit/Rollback would never be compressed! + */ + virtual bool is_begin() { return false; } + virtual bool is_commit() { return false; } + virtual bool is_rollback() { return false; } +#ifdef MYSQL_SERVER + Query_compressed_log_event(THD* thd_arg, const char* query_arg, ulong query_length, + bool using_trans, bool direct, bool suppress_use, int error); + virtual bool write(); +#endif }; @@ -4341,6 +4391,7 @@ public: #ifdef MYSQL_SERVER virtual bool write_data_header(); virtual bool write_data_body(); + virtual bool write_compressed(); virtual const char *get_db() { return m_table->s->db.str; } #endif /* @@ -4375,6 +4426,7 @@ protected: #endif Rows_log_event(const char *row_data, uint event_len, const Format_description_log_event *description_event); + void uncompress_buf(); #ifdef MYSQL_CLIENT void print_helper(FILE *, PRINT_EVENT_INFO *, char const *const name); @@ -4587,6 +4639,23 @@ private: #endif }; +class Write_rows_compressed_log_event : public Write_rows_log_event +{ +public: +#if defined(MYSQL_SERVER) + Write_rows_compressed_log_event(THD*, TABLE*, ulong table_id, + bool is_transactional); + virtual bool write(); +#endif +#ifdef HAVE_REPLICATION + Write_rows_compressed_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event); +#endif +private: +#if defined(MYSQL_CLIENT) + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif +}; /** @class Update_rows_log_event @@ -4657,6 +4726,24 @@ protected: #endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */ }; +class Update_rows_compressed_log_event : public Update_rows_log_event +{ +public: +#if defined(MYSQL_SERVER) + Update_rows_compressed_log_event(THD*, TABLE*, ulong table_id, + bool is_transactional); + virtual bool write(); +#endif +#ifdef HAVE_REPLICATION + Update_rows_compressed_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event); +#endif +private: +#if defined(MYSQL_CLIENT) + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif +}; + /** @class Delete_rows_log_event @@ -4723,6 +4810,23 @@ protected: #endif }; +class Delete_rows_compressed_log_event : public Delete_rows_log_event +{ +public: +#if defined(MYSQL_SERVER) + Delete_rows_compressed_log_event(THD*, TABLE*, ulong, bool is_transactional); + virtual bool write(); +#endif +#ifdef HAVE_REPLICATION + Delete_rows_compressed_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event); +#endif +private: +#if defined(MYSQL_CLIENT) + void print(FILE *file, PRINT_EVENT_INFO *print_event_info); +#endif +}; + #include "log_event_old.h" @@ -4964,4 +5068,17 @@ extern TYPELIB binlog_checksum_typelib; @} (end of group Replication) */ + +int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen); +int binlog_buf_uncompress(const char *src, char *dst, uint32 len, uint32 *newlen); +uint32 binlog_get_compress_len(uint32 len); +uint32 binlog_get_uncompress_len(const char *buf); + +int query_event_uncompress(const Format_description_log_event *description_event, + bool contain_checksum, const char *src, char **dst, ulong *newlen); + +int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, + const char *src, char **dst, ulong *newlen); + + #endif /* _log_event_h */ diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 310ccb047c4..8881e0423f5 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -389,6 +389,8 @@ static DYNAMIC_ARRAY all_options; /* Global variables */ bool opt_bin_log, opt_bin_log_used=0, opt_ignore_builtin_innodb= 0; +bool opt_bin_log_compress; +uint opt_bin_log_compress_min_len; my_bool opt_log, debug_assert_if_crashed_table= 0, opt_help= 0; my_bool debug_assert_on_not_freed_memory= 0; my_bool disable_log_notes; diff --git a/sql/mysqld.h b/sql/mysqld.h index 294b463161b..02bbdf839c1 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -109,7 +109,8 @@ extern CHARSET_INFO *character_set_filesystem; extern MY_BITMAP temp_pool; extern bool opt_large_files; -extern bool opt_update_log, opt_bin_log, opt_error_log; +extern bool opt_update_log, opt_bin_log, opt_error_log, opt_bin_log_compress; +extern uint opt_bin_log_compress_min_len; extern my_bool opt_log, opt_bootstrap; extern my_bool opt_backup_history_log; extern my_bool opt_backup_progress_log; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 9e2977c8bc5..f687f645171 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1732,6 +1732,12 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi, case DELETE_ROWS_EVENT: case UPDATE_ROWS_EVENT: case WRITE_ROWS_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT: + case DELETE_ROWS_COMPRESSED_EVENT: + case UPDATE_ROWS_COMPRESSED_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT_V1: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + case DELETE_ROWS_COMPRESSED_EVENT_V1: /* After the last Rows event has been applied, the saved Annotate_rows event (if any) is not needed anymore and can be deleted. diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index d42611bfebe..4d1925e5d84 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -7232,3 +7232,5 @@ ER_PARTITION_DEFAULT_ERROR ukr "Припустимо мати тільки один DEFAULT розділ" ER_REFERENCED_TRG_DOES_NOT_EXIST eng "Referenced trigger '%s' for the given action time and event type does not exist" +ER_BINLOG_UNCOMPRESS_ERROR + eng "Uncompress the compressed binlog failed" diff --git a/sql/slave.cc b/sql/slave.cc index 20bf68e6b6f..f7efb97e44d 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3769,7 +3769,7 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev) } /* Check for an event that starts or stops a transaction */ - if (typ == QUERY_EVENT) + if (LOG_EVENT_IS_QUERY(typ)) { Query_log_event *qev= (Query_log_event*) ev; /* @@ -3909,7 +3909,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, */ DBUG_EXECUTE_IF("incomplete_group_in_relay_log", if ((typ == XID_EVENT) || - ((typ == QUERY_EVENT) && + (LOG_EVENT_IS_QUERY(typ) && strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0)) { DBUG_ASSERT(thd->transaction.all.modified_non_trans_table); @@ -5664,6 +5664,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) bool gtid_skip_enqueue= false; bool got_gtid_event= false; rpl_gtid event_gtid; + bool compressed_event = FALSE; /* FD_q must have been prepared for the first R_a event @@ -5710,7 +5711,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) // Emulate the network corruption DBUG_EXECUTE_IF("corrupt_queue_event", - if (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) + if ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT) { char *debug_event_buf_c = (char*) buf; int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN); @@ -6144,6 +6145,43 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) inc_pos= event_len; } break; + /* + Binlog compressed event should uncompress in IO thread + */ + case QUERY_COMPRESSED_EVENT: + inc_pos= event_len; + if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len)) + { + char llbuf[22]; + error = ER_BINLOG_UNCOMPRESS_ERROR; + error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: ")); + llstr(mi->master_log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + compressed_event = true; + goto default_action; + + case WRITE_ROWS_COMPRESSED_EVENT: + case UPDATE_ROWS_COMPRESSED_EVENT: + case DELETE_ROWS_COMPRESSED_EVENT: + case WRITE_ROWS_COMPRESSED_EVENT_V1: + case UPDATE_ROWS_COMPRESSED_EVENT_V1: + case DELETE_ROWS_COMPRESSED_EVENT_V1: + inc_pos = event_len; + { + if (Row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len)) + { + char llbuf[22]; + error = ER_BINLOG_UNCOMPRESS_ERROR; + error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: ")); + llstr(mi->master_log_pos, llbuf); + error_msg.append(llbuf, strlen(llbuf)); + goto err; + } + } + compressed_event = true; + goto default_action; #ifndef DBUG_OFF case XID_EVENT: @@ -6162,7 +6200,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) DBUG_EXECUTE_IF("kill_slave_io_after_2_events", { if (mi->dbug_do_disconnect && - (((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT) || + (LOG_EVENT_IS_QUERY((uchar)buf[EVENT_TYPE_OFFSET]) || ((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT)) && (--mi->dbug_event_counter == 0)) { @@ -6175,7 +6213,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) DBUG_EXECUTE_IF("kill_slave_io_before_commit", { if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || - ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ Query_log_event::peek_is_commit_rollback(buf, event_len, checksum_alg))) { @@ -6195,7 +6233,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ++mi->events_queued_since_last_gtid; } - inc_pos= event_len; + if (!compressed_event) + inc_pos= event_len; + break; } @@ -6286,8 +6326,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) /* everything is filtered out from non-master */ (s_id != mi->master_id || /* for the master meta information is necessary */ - (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && - buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) || + ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) || /* Check whether it needs to be filtered based on domain_id @@ -6316,9 +6356,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) */ if (!(s_id == global_system_variables.server_id && !mi->rli.replicate_same_server_id) || - (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && - buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && - buf[EVENT_TYPE_OFFSET] != STOP_EVENT)) + ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET] != STOP_EVENT)) { mi->master_log_pos+= inc_pos; memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); @@ -6359,7 +6399,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) buf[EVENT_TYPE_OFFSET])) || (!mi->last_queued_gtid_standalone && ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || - ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ Query_log_event::peek_is_commit_rollback(buf, event_len, checksum_alg)))))) { @@ -6389,6 +6429,9 @@ err: mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error), error_msg.ptr()); + if(compressed_event) + my_free((void *)buf); + DBUG_RETURN(error); } diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 1af3b9a9cca..a741a0f6d01 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -6373,7 +6373,14 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, if (variables.option_bits & OPTION_GTID_BEGIN) is_trans= 1; - Rows_log_event* const ev= + Rows_log_event* ev; + if (binlog_should_compress(len)) + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, + len, is_trans, + static_cast<Write_rows_compressed_log_event*>(0)); + else + ev = binlog_prepare_pending_rows_event(table, variables.server_id, len, is_trans, static_cast<Write_rows_log_event*>(0)); @@ -6421,8 +6428,15 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, DBUG_DUMP("after_row", after_row, after_size); #endif - Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, variables.server_id, + Rows_log_event* ev; + if(binlog_should_compress(before_size + after_size)) + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, + before_size + after_size, is_trans, + static_cast<Update_rows_compressed_log_event*>(0)); + else + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, before_size + after_size, is_trans, static_cast<Update_rows_log_event*>(0)); @@ -6474,8 +6488,15 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, if (variables.option_bits & OPTION_GTID_BEGIN) is_trans= 1; - Rows_log_event* const ev= - binlog_prepare_pending_rows_event(table, variables.server_id, + Rows_log_event* ev; + if(binlog_should_compress(len)) + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, + len, is_trans, + static_cast<Delete_rows_compressed_log_event*>(0)); + else + ev = + binlog_prepare_pending_rows_event(table, variables.server_id, len, is_trans, static_cast<Delete_rows_log_event*>(0)); @@ -6940,15 +6961,28 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, flush the pending rows event if necessary. */ { - Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, - suppress_use, errcode); + Log_event* ev = NULL; + int error = 0; + /* Binlog table maps will be irrelevant after a Query_log_event (they are just removed on the slave side) so after the query log event is written to the binary log, we pretend that no table maps were written. - */ - int error= mysql_bin_log.write(&qinfo); + */ + if(binlog_should_compress(query_len)) + { + Query_compressed_log_event qinfo(this, query_arg, query_len, is_trans, direct, + suppress_use, errcode); + error= mysql_bin_log.write(&qinfo); + } + else + { + Query_log_event qinfo(this, query_arg, query_len, is_trans, direct, + suppress_use, errcode); + error= mysql_bin_log.write(&qinfo); + } + binlog_table_maps= 0; DBUG_RETURN(error); } diff --git a/sql/sql_class.h b/sql/sql_class.h index 994a161a646..a63a3379ec1 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -5681,6 +5681,12 @@ void thd_exit_cond(MYSQL_THD thd, const PSI_stage_info *stage, #define THD_EXIT_COND(P1, P2) \ thd_exit_cond(P1, P2, __func__, __FILE__, __LINE__) +inline bool binlog_should_compress(ulong len) +{ + return opt_bin_log_compress && + len >= opt_bin_log_compress_min_len; +} + #endif /* MYSQL_SERVER */ #endif /* SQL_CLASS_INCLUDED */ diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index bdf90f7caf6..c115ac5f0ec 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1629,7 +1629,7 @@ is_until_reached(binlog_send_info *info, ulong *ev_offset, break; case GTID_UNTIL_STOP_AFTER_TRANSACTION: if (event_type != XID_EVENT && - (event_type != QUERY_EVENT || + (event_type != QUERY_EVENT || /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ !Query_log_event::peek_is_commit_rollback (info->packet->ptr()+*ev_offset, info->packet->length()-*ev_offset, @@ -1863,7 +1863,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, return NULL; case GTID_SKIP_TRANSACTION: if (event_type == XID_EVENT || - (event_type == QUERY_EVENT && + (event_type == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset, len - ev_offset, current_checksum_alg))) diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 3e6a06c3ab0..ab4429005ab 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1157,6 +1157,18 @@ static Sys_var_mybool Sys_log_bin( "log_bin", "Whether the binary log is enabled", READ_ONLY GLOBAL_VAR(opt_bin_log), NO_CMD_LINE, DEFAULT(FALSE)); +static Sys_var_mybool Sys_log_bin_compress( + "log_bin_compress", "Whether the binary log can be compressed", + GLOBAL_VAR(opt_bin_log_compress), CMD_LINE(OPT_ARG), DEFAULT(FALSE)); + +/* the min length is 10, means that Begin/Commit/Rollback would never be compressed! */ +static Sys_var_uint Sys_log_bin_compress_min_len( + "log_bin_compress_min_len", + "Minimum length of sql statement(in statement mode) or record(in row mode)" + "that can be compressed.", + GLOBAL_VAR(opt_bin_log_compress_min_len), + CMD_LINE(OPT_ARG), VALID_RANGE(10, 1024), DEFAULT(256), BLOCK_SIZE(1)); + static Sys_var_mybool Sys_trust_function_creators( "log_bin_trust_function_creators", "If set to FALSE (the default), then when --log-bin is used, creation " |