summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/mysqlbinlog.cc7
-rw-r--r--sql/log.cc2
-rw-r--r--sql/log_event.cc550
-rw-r--r--sql/log_event.h123
-rw-r--r--sql/mysqld.cc2
-rw-r--r--sql/mysqld.h3
-rw-r--r--sql/rpl_rli.cc6
-rw-r--r--sql/share/errmsg-utf8.txt2
-rw-r--r--sql/slave.cc67
-rw-r--r--sql/sql_class.cc52
-rw-r--r--sql/sql_class.h6
-rw-r--r--sql/sql_repl.cc4
-rw-r--r--sql/sys_vars.cc12
13 files changed, 797 insertions, 39 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc
index 6a52c9fe29a..94443791441 100644
--- a/client/mysqlbinlog.cc
+++ b/client/mysqlbinlog.cc
@@ -1002,6 +1002,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
switch (ev_type) {
case QUERY_EVENT:
+ case QUERY_COMPRESSED_EVENT:
{
Query_log_event *qe= (Query_log_event*)ev;
if (!qe->is_trans_keyword())
@@ -1243,6 +1244,12 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
case WRITE_ROWS_EVENT_V1:
case UPDATE_ROWS_EVENT_V1:
case DELETE_ROWS_EVENT_V1:
+ case WRITE_ROWS_COMPRESSED_EVENT:
+ case DELETE_ROWS_COMPRESSED_EVENT:
+ case UPDATE_ROWS_COMPRESSED_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT_V1:
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+ case DELETE_ROWS_COMPRESSED_EVENT_V1:
{
Rows_log_event *e= (Rows_log_event*) ev;
if (print_row_event(print_event_info, ev, e->get_table_id(),
diff --git a/sql/log.cc b/sql/log.cc
index 2c290715741..4d903154d98 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -9838,7 +9838,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
((last_gtid_standalone && !ev->is_part_of_group(typ)) ||
(!last_gtid_standalone &&
(typ == XID_EVENT ||
- (typ == QUERY_EVENT &&
+ (LOG_EVENT_IS_QUERY(typ) &&
(((Query_log_event *)ev)->is_commit() ||
((Query_log_event *)ev)->is_rollback()))))))
{
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 01bb5e7a561..fdf8007283d 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -51,6 +51,7 @@
#include "rpl_utility.h"
#include "rpl_constants.h"
#include "sql_digest.h"
+#include "zlib.h"
#define my_b_write_string(A, B) my_b_write((A), (uchar*)(B), (uint) (sizeof(B) - 1))
@@ -702,6 +703,271 @@ char *str_to_hex(char *to, const char *from, uint len)
return to; // pointer to end 0 of 'to'
}
+#define BINLOG_COMPRESSED_HEADER_LEN 1
+#define BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES 4
+/**
+ Compressed Record
+ Record Header: 1 Byte
+ 0 Bit: Always 1, mean compressed;
+ 1-3 Bit: Reversed, compressed algorithmAlways 0, means zlib
+ 4-7 Bit: Bytes of "Record Original Length"
+ Record Original Length: 1-4 Bytes
+ Compressed Buf:
+*/
+
+/**
+ Get the length of compress content.
+*/
+
+uint32 binlog_get_compress_len(uint32 len)
+{
+ /* 5 for the begin content, 1 reserved for a '\0'*/
+ return ALIGN_SIZE((BINLOG_COMPRESSED_HEADER_LEN + BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES)
+ + compressBound(len) + 1);
+}
+
+/**
+ Compress buf from 'src' to 'dst'.
+
+ Note: 1) Then the caller should guarantee the length of 'dst', which
+ can be got by binlog_get_uncompress_len, is enough to hold
+ the content uncompressed.
+ 2) The 'comlen' should stored the length of 'dst', and it will
+ be set as the size of compressed content after return.
+
+ return zero if successful, others otherwise.
+*/
+int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen)
+{
+ uchar lenlen;
+ if (len & 0xFF000000)
+ {
+ dst[1] = uchar(len >> 24);
+ dst[2] = uchar(len >> 16);
+ dst[3] = uchar(len >> 8);
+ dst[4] = uchar(len);
+ lenlen = 4;
+ }
+ else if (len & 0x00FF0000)
+ {
+ dst[1] = uchar(len >> 16);
+ dst[2] = uchar(len >> 8);
+ dst[3] = uchar(len);
+ lenlen = 3;
+ }
+ else if (len & 0x0000FF00)
+ {
+ dst[1] = uchar(len >> 8);
+ dst[2] = uchar(len);
+ lenlen = 2;
+ }
+ else
+ {
+ dst[1] = uchar(len);
+ lenlen = 1;
+ }
+ dst[0] = 0x80 | (lenlen & 0x07);
+
+ uLongf tmplen = (uLongf)*comlen - BINLOG_COMPRESSED_HEADER_LEN - lenlen - 1;
+ if (compress((Bytef *)dst + BINLOG_COMPRESSED_HEADER_LEN + lenlen, &tmplen, (const Bytef *)src, (uLongf)len) != Z_OK)
+ {
+ return 1;
+ }
+ *comlen = (uint32)tmplen + BINLOG_COMPRESSED_HEADER_LEN + lenlen;
+ return 0;
+}
+
+/**
+ Convert a query_compressed_log_event to query_log_event
+ from 'src' to 'dst'(malloced inside), the size after compress
+ stored in 'newlen'.
+
+ @Warning:
+ 1)The caller should call my_free to release 'dst'.
+
+ return zero if successful, others otherwise.
+*/
+
+int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
+ const char *src, char **dst, ulong *newlen)
+{
+ ulong len = uint4korr(src + EVENT_LEN_OFFSET);
+ const char *tmp = src;
+
+ DBUG_ASSERT((uchar)src[EVENT_TYPE_OFFSET] == QUERY_COMPRESSED_EVENT);
+
+ uint8 common_header_len= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[QUERY_COMPRESSED_EVENT-1];
+
+ tmp += common_header_len;
+
+ uint db_len = (uint)tmp[Q_DB_LEN_OFFSET];
+ uint16 status_vars_len= uint2korr(tmp + Q_STATUS_VARS_LEN_OFFSET);
+
+ tmp += post_header_len + status_vars_len + db_len + 1;
+
+ uint32 un_len = binlog_get_uncompress_len(tmp);
+ *newlen = (tmp - src) + un_len;
+ if(contain_checksum)
+ *newlen += BINLOG_CHECKSUM_LEN;
+
+ *dst = (char *)my_malloc(ALIGN_SIZE(*newlen), MYF(MY_FAE));
+ if (!*dst)
+ {
+ return 1;
+ }
+
+ /* copy the head*/
+ memcpy(*dst, src , tmp - src);
+ if (binlog_buf_uncompress(tmp, *dst + (tmp - src), len - (tmp - src), &un_len))
+ {
+ my_free(*dst);
+ return 1;
+ }
+
+ (*dst)[EVENT_TYPE_OFFSET] = QUERY_EVENT;
+ int4store(*dst + EVENT_LEN_OFFSET, *newlen);
+ if(contain_checksum){
+ ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
+ int4store(*dst + clear_len, my_checksum(0L, (uchar *)*dst, clear_len));
+ }
+ return 0;
+}
+
+int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
+ const char *src, char **dst, ulong *newlen)
+{
+ Log_event_type type = (Log_event_type)(uchar)src[EVENT_TYPE_OFFSET];
+ ulong len = uint4korr(src + EVENT_LEN_OFFSET);
+ const char *tmp = src;
+ char *buf = NULL;
+
+ DBUG_ASSERT(LOG_EVENT_IS_ROW_COMPRESSED(type));
+
+ uint8 common_header_len= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[type-1];
+
+ tmp += common_header_len + ROWS_HEADER_LEN_V1;
+ if (post_header_len == ROWS_HEADER_LEN_V2)
+ {
+ /*
+ Have variable length header, check length,
+ which includes length bytes
+ */
+ uint16 var_header_len= uint2korr(tmp);
+ assert(var_header_len >= 2);
+
+ /* skip over var-len header, extracting 'chunks' */
+ tmp += var_header_len;
+
+ /* get the uncompressed event type */
+ type = (Log_event_type)(type - WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT);
+ }
+ else
+ {
+ /* get the uncompressed event type */
+ type = (Log_event_type)(type - WRITE_ROWS_COMPRESSED_EVENT_V1 + WRITE_ROWS_EVENT_V1);
+ }
+
+
+ ulong m_width = net_field_length((uchar **)&tmp);
+ tmp += (m_width + 7) / 8;
+
+ if (type == UPDATE_ROWS_EVENT_V1 || type == UPDATE_ROWS_EVENT)
+ {
+ tmp += (m_width + 7) / 8;
+ }
+
+ uint32 un_len = binlog_get_uncompress_len(tmp);
+ *newlen = (tmp - src) + un_len;
+ if(contain_checksum)
+ *newlen += BINLOG_CHECKSUM_LEN;
+
+ uint32 alloc_size = ALIGN_SIZE(*newlen);
+ buf = (char *)my_malloc(alloc_size , MYF(MY_FAE));
+ if (!buf)
+ {
+ return 1;
+ }
+
+ /* copy the head*/
+ memcpy(buf, src , tmp - src);
+ /* uncompress the body */
+ if (binlog_buf_uncompress(tmp, buf + (tmp - src), len - (tmp - src), &un_len))
+ {
+ my_free(buf);
+ return 1;
+ }
+
+ buf[EVENT_TYPE_OFFSET] = type;
+ int4store(buf + EVENT_LEN_OFFSET, *newlen);
+ if(contain_checksum){
+ ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
+ int4store(buf + clear_len, my_checksum(0L, (uchar *)buf, clear_len));
+ }
+ *dst = buf;
+ return 0;
+}
+
+/**
+ Get the length of uncompress content.
+*/
+
+uint32 binlog_get_uncompress_len(const char *buf)
+{
+ DBUG_ASSERT((buf[0] & 0xe0) == 0x80);
+ uint32 lenlen = buf[0] & 0x07;
+ uint32 len = 0;
+ switch(lenlen)
+ {
+ case 1:
+ len = uchar(buf[1]);
+ break;
+ case 2:
+ len = uchar(buf[1]) << 8 | uchar(buf[2]);
+ break;
+ case 3:
+ len = uchar(buf[1]) << 16 | uchar(buf[2]) << 8 | uchar(buf[3]);
+ break;
+ case 4:
+ len = uchar(buf[1]) << 24 | uchar(buf[2]) << 16 | uchar(buf[3]) << 8 | uchar(buf[4]);
+ break;
+ default:
+ DBUG_ASSERT(lenlen >= 1 && lenlen <= 4);
+ break;
+ }
+ return len;
+}
+
+/**
+ Uncompress the content in 'src' with length of 'len' to 'dst'.
+
+ Note: 1) Then the caller should guarantee the length of 'dst' (which
+ can be got by statement_get_uncompress_len) is enough to hold
+ the content uncompressed.
+ 2) The 'newlen' should stored the length of 'dst', and it will
+ be set as the size of uncompressed content after return.
+
+ return zero if successful, others otherwise.
+*/
+int binlog_buf_uncompress(const char *src, char *dst, uint32 len, uint32 *newlen)
+{
+ if((src[0] & 0x80) == 0)
+ {
+ return 1;
+ }
+
+ uint32 lenlen = src[0] & 0x07;
+ uLongf buflen = *newlen;
+ if(uncompress((Bytef *)dst, &buflen, (const Bytef*)src + 1 + lenlen, len) != Z_OK)
+ {
+ return 1;
+ }
+
+ *newlen = (uint32)buflen;
+ return 0;
+}
+
#ifndef MYSQL_CLIENT
/**
@@ -828,6 +1094,13 @@ const char* Log_event::get_type_str(Log_event_type type)
case TRANSACTION_CONTEXT_EVENT: return "Transaction_context";
case VIEW_CHANGE_EVENT: return "View_change";
case XA_PREPARE_LOG_EVENT: return "XA_prepare";
+ case QUERY_COMPRESSED_EVENT: return "Query_compressed";
+ case WRITE_ROWS_COMPRESSED_EVENT: return "Write_rows_compressed";
+ case UPDATE_ROWS_COMPRESSED_EVENT: return "Update_rows_compressed";
+ case DELETE_ROWS_COMPRESSED_EVENT: return "Delete_rows_compressed";
+ case WRITE_ROWS_COMPRESSED_EVENT_V1: return "Write_rows_compressed_v1";
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1: return "Update_rows_compressed_v1";
+ case DELETE_ROWS_COMPRESSED_EVENT_V1: return "Delete_rows_compressed_v1";
default: return "Unknown"; /* impossible */
}
@@ -1661,6 +1934,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
case QUERY_EVENT:
ev = new Query_log_event(buf, event_len, fdle, QUERY_EVENT);
break;
+ case QUERY_COMPRESSED_EVENT:
+ ev = new Query_compressed_log_event(buf, event_len, fdle, QUERY_COMPRESSED_EVENT);
+ break;
case LOAD_EVENT:
ev = new Load_log_event(buf, event_len, fdle);
break;
@@ -1735,6 +2011,19 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
ev = new Delete_rows_log_event(buf, event_len, fdle);
break;
+ case WRITE_ROWS_COMPRESSED_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT_V1:
+ ev = new Write_rows_compressed_log_event(buf, event_len, fdle);
+ break;
+ case UPDATE_ROWS_COMPRESSED_EVENT:
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+ ev = new Update_rows_compressed_log_event(buf, event_len, fdle);
+ break;
+ case DELETE_ROWS_COMPRESSED_EVENT:
+ case DELETE_ROWS_COMPRESSED_EVENT_V1:
+ ev = new Delete_rows_compressed_log_event(buf, event_len, fdle);
+ break;
+
/* MySQL GTID events are ignored */
case GTID_LOG_EVENT:
case ANONYMOUS_GTID_LOG_EVENT:
@@ -1778,7 +2067,7 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
else
{
DBUG_PRINT("error",("Unknown event code: %d",
- (int) buf[EVENT_TYPE_OFFSET]));
+ (uchar) buf[EVENT_TYPE_OFFSET]));
ev= NULL;
break;
}
@@ -2831,11 +3120,32 @@ void Log_event::print_base64(IO_CACHE* file,
}
case UPDATE_ROWS_EVENT:
case UPDATE_ROWS_EVENT_V1:
- {
- ev= new Update_rows_log_event((const char*) ptr, size,
- glob_description_event);
- break;
- }
+ {
+ ev= new Update_rows_log_event((const char*) ptr, size,
+ glob_description_event);
+ break;
+ }
+ case WRITE_ROWS_COMPRESSED_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT_V1:
+ {
+ ev= new Write_rows_compressed_log_event((const char*) ptr, size,
+ glob_description_event);
+ break;
+ }
+ case UPDATE_ROWS_COMPRESSED_EVENT:
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+ {
+ ev= new Update_rows_compressed_log_event((const char*) ptr, size,
+ glob_description_event);
+ break;
+ }
+ case DELETE_ROWS_COMPRESSED_EVENT:
+ case DELETE_ROWS_COMPRESSED_EVENT_V1:
+ {
+ ev= new Delete_rows_compressed_log_event((const char*) ptr, size,
+ glob_description_event);
+ break;
+ }
default:
break;
}
@@ -3190,6 +3500,23 @@ bool Query_log_event::write()
write_footer();
}
+bool Query_compressed_log_event::write()
+{
+ const char *query_tmp = query;
+ uint32 q_len_tmp = q_len;
+ bool ret = true;
+ q_len = binlog_get_compress_len(q_len);
+ query = (char *)my_malloc(q_len, MYF(MY_FAE));
+ if(query && !binlog_buf_compress(query_tmp, (char *)query, q_len_tmp, &q_len))
+ {
+ ret = Query_log_event::write();
+ }
+ my_free((void *)query);
+ query = query_tmp;
+ q_len = q_len_tmp;
+ return ret;
+}
+
/**
The simplest constructor that could possibly work. This is used for
creating static objects that have a special meaning and are invisible
@@ -3382,6 +3709,15 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %llu cache_tye: %d",
(ulong) flags2, sql_mode, cache_type));
}
+
+Query_compressed_log_event::Query_compressed_log_event(THD* thd_arg, const char* query_arg,
+ ulong query_length, bool using_trans,
+ bool direct, bool suppress_use, int errcode)
+ :Query_log_event(thd_arg, query_arg, query_length, using_trans, direct, suppress_use, errcode),
+ query_buf(0)
+{
+
+}
#endif /* MYSQL_CLIENT */
@@ -3788,6 +4124,28 @@ Query_log_event::Query_log_event(const char* buf, uint event_len,
DBUG_VOID_RETURN;
}
+Query_compressed_log_event::Query_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event,
+ Log_event_type event_type)
+ :Query_log_event(buf, event_len, description_event, event_type),query_buf(NULL)
+{
+ if(query)
+ {
+ uint32 un_len=binlog_get_uncompress_len(query);
+ query_buf = (Log_event::Byte*)my_malloc(ALIGN_SIZE(un_len + 1), MYF(MY_WME)); //reserve one byte for '\0'
+ if(query_buf && !binlog_buf_uncompress(query, (char *)query_buf, q_len, &un_len))
+ {
+ query_buf[un_len] = 0;
+ query = (const char *)query_buf;
+ q_len = un_len;
+ }
+ else
+ {
+ query= 0;
+ }
+ }
+}
/*
Replace a binlog event read into a packet with a dummy event. Either a
@@ -5056,6 +5414,15 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[GTID_LIST_EVENT-1]= GTID_LIST_HEADER_LEN;
post_header_len[START_ENCRYPTION_EVENT-1]= START_ENCRYPTION_HEADER_LEN;
+ //compressed event
+ post_header_len[QUERY_COMPRESSED_EVENT-1]= QUERY_HEADER_LEN;
+ post_header_len[WRITE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2;
+ post_header_len[UPDATE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2;
+ post_header_len[DELETE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2;
+ post_header_len[WRITE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
+ post_header_len[UPDATE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
+ post_header_len[DELETE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
+
// Sanity-check that all post header lengths are initialized.
int i;
for (i=0; i<number_of_event_types; i++)
@@ -9404,7 +9771,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
{
DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)");
uint8 const common_header_len= description_event->common_header_len;
- Log_event_type event_type= (Log_event_type) buf[EVENT_TYPE_OFFSET];
+ Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET];
m_type= event_type;
uint8 const post_header_len= description_event->post_header_len[event_type-1];
@@ -9503,8 +9870,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
- if ((event_type == UPDATE_ROWS_EVENT) ||
- (event_type == UPDATE_ROWS_EVENT_V1))
+ if (LOG_EVENT_IS_UPDATE_ROW(event_type))
{
DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
@@ -9551,6 +9917,31 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
DBUG_VOID_RETURN;
}
+void Rows_log_event::uncompress_buf()
+{
+ uint32 un_len = binlog_get_uncompress_len((char *)m_rows_buf);
+ uchar *new_buf= (uchar*) my_malloc(ALIGN_SIZE(un_len), MYF(MY_WME));
+ if (new_buf)
+ {
+ if(!binlog_buf_uncompress((char *)m_rows_buf, (char *)new_buf, m_rows_cur - m_rows_buf, &un_len))
+ {
+ my_free(m_rows_buf);
+ m_rows_buf = new_buf;
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ m_curr_row= m_rows_buf;
+#endif
+ m_rows_end= m_rows_buf + un_len;
+ m_rows_cur= m_rows_end;
+ return;
+ }
+ else
+ {
+ my_free(new_buf);
+ }
+ }
+ m_cols.bitmap= 0; // catch it in is_valid
+}
+
Rows_log_event::~Rows_log_event()
{
if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
@@ -9573,7 +9964,8 @@ int Rows_log_event::get_data_size()
(m_rows_cur - m_rows_buf););
int data_size= 0;
- bool is_v2_event= get_type_code() > DELETE_ROWS_EVENT_V1;
+ Log_event_type type = get_type_code();
+ bool is_v2_event= LOG_EVENT_IS_ROW_V2(type);
if (is_v2_event)
{
data_size= ROWS_HEADER_LEN_V2 +
@@ -10388,6 +10780,24 @@ bool Rows_log_event::write_data_body()
return res;
}
+
+bool Rows_log_event::write_compressed()
+{
+ uchar *m_rows_buf_tmp = m_rows_buf;
+ uchar *m_rows_cur_tmp = m_rows_cur;
+ bool ret = true;
+ uint32 comlen = binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp);
+ m_rows_buf = (uchar *)my_malloc(comlen, MYF(MY_FAE));
+ if(m_rows_buf && !binlog_buf_compress((const char *)m_rows_buf_tmp, (char *)m_rows_buf, m_rows_cur_tmp - m_rows_buf_tmp, &comlen))
+ {
+ m_rows_cur = comlen + m_rows_buf;
+ ret = Log_event::write();
+ }
+ my_free(m_rows_buf);
+ m_rows_buf = m_rows_buf_tmp;
+ m_rows_cur = m_rows_cur_tmp;
+ return ret;
+}
#endif
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
@@ -11302,6 +11712,20 @@ Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
is_transactional, WRITE_ROWS_EVENT_V1)
{
}
+
+Write_rows_compressed_log_event::Write_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg,
+ ulong tid_arg,
+ bool is_transactional)
+ : Write_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional)
+{
+ //m_type = log_bin_use_v1_row_events ? WRITE_ROWS_COMPRESSED_EVENT_V1 : WRITE_ROWS_COMPRESSED_EVENT;
+ m_type = WRITE_ROWS_COMPRESSED_EVENT_V1;
+}
+
+bool Write_rows_compressed_log_event::write()
+{
+ return Rows_log_event::write_compressed();
+}
#endif
/*
@@ -11314,6 +11738,14 @@ Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
: Rows_log_event(buf, event_len, description_event)
{
}
+
+Write_rows_compressed_log_event::Write_rows_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+: Write_rows_log_event(buf, event_len, description_event)
+{
+ uncompress_buf();
+}
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
@@ -11805,6 +12237,23 @@ void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
{DBUG_SET("+d,simulate_my_b_fill_error");});
Rows_log_event::print_helper(file, print_event_info, "Write_rows");
}
+
+void Write_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
+{
+ char *new_buf;
+ ulong len;
+ if(!Row_log_event_uncompress(glob_description_event, checksum_alg,
+ temp_buf, &new_buf, &len))
+ {
+ free_temp_buf();
+ register_temp_buf(new_buf, true);
+ Rows_log_event::print_helper(file, print_event_info, "Write_compressed_rows");
+ }
+ else
+ {
+ my_b_printf(&print_event_info->head_cache, "ERROR: uncompress write_compressed_rows failed\n");
+ }
+}
#endif
@@ -12325,6 +12774,20 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
DELETE_ROWS_EVENT_V1)
{
}
+
+Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg,
+ ulong tid_arg,
+ bool is_transactional)
+ : Delete_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional)
+{
+ //m_type = log_bin_use_v1_row_events ? DELETE_ROWS_COMPRESSED_EVENT_V1 : DELETE_ROWS_COMPRESSED_EVENT;
+ m_type = DELETE_ROWS_COMPRESSED_EVENT_V1;
+}
+
+bool Delete_rows_compressed_log_event::write()
+{
+ return Rows_log_event::write_compressed();
+}
#endif /* #if !defined(MYSQL_CLIENT) */
/*
@@ -12337,6 +12800,14 @@ Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len,
: Rows_log_event(buf, event_len, description_event)
{
}
+
+Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+ : Delete_rows_log_event(buf, event_len, description_event)
+{
+ uncompress_buf();
+}
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
@@ -12434,6 +12905,24 @@ void Delete_rows_log_event::print(FILE *file,
{
Rows_log_event::print_helper(file, print_event_info, "Delete_rows");
}
+
+void Delete_rows_compressed_log_event::print(FILE *file,
+ PRINT_EVENT_INFO* print_event_info)
+{
+ char *new_buf;
+ ulong len;
+ if(!Row_log_event_uncompress(glob_description_event, checksum_alg,
+ temp_buf, &new_buf, &len))
+ {
+ free_temp_buf();
+ register_temp_buf(new_buf, true);
+ Rows_log_event::print_helper(file, print_event_info, "Delete_compressed_rows");
+ }
+ else
+ {
+ my_b_printf(&print_event_info->head_cache, "ERROR: uncompress delete_compressed_rows failed\n");
+ }
+}
#endif
@@ -12461,6 +12950,20 @@ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
init(tbl_arg->rpl_write_set);
}
+Update_rows_compressed_log_event::Update_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg,
+ ulong tid,
+ bool is_transactional)
+: Update_rows_log_event(thd_arg, tbl_arg, tid, is_transactional)
+{
+ //m_type = log_bin_use_v1_row_events ? UPDATE_ROWS_COMPRESSED_EVENT_V1 : UPDATE_ROWS_COMPRESSED_EVENT;
+ m_type = UPDATE_ROWS_COMPRESSED_EVENT_V1;
+}
+
+bool Update_rows_compressed_log_event::write()
+{
+ return Rows_log_event::write_compressed();
+}
+
void Update_rows_log_event::init(MY_BITMAP const *cols)
{
/* if my_bitmap_init fails, caught in is_valid() */
@@ -12499,6 +13002,14 @@ Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len,
: Rows_log_event(buf, event_len, description_event)
{
}
+
+Update_rows_compressed_log_event::Update_rows_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+ : Update_rows_log_event(buf, event_len, description_event)
+{
+ uncompress_buf();
+}
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
@@ -12651,6 +13162,23 @@ void Update_rows_log_event::print(FILE *file,
{
Rows_log_event::print_helper(file, print_event_info, "Update_rows");
}
+
+void Update_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
+{
+ char *new_buf;
+ ulong len;
+ if(!Row_log_event_uncompress(glob_description_event, checksum_alg,
+ temp_buf, &new_buf, &len))
+ {
+ free_temp_buf();
+ register_temp_buf(new_buf, true);
+ Rows_log_event::print_helper(file, print_event_info, "Update_compressed_rows");
+ }
+ else
+ {
+ my_b_printf(&print_event_info->head_cache, "ERROR: uncompress update_compressed_rows failed\n");
+ }
+}
#endif
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
@@ -12778,7 +13306,7 @@ err:
DBUG_ASSERT(error != 0);
sql_print_error("Error in Log_event::read_log_event(): "
"'%s', data_len: %d, event_type: %d",
- error,data_len,head[EVENT_TYPE_OFFSET]);
+ error,data_len,(uchar)head[EVENT_TYPE_OFFSET]);
}
(*arg_buf)+= data_len;
(*arg_buf_len)-= data_len;
diff --git a/sql/log_event.h b/sql/log_event.h
index 306f78ca4c9..14bfc54aed3 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -692,11 +692,33 @@ enum Log_event_type
START_ENCRYPTION_EVENT= 164,
+ /*
+ Compressed binlog event.
+ */
+ QUERY_COMPRESSED_EVENT = 165,
+ WRITE_ROWS_COMPRESSED_EVENT_V1 = 166,
+ UPDATE_ROWS_COMPRESSED_EVENT_V1 = 167,
+ DELETE_ROWS_COMPRESSED_EVENT_V1 = 168,
+ WRITE_ROWS_COMPRESSED_EVENT = 169,
+ UPDATE_ROWS_COMPRESSED_EVENT = 170,
+ DELETE_ROWS_COMPRESSED_EVENT = 171,
+
/* Add new MariaDB events here - right above this comment! */
ENUM_END_EVENT /* end marker */
};
+#define LOG_EVENT_IS_QUERY(type) (type == QUERY_EVENT || type == QUERY_COMPRESSED_EVENT)
+#define LOG_EVENT_IS_WRITE_ROW(type) (type == WRITE_ROWS_EVENT || type == WRITE_ROWS_EVENT_V1 || type == WRITE_ROWS_COMPRESSED_EVENT || type == WRITE_ROWS_COMPRESSED_EVENT_V1)
+#define LOG_EVENT_IS_UPDATE_ROW(type) (type == UPDATE_ROWS_EVENT || type == UPDATE_ROWS_EVENT_V1 || type == UPDATE_ROWS_COMPRESSED_EVENT || type == UPDATE_ROWS_COMPRESSED_EVENT_V1)
+#define LOG_EVENT_IS_DELETE_ROW(type) (type == DELETE_ROWS_EVENT || type == DELETE_ROWS_EVENT_V1 || type == DELETE_ROWS_COMPRESSED_EVENT || type == DELETE_ROWS_COMPRESSED_EVENT_V1)
+#define LOG_EVENT_IS_ROW_COMPRESSED(type) (type == WRITE_ROWS_COMPRESSED_EVENT || type == WRITE_ROWS_COMPRESSED_EVENT_V1 ||\
+ type == UPDATE_ROWS_COMPRESSED_EVENT || type == UPDATE_ROWS_COMPRESSED_EVENT_V1 ||\
+ type == DELETE_ROWS_COMPRESSED_EVENT || type == DELETE_ROWS_COMPRESSED_EVENT_V1)
+#define LOG_EVENT_IS_ROW_V2(type) (type >= WRITE_ROWS_EVENT && type <= DELETE_ROWS_EVENT || \
+ type >= WRITE_ROWS_COMPRESSED_EVENT && type <= DELETE_ROWS_COMPRESSED_EVENT )
+
+
/*
The number of types we handle in Format_description_log_event (UNKNOWN_EVENT
is not to be handled, it does not exist in binlogs, it does not have a
@@ -2045,9 +2067,37 @@ public: /* !!! Public in this patch to allow old usage */
!strncasecmp(query, "SAVEPOINT", 9) ||
!strncasecmp(query, "ROLLBACK", 8);
}
- bool is_begin() { return !strcmp(query, "BEGIN"); }
- bool is_commit() { return !strcmp(query, "COMMIT"); }
- bool is_rollback() { return !strcmp(query, "ROLLBACK"); }
+ virtual bool is_begin() { return !strcmp(query, "BEGIN"); }
+ virtual bool is_commit() { return !strcmp(query, "COMMIT"); }
+ virtual bool is_rollback() { return !strcmp(query, "ROLLBACK"); }
+};
+
+class Query_compressed_log_event:public Query_log_event{
+protected:
+ Log_event::Byte* query_buf; // point to the uncompressed query
+public:
+ Query_compressed_log_event(const char* buf, uint event_len,
+ const Format_description_log_event *description_event,
+ Log_event_type event_type);
+ ~Query_compressed_log_event()
+ {
+ if (query_buf)
+ my_free(query_buf);
+ }
+ Log_event_type get_type_code() { return QUERY_COMPRESSED_EVENT; }
+
+ /*
+ the min length of log_bin_compress_min_len is 10,
+ means that Begin/Commit/Rollback would never be compressed!
+ */
+ virtual bool is_begin() { return false; }
+ virtual bool is_commit() { return false; }
+ virtual bool is_rollback() { return false; }
+#ifdef MYSQL_SERVER
+ Query_compressed_log_event(THD* thd_arg, const char* query_arg, ulong query_length,
+ bool using_trans, bool direct, bool suppress_use, int error);
+ virtual bool write();
+#endif
};
@@ -4341,6 +4391,7 @@ public:
#ifdef MYSQL_SERVER
virtual bool write_data_header();
virtual bool write_data_body();
+ virtual bool write_compressed();
virtual const char *get_db() { return m_table->s->db.str; }
#endif
/*
@@ -4375,6 +4426,7 @@ protected:
#endif
Rows_log_event(const char *row_data, uint event_len,
const Format_description_log_event *description_event);
+ void uncompress_buf();
#ifdef MYSQL_CLIENT
void print_helper(FILE *, PRINT_EVENT_INFO *, char const *const name);
@@ -4587,6 +4639,23 @@ private:
#endif
};
+class Write_rows_compressed_log_event : public Write_rows_log_event
+{
+public:
+#if defined(MYSQL_SERVER)
+ Write_rows_compressed_log_event(THD*, TABLE*, ulong table_id,
+ bool is_transactional);
+ virtual bool write();
+#endif
+#ifdef HAVE_REPLICATION
+ Write_rows_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event);
+#endif
+private:
+#if defined(MYSQL_CLIENT)
+ void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
+#endif
+};
/**
@class Update_rows_log_event
@@ -4657,6 +4726,24 @@ protected:
#endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */
};
+class Update_rows_compressed_log_event : public Update_rows_log_event
+{
+public:
+#if defined(MYSQL_SERVER)
+ Update_rows_compressed_log_event(THD*, TABLE*, ulong table_id,
+ bool is_transactional);
+ virtual bool write();
+#endif
+#ifdef HAVE_REPLICATION
+ Update_rows_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event);
+#endif
+private:
+#if defined(MYSQL_CLIENT)
+ void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
+#endif
+};
+
/**
@class Delete_rows_log_event
@@ -4723,6 +4810,23 @@ protected:
#endif
};
+class Delete_rows_compressed_log_event : public Delete_rows_log_event
+{
+public:
+#if defined(MYSQL_SERVER)
+ Delete_rows_compressed_log_event(THD*, TABLE*, ulong, bool is_transactional);
+ virtual bool write();
+#endif
+#ifdef HAVE_REPLICATION
+ Delete_rows_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event);
+#endif
+private:
+#if defined(MYSQL_CLIENT)
+ void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
+#endif
+};
+
#include "log_event_old.h"
@@ -4964,4 +5068,17 @@ extern TYPELIB binlog_checksum_typelib;
@} (end of group Replication)
*/
+
+int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen);
+int binlog_buf_uncompress(const char *src, char *dst, uint32 len, uint32 *newlen);
+uint32 binlog_get_compress_len(uint32 len);
+uint32 binlog_get_uncompress_len(const char *buf);
+
+int query_event_uncompress(const Format_description_log_event *description_event,
+ bool contain_checksum, const char *src, char **dst, ulong *newlen);
+
+int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
+ const char *src, char **dst, ulong *newlen);
+
+
#endif /* _log_event_h */
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 310ccb047c4..8881e0423f5 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -389,6 +389,8 @@ static DYNAMIC_ARRAY all_options;
/* Global variables */
bool opt_bin_log, opt_bin_log_used=0, opt_ignore_builtin_innodb= 0;
+bool opt_bin_log_compress;
+uint opt_bin_log_compress_min_len;
my_bool opt_log, debug_assert_if_crashed_table= 0, opt_help= 0;
my_bool debug_assert_on_not_freed_memory= 0;
my_bool disable_log_notes;
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 294b463161b..02bbdf839c1 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -109,7 +109,8 @@ extern CHARSET_INFO *character_set_filesystem;
extern MY_BITMAP temp_pool;
extern bool opt_large_files;
-extern bool opt_update_log, opt_bin_log, opt_error_log;
+extern bool opt_update_log, opt_bin_log, opt_error_log, opt_bin_log_compress;
+extern uint opt_bin_log_compress_min_len;
extern my_bool opt_log, opt_bootstrap;
extern my_bool opt_backup_history_log;
extern my_bool opt_backup_progress_log;
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 9e2977c8bc5..f687f645171 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1732,6 +1732,12 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
case DELETE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case WRITE_ROWS_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT:
+ case DELETE_ROWS_COMPRESSED_EVENT:
+ case UPDATE_ROWS_COMPRESSED_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT_V1:
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+ case DELETE_ROWS_COMPRESSED_EVENT_V1:
/*
After the last Rows event has been applied, the saved Annotate_rows
event (if any) is not needed anymore and can be deleted.
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index d42611bfebe..4d1925e5d84 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -7232,3 +7232,5 @@ ER_PARTITION_DEFAULT_ERROR
ukr "Припустимо мати тільки один DEFAULT розділ"
ER_REFERENCED_TRG_DOES_NOT_EXIST
eng "Referenced trigger '%s' for the given action time and event type does not exist"
+ER_BINLOG_UNCOMPRESS_ERROR
+ eng "Uncompress the compressed binlog failed"
diff --git a/sql/slave.cc b/sql/slave.cc
index 20bf68e6b6f..f7efb97e44d 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3769,7 +3769,7 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev)
}
/* Check for an event that starts or stops a transaction */
- if (typ == QUERY_EVENT)
+ if (LOG_EVENT_IS_QUERY(typ))
{
Query_log_event *qev= (Query_log_event*) ev;
/*
@@ -3909,7 +3909,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
*/
DBUG_EXECUTE_IF("incomplete_group_in_relay_log",
if ((typ == XID_EVENT) ||
- ((typ == QUERY_EVENT) &&
+ (LOG_EVENT_IS_QUERY(typ) &&
strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0))
{
DBUG_ASSERT(thd->transaction.all.modified_non_trans_table);
@@ -5664,6 +5664,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
bool gtid_skip_enqueue= false;
bool got_gtid_event= false;
rpl_gtid event_gtid;
+ bool compressed_event = FALSE;
/*
FD_q must have been prepared for the first R_a event
@@ -5710,7 +5711,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
// Emulate the network corruption
DBUG_EXECUTE_IF("corrupt_queue_event",
- if (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
+ if ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
{
char *debug_event_buf_c = (char*) buf;
int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN);
@@ -6144,6 +6145,43 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
inc_pos= event_len;
}
break;
+ /*
+ Binlog compressed event should uncompress in IO thread
+ */
+ case QUERY_COMPRESSED_EVENT:
+ inc_pos= event_len;
+ if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len))
+ {
+ char llbuf[22];
+ error = ER_BINLOG_UNCOMPRESS_ERROR;
+ error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: "));
+ llstr(mi->master_log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ compressed_event = true;
+ goto default_action;
+
+ case WRITE_ROWS_COMPRESSED_EVENT:
+ case UPDATE_ROWS_COMPRESSED_EVENT:
+ case DELETE_ROWS_COMPRESSED_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT_V1:
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+ case DELETE_ROWS_COMPRESSED_EVENT_V1:
+ inc_pos = event_len;
+ {
+ if (Row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len))
+ {
+ char llbuf[22];
+ error = ER_BINLOG_UNCOMPRESS_ERROR;
+ error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: "));
+ llstr(mi->master_log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ }
+ compressed_event = true;
+ goto default_action;
#ifndef DBUG_OFF
case XID_EVENT:
@@ -6162,7 +6200,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
DBUG_EXECUTE_IF("kill_slave_io_after_2_events",
{
if (mi->dbug_do_disconnect &&
- (((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT) ||
+ (LOG_EVENT_IS_QUERY((uchar)buf[EVENT_TYPE_OFFSET]) ||
((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT))
&& (--mi->dbug_event_counter == 0))
{
@@ -6175,7 +6213,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
DBUG_EXECUTE_IF("kill_slave_io_before_commit",
{
if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
- ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT &&
+ ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
Query_log_event::peek_is_commit_rollback(buf, event_len,
checksum_alg)))
{
@@ -6195,7 +6233,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
++mi->events_queued_since_last_gtid;
}
- inc_pos= event_len;
+ if (!compressed_event)
+ inc_pos= event_len;
+
break;
}
@@ -6286,8 +6326,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
/* everything is filtered out from non-master */
(s_id != mi->master_id ||
/* for the master meta information is necessary */
- (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) ||
/*
Check whether it needs to be filtered based on domain_id
@@ -6316,9 +6356,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
*/
if (!(s_id == global_system_variables.server_id &&
!mi->rli.replicate_same_server_id) ||
- (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
- buf[EVENT_TYPE_OFFSET] != STOP_EVENT))
+ ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET] != STOP_EVENT))
{
mi->master_log_pos+= inc_pos;
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
@@ -6359,7 +6399,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
buf[EVENT_TYPE_OFFSET])) ||
(!mi->last_queued_gtid_standalone &&
((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
- ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT &&
+ ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
Query_log_event::peek_is_commit_rollback(buf, event_len,
checksum_alg))))))
{
@@ -6389,6 +6429,9 @@ err:
mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error),
error_msg.ptr());
+ if(compressed_event)
+ my_free((void *)buf);
+
DBUG_RETURN(error);
}
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 1af3b9a9cca..a741a0f6d01 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -6373,7 +6373,14 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
if (variables.option_bits & OPTION_GTID_BEGIN)
is_trans= 1;
- Rows_log_event* const ev=
+ Rows_log_event* ev;
+ if (binlog_should_compress(len))
+ ev =
+ binlog_prepare_pending_rows_event(table, variables.server_id,
+ len, is_trans,
+ static_cast<Write_rows_compressed_log_event*>(0));
+ else
+ ev =
binlog_prepare_pending_rows_event(table, variables.server_id,
len, is_trans,
static_cast<Write_rows_log_event*>(0));
@@ -6421,8 +6428,15 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
DBUG_DUMP("after_row", after_row, after_size);
#endif
- Rows_log_event* const ev=
- binlog_prepare_pending_rows_event(table, variables.server_id,
+ Rows_log_event* ev;
+ if(binlog_should_compress(before_size + after_size))
+ ev =
+ binlog_prepare_pending_rows_event(table, variables.server_id,
+ before_size + after_size, is_trans,
+ static_cast<Update_rows_compressed_log_event*>(0));
+ else
+ ev =
+ binlog_prepare_pending_rows_event(table, variables.server_id,
before_size + after_size, is_trans,
static_cast<Update_rows_log_event*>(0));
@@ -6474,8 +6488,15 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,
if (variables.option_bits & OPTION_GTID_BEGIN)
is_trans= 1;
- Rows_log_event* const ev=
- binlog_prepare_pending_rows_event(table, variables.server_id,
+ Rows_log_event* ev;
+ if(binlog_should_compress(len))
+ ev =
+ binlog_prepare_pending_rows_event(table, variables.server_id,
+ len, is_trans,
+ static_cast<Delete_rows_compressed_log_event*>(0));
+ else
+ ev =
+ binlog_prepare_pending_rows_event(table, variables.server_id,
len, is_trans,
static_cast<Delete_rows_log_event*>(0));
@@ -6940,15 +6961,28 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
flush the pending rows event if necessary.
*/
{
- Query_log_event qinfo(this, query_arg, query_len, is_trans, direct,
- suppress_use, errcode);
+ Log_event* ev = NULL;
+ int error = 0;
+
/*
Binlog table maps will be irrelevant after a Query_log_event
(they are just removed on the slave side) so after the query
log event is written to the binary log, we pretend that no
table maps were written.
- */
- int error= mysql_bin_log.write(&qinfo);
+ */
+ if(binlog_should_compress(query_len))
+ {
+ Query_compressed_log_event qinfo(this, query_arg, query_len, is_trans, direct,
+ suppress_use, errcode);
+ error= mysql_bin_log.write(&qinfo);
+ }
+ else
+ {
+ Query_log_event qinfo(this, query_arg, query_len, is_trans, direct,
+ suppress_use, errcode);
+ error= mysql_bin_log.write(&qinfo);
+ }
+
binlog_table_maps= 0;
DBUG_RETURN(error);
}
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 994a161a646..a63a3379ec1 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -5681,6 +5681,12 @@ void thd_exit_cond(MYSQL_THD thd, const PSI_stage_info *stage,
#define THD_EXIT_COND(P1, P2) \
thd_exit_cond(P1, P2, __func__, __FILE__, __LINE__)
+inline bool binlog_should_compress(ulong len)
+{
+ return opt_bin_log_compress &&
+ len >= opt_bin_log_compress_min_len;
+}
+
#endif /* MYSQL_SERVER */
#endif /* SQL_CLASS_INCLUDED */
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index bdf90f7caf6..c115ac5f0ec 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -1629,7 +1629,7 @@ is_until_reached(binlog_send_info *info, ulong *ev_offset,
break;
case GTID_UNTIL_STOP_AFTER_TRANSACTION:
if (event_type != XID_EVENT &&
- (event_type != QUERY_EVENT ||
+ (event_type != QUERY_EVENT || /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
!Query_log_event::peek_is_commit_rollback
(info->packet->ptr()+*ev_offset,
info->packet->length()-*ev_offset,
@@ -1863,7 +1863,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
return NULL;
case GTID_SKIP_TRANSACTION:
if (event_type == XID_EVENT ||
- (event_type == QUERY_EVENT &&
+ (event_type == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset,
len - ev_offset,
current_checksum_alg)))
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 3e6a06c3ab0..ab4429005ab 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1157,6 +1157,18 @@ static Sys_var_mybool Sys_log_bin(
"log_bin", "Whether the binary log is enabled",
READ_ONLY GLOBAL_VAR(opt_bin_log), NO_CMD_LINE, DEFAULT(FALSE));
+static Sys_var_mybool Sys_log_bin_compress(
+ "log_bin_compress", "Whether the binary log can be compressed",
+ GLOBAL_VAR(opt_bin_log_compress), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+
+/* the min length is 10, means that Begin/Commit/Rollback would never be compressed! */
+static Sys_var_uint Sys_log_bin_compress_min_len(
+ "log_bin_compress_min_len",
+ "Minimum length of sql statement(in statement mode) or record(in row mode)"
+ "that can be compressed.",
+ GLOBAL_VAR(opt_bin_log_compress_min_len),
+ CMD_LINE(OPT_ARG), VALID_RANGE(10, 1024), DEFAULT(256), BLOCK_SIZE(1));
+
static Sys_var_mybool Sys_trust_function_creators(
"log_bin_trust_function_creators",
"If set to FALSE (the default), then when --log-bin is used, creation "