summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorvinchen <vinchen13@gmail.com>2016-10-08 12:07:26 +0800
committerKristian Nielsen <knielsen@knielsen-hq.org>2016-10-19 20:20:35 +0200
commit640051e06aa585b056671738a6614cd314074ac6 (patch)
tree540b46e17247011c429d184413ff4f957af64e42
parent27025221fe2ea17aa737ad2ad31011407c00dcc9 (diff)
downloadmariadb-git-640051e06aa585b056671738a6614cd314074ac6.tar.gz
Binlog compressed
Add some event types for the compressed event, there are: QUERY_COMPRESSED_EVENT, WRITE_ROWS_COMPRESSED_EVENT_V1, UPDATE_ROWS_COMPRESSED_EVENT_V1, DELETE_POWS_COMPRESSED_EVENT_V1, WRITE_ROWS_COMPRESSED_EVENT, UPDATE_ROWS_COMPRESSED_EVENT, DELETE_POWS_COMPRESSED_EVENT. These events inheritance the uncompressed editor events. One of their constructor functions and write function have been overridden for uncompressing and compressing. Anything but this is totally the same. On slave, The IO thread will uncompress and convert them When it receiving the events from the master. So the SQL and worker threads can be stay unchanged. Now we use zlib as compress algorithm. It maybe support other algorithm in the future.
-rw-r--r--client/mysqlbinlog.cc7
-rw-r--r--sql/log.cc2
-rw-r--r--sql/log_event.cc550
-rw-r--r--sql/log_event.h123
-rw-r--r--sql/mysqld.cc2
-rw-r--r--sql/mysqld.h3
-rw-r--r--sql/rpl_rli.cc6
-rw-r--r--sql/share/errmsg-utf8.txt2
-rw-r--r--sql/slave.cc67
-rw-r--r--sql/sql_class.cc52
-rw-r--r--sql/sql_class.h6
-rw-r--r--sql/sql_repl.cc4
-rw-r--r--sql/sys_vars.cc12
13 files changed, 797 insertions, 39 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc
index 6a52c9fe29a..94443791441 100644
--- a/client/mysqlbinlog.cc
+++ b/client/mysqlbinlog.cc
@@ -1002,6 +1002,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
switch (ev_type) {
case QUERY_EVENT:
+ case QUERY_COMPRESSED_EVENT:
{
Query_log_event *qe= (Query_log_event*)ev;
if (!qe->is_trans_keyword())
@@ -1243,6 +1244,12 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
case WRITE_ROWS_EVENT_V1:
case UPDATE_ROWS_EVENT_V1:
case DELETE_ROWS_EVENT_V1:
+ case WRITE_ROWS_COMPRESSED_EVENT:
+ case DELETE_ROWS_COMPRESSED_EVENT:
+ case UPDATE_ROWS_COMPRESSED_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT_V1:
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+ case DELETE_ROWS_COMPRESSED_EVENT_V1:
{
Rows_log_event *e= (Rows_log_event*) ev;
if (print_row_event(print_event_info, ev, e->get_table_id(),
diff --git a/sql/log.cc b/sql/log.cc
index 2c290715741..4d903154d98 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -9838,7 +9838,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
((last_gtid_standalone && !ev->is_part_of_group(typ)) ||
(!last_gtid_standalone &&
(typ == XID_EVENT ||
- (typ == QUERY_EVENT &&
+ (LOG_EVENT_IS_QUERY(typ) &&
(((Query_log_event *)ev)->is_commit() ||
((Query_log_event *)ev)->is_rollback()))))))
{
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 01bb5e7a561..fdf8007283d 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -51,6 +51,7 @@
#include "rpl_utility.h"
#include "rpl_constants.h"
#include "sql_digest.h"
+#include "zlib.h"
#define my_b_write_string(A, B) my_b_write((A), (uchar*)(B), (uint) (sizeof(B) - 1))
@@ -702,6 +703,271 @@ char *str_to_hex(char *to, const char *from, uint len)
return to; // pointer to end 0 of 'to'
}
+#define BINLOG_COMPRESSED_HEADER_LEN 1
+#define BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES 4
+/**
+ Compressed Record
+ Record Header: 1 Byte
+ 0 Bit: Always 1, mean compressed;
+ 1-3 Bit: Reversed, compressed algorithmAlways 0, means zlib
+ 4-7 Bit: Bytes of "Record Original Length"
+ Record Original Length: 1-4 Bytes
+ Compressed Buf:
+*/
+
+/**
+ Get the length of compress content.
+*/
+
+uint32 binlog_get_compress_len(uint32 len)
+{
+ /* 5 for the begin content, 1 reserved for a '\0'*/
+ return ALIGN_SIZE((BINLOG_COMPRESSED_HEADER_LEN + BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES)
+ + compressBound(len) + 1);
+}
+
+/**
+ Compress buf from 'src' to 'dst'.
+
+ Note: 1) Then the caller should guarantee the length of 'dst', which
+ can be got by binlog_get_uncompress_len, is enough to hold
+ the content uncompressed.
+ 2) The 'comlen' should stored the length of 'dst', and it will
+ be set as the size of compressed content after return.
+
+ return zero if successful, others otherwise.
+*/
+int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen)
+{
+ uchar lenlen;
+ if (len & 0xFF000000)
+ {
+ dst[1] = uchar(len >> 24);
+ dst[2] = uchar(len >> 16);
+ dst[3] = uchar(len >> 8);
+ dst[4] = uchar(len);
+ lenlen = 4;
+ }
+ else if (len & 0x00FF0000)
+ {
+ dst[1] = uchar(len >> 16);
+ dst[2] = uchar(len >> 8);
+ dst[3] = uchar(len);
+ lenlen = 3;
+ }
+ else if (len & 0x0000FF00)
+ {
+ dst[1] = uchar(len >> 8);
+ dst[2] = uchar(len);
+ lenlen = 2;
+ }
+ else
+ {
+ dst[1] = uchar(len);
+ lenlen = 1;
+ }
+ dst[0] = 0x80 | (lenlen & 0x07);
+
+ uLongf tmplen = (uLongf)*comlen - BINLOG_COMPRESSED_HEADER_LEN - lenlen - 1;
+ if (compress((Bytef *)dst + BINLOG_COMPRESSED_HEADER_LEN + lenlen, &tmplen, (const Bytef *)src, (uLongf)len) != Z_OK)
+ {
+ return 1;
+ }
+ *comlen = (uint32)tmplen + BINLOG_COMPRESSED_HEADER_LEN + lenlen;
+ return 0;
+}
+
+/**
+ Convert a query_compressed_log_event to query_log_event
+ from 'src' to 'dst'(malloced inside), the size after compress
+ stored in 'newlen'.
+
+ @Warning:
+ 1)The caller should call my_free to release 'dst'.
+
+ return zero if successful, others otherwise.
+*/
+
+int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
+ const char *src, char **dst, ulong *newlen)
+{
+ ulong len = uint4korr(src + EVENT_LEN_OFFSET);
+ const char *tmp = src;
+
+ DBUG_ASSERT((uchar)src[EVENT_TYPE_OFFSET] == QUERY_COMPRESSED_EVENT);
+
+ uint8 common_header_len= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[QUERY_COMPRESSED_EVENT-1];
+
+ tmp += common_header_len;
+
+ uint db_len = (uint)tmp[Q_DB_LEN_OFFSET];
+ uint16 status_vars_len= uint2korr(tmp + Q_STATUS_VARS_LEN_OFFSET);
+
+ tmp += post_header_len + status_vars_len + db_len + 1;
+
+ uint32 un_len = binlog_get_uncompress_len(tmp);
+ *newlen = (tmp - src) + un_len;
+ if(contain_checksum)
+ *newlen += BINLOG_CHECKSUM_LEN;
+
+ *dst = (char *)my_malloc(ALIGN_SIZE(*newlen), MYF(MY_FAE));
+ if (!*dst)
+ {
+ return 1;
+ }
+
+ /* copy the head*/
+ memcpy(*dst, src , tmp - src);
+ if (binlog_buf_uncompress(tmp, *dst + (tmp - src), len - (tmp - src), &un_len))
+ {
+ my_free(*dst);
+ return 1;
+ }
+
+ (*dst)[EVENT_TYPE_OFFSET] = QUERY_EVENT;
+ int4store(*dst + EVENT_LEN_OFFSET, *newlen);
+ if(contain_checksum){
+ ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
+ int4store(*dst + clear_len, my_checksum(0L, (uchar *)*dst, clear_len));
+ }
+ return 0;
+}
+
+int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
+ const char *src, char **dst, ulong *newlen)
+{
+ Log_event_type type = (Log_event_type)(uchar)src[EVENT_TYPE_OFFSET];
+ ulong len = uint4korr(src + EVENT_LEN_OFFSET);
+ const char *tmp = src;
+ char *buf = NULL;
+
+ DBUG_ASSERT(LOG_EVENT_IS_ROW_COMPRESSED(type));
+
+ uint8 common_header_len= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[type-1];
+
+ tmp += common_header_len + ROWS_HEADER_LEN_V1;
+ if (post_header_len == ROWS_HEADER_LEN_V2)
+ {
+ /*
+ Have variable length header, check length,
+ which includes length bytes
+ */
+ uint16 var_header_len= uint2korr(tmp);
+ assert(var_header_len >= 2);
+
+ /* skip over var-len header, extracting 'chunks' */
+ tmp += var_header_len;
+
+ /* get the uncompressed event type */
+ type = (Log_event_type)(type - WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT);
+ }
+ else
+ {
+ /* get the uncompressed event type */
+ type = (Log_event_type)(type - WRITE_ROWS_COMPRESSED_EVENT_V1 + WRITE_ROWS_EVENT_V1);
+ }
+
+
+ ulong m_width = net_field_length((uchar **)&tmp);
+ tmp += (m_width + 7) / 8;
+
+ if (type == UPDATE_ROWS_EVENT_V1 || type == UPDATE_ROWS_EVENT)
+ {
+ tmp += (m_width + 7) / 8;
+ }
+
+ uint32 un_len = binlog_get_uncompress_len(tmp);
+ *newlen = (tmp - src) + un_len;
+ if(contain_checksum)
+ *newlen += BINLOG_CHECKSUM_LEN;
+
+ uint32 alloc_size = ALIGN_SIZE(*newlen);
+ buf = (char *)my_malloc(alloc_size , MYF(MY_FAE));
+ if (!buf)
+ {
+ return 1;
+ }
+
+ /* copy the head*/
+ memcpy(buf, src , tmp - src);
+ /* uncompress the body */
+ if (binlog_buf_uncompress(tmp, buf + (tmp - src), len - (tmp - src), &un_len))
+ {
+ my_free(buf);
+ return 1;
+ }
+
+ buf[EVENT_TYPE_OFFSET] = type;
+ int4store(buf + EVENT_LEN_OFFSET, *newlen);
+ if(contain_checksum){
+ ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
+ int4store(buf + clear_len, my_checksum(0L, (uchar *)buf, clear_len));
+ }
+ *dst = buf;
+ return 0;
+}
+
+/**
+ Get the length of uncompress content.
+*/
+
+uint32 binlog_get_uncompress_len(const char *buf)
+{
+ DBUG_ASSERT((buf[0] & 0xe0) == 0x80);
+ uint32 lenlen = buf[0] & 0x07;
+ uint32 len = 0;
+ switch(lenlen)
+ {
+ case 1:
+ len = uchar(buf[1]);
+ break;
+ case 2:
+ len = uchar(buf[1]) << 8 | uchar(buf[2]);
+ break;
+ case 3:
+ len = uchar(buf[1]) << 16 | uchar(buf[2]) << 8 | uchar(buf[3]);
+ break;
+ case 4:
+ len = uchar(buf[1]) << 24 | uchar(buf[2]) << 16 | uchar(buf[3]) << 8 | uchar(buf[4]);
+ break;
+ default:
+ DBUG_ASSERT(lenlen >= 1 && lenlen <= 4);
+ break;
+ }
+ return len;
+}
+
+/**
+ Uncompress the content in 'src' with length of 'len' to 'dst'.
+
+ Note: 1) Then the caller should guarantee the length of 'dst' (which
+ can be got by statement_get_uncompress_len) is enough to hold
+ the content uncompressed.
+ 2) The 'newlen' should stored the length of 'dst', and it will
+ be set as the size of uncompressed content after return.
+
+ return zero if successful, others otherwise.
+*/
+int binlog_buf_uncompress(const char *src, char *dst, uint32 len, uint32 *newlen)
+{
+ if((src[0] & 0x80) == 0)
+ {
+ return 1;
+ }
+
+ uint32 lenlen = src[0] & 0x07;
+ uLongf buflen = *newlen;
+ if(uncompress((Bytef *)dst, &buflen, (const Bytef*)src + 1 + lenlen, len) != Z_OK)
+ {
+ return 1;
+ }
+
+ *newlen = (uint32)buflen;
+ return 0;
+}
+
#ifndef MYSQL_CLIENT
/**
@@ -828,6 +1094,13 @@ const char* Log_event::get_type_str(Log_event_type type)
case TRANSACTION_CONTEXT_EVENT: return "Transaction_context";
case VIEW_CHANGE_EVENT: return "View_change";
case XA_PREPARE_LOG_EVENT: return "XA_prepare";
+ case QUERY_COMPRESSED_EVENT: return "Query_compressed";
+ case WRITE_ROWS_COMPRESSED_EVENT: return "Write_rows_compressed";
+ case UPDATE_ROWS_COMPRESSED_EVENT: return "Update_rows_compressed";
+ case DELETE_ROWS_COMPRESSED_EVENT: return "Delete_rows_compressed";
+ case WRITE_ROWS_COMPRESSED_EVENT_V1: return "Write_rows_compressed_v1";
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1: return "Update_rows_compressed_v1";
+ case DELETE_ROWS_COMPRESSED_EVENT_V1: return "Delete_rows_compressed_v1";
default: return "Unknown"; /* impossible */
}
@@ -1661,6 +1934,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
case QUERY_EVENT:
ev = new Query_log_event(buf, event_len, fdle, QUERY_EVENT);
break;
+ case QUERY_COMPRESSED_EVENT:
+ ev = new Query_compressed_log_event(buf, event_len, fdle, QUERY_COMPRESSED_EVENT);
+ break;
case LOAD_EVENT:
ev = new Load_log_event(buf, event_len, fdle);
break;
@@ -1735,6 +2011,19 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
ev = new Delete_rows_log_event(buf, event_len, fdle);
break;
+ case WRITE_ROWS_COMPRESSED_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT_V1:
+ ev = new Write_rows_compressed_log_event(buf, event_len, fdle);
+ break;
+ case UPDATE_ROWS_COMPRESSED_EVENT:
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+ ev = new Update_rows_compressed_log_event(buf, event_len, fdle);
+ break;
+ case DELETE_ROWS_COMPRESSED_EVENT:
+ case DELETE_ROWS_COMPRESSED_EVENT_V1:
+ ev = new Delete_rows_compressed_log_event(buf, event_len, fdle);
+ break;
+
/* MySQL GTID events are ignored */
case GTID_LOG_EVENT:
case ANONYMOUS_GTID_LOG_EVENT:
@@ -1778,7 +2067,7 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
else
{
DBUG_PRINT("error",("Unknown event code: %d",
- (int) buf[EVENT_TYPE_OFFSET]));
+ (uchar) buf[EVENT_TYPE_OFFSET]));
ev= NULL;
break;
}
@@ -2831,11 +3120,32 @@ void Log_event::print_base64(IO_CACHE* file,
}
case UPDATE_ROWS_EVENT:
case UPDATE_ROWS_EVENT_V1:
- {
- ev= new Update_rows_log_event((const char*) ptr, size,
- glob_description_event);
- break;
- }
+ {
+ ev= new Update_rows_log_event((const char*) ptr, size,
+ glob_description_event);
+ break;
+ }
+ case WRITE_ROWS_COMPRESSED_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT_V1:
+ {
+ ev= new Write_rows_compressed_log_event((const char*) ptr, size,
+ glob_description_event);
+ break;
+ }
+ case UPDATE_ROWS_COMPRESSED_EVENT:
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+ {
+ ev= new Update_rows_compressed_log_event((const char*) ptr, size,
+ glob_description_event);
+ break;
+ }
+ case DELETE_ROWS_COMPRESSED_EVENT:
+ case DELETE_ROWS_COMPRESSED_EVENT_V1:
+ {
+ ev= new Delete_rows_compressed_log_event((const char*) ptr, size,
+ glob_description_event);
+ break;
+ }
default:
break;
}
@@ -3190,6 +3500,23 @@ bool Query_log_event::write()
write_footer();
}
+bool Query_compressed_log_event::write()
+{
+ const char *query_tmp = query;
+ uint32 q_len_tmp = q_len;
+ bool ret = true;
+ q_len = binlog_get_compress_len(q_len);
+ query = (char *)my_malloc(q_len, MYF(MY_FAE));
+ if(query && !binlog_buf_compress(query_tmp, (char *)query, q_len_tmp, &q_len))
+ {
+ ret = Query_log_event::write();
+ }
+ my_free((void *)query);
+ query = query_tmp;
+ q_len = q_len_tmp;
+ return ret;
+}
+
/**
The simplest constructor that could possibly work. This is used for
creating static objects that have a special meaning and are invisible
@@ -3382,6 +3709,15 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %llu cache_tye: %d",
(ulong) flags2, sql_mode, cache_type));
}
+
+Query_compressed_log_event::Query_compressed_log_event(THD* thd_arg, const char* query_arg,
+ ulong query_length, bool using_trans,
+ bool direct, bool suppress_use, int errcode)
+ :Query_log_event(thd_arg, query_arg, query_length, using_trans, direct, suppress_use, errcode),
+ query_buf(0)
+{
+
+}
#endif /* MYSQL_CLIENT */
@@ -3788,6 +4124,28 @@ Query_log_event::Query_log_event(const char* buf, uint event_len,
DBUG_VOID_RETURN;
}
+Query_compressed_log_event::Query_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event,
+ Log_event_type event_type)
+ :Query_log_event(buf, event_len, description_event, event_type),query_buf(NULL)
+{
+ if(query)
+ {
+ uint32 un_len=binlog_get_uncompress_len(query);
+ query_buf = (Log_event::Byte*)my_malloc(ALIGN_SIZE(un_len + 1), MYF(MY_WME)); //reserve one byte for '\0'
+ if(query_buf && !binlog_buf_uncompress(query, (char *)query_buf, q_len, &un_len))
+ {
+ query_buf[un_len] = 0;
+ query = (const char *)query_buf;
+ q_len = un_len;
+ }
+ else
+ {
+ query= 0;
+ }
+ }
+}
/*
Replace a binlog event read into a packet with a dummy event. Either a
@@ -5056,6 +5414,15 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[GTID_LIST_EVENT-1]= GTID_LIST_HEADER_LEN;
post_header_len[START_ENCRYPTION_EVENT-1]= START_ENCRYPTION_HEADER_LEN;
+ //compressed event
+ post_header_len[QUERY_COMPRESSED_EVENT-1]= QUERY_HEADER_LEN;
+ post_header_len[WRITE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2;
+ post_header_len[UPDATE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2;
+ post_header_len[DELETE_ROWS_COMPRESSED_EVENT-1]= ROWS_HEADER_LEN_V2;
+ post_header_len[WRITE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
+ post_header_len[UPDATE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
+ post_header_len[DELETE_ROWS_COMPRESSED_EVENT_V1-1]= ROWS_HEADER_LEN_V1;
+
// Sanity-check that all post header lengths are initialized.
int i;
for (i=0; i<number_of_event_types; i++)
@@ -9404,7 +9771,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
{
DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)");
uint8 const common_header_len= description_event->common_header_len;
- Log_event_type event_type= (Log_event_type) buf[EVENT_TYPE_OFFSET];
+ Log_event_type event_type= (Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET];
m_type= event_type;
uint8 const post_header_len= description_event->post_header_len[event_type-1];
@@ -9503,8 +9870,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
- if ((event_type == UPDATE_ROWS_EVENT) ||
- (event_type == UPDATE_ROWS_EVENT_V1))
+ if (LOG_EVENT_IS_UPDATE_ROW(event_type))
{
DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
@@ -9551,6 +9917,31 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
DBUG_VOID_RETURN;
}
+void Rows_log_event::uncompress_buf()
+{
+ uint32 un_len = binlog_get_uncompress_len((char *)m_rows_buf);
+ uchar *new_buf= (uchar*) my_malloc(ALIGN_SIZE(un_len), MYF(MY_WME));
+ if (new_buf)
+ {
+ if(!binlog_buf_uncompress((char *)m_rows_buf, (char *)new_buf, m_rows_cur - m_rows_buf, &un_len))
+ {
+ my_free(m_rows_buf);
+ m_rows_buf = new_buf;
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ m_curr_row= m_rows_buf;
+#endif
+ m_rows_end= m_rows_buf + un_len;
+ m_rows_cur= m_rows_end;
+ return;
+ }
+ else
+ {
+ my_free(new_buf);
+ }
+ }
+ m_cols.bitmap= 0; // catch it in is_valid
+}
+
Rows_log_event::~Rows_log_event()
{
if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
@@ -9573,7 +9964,8 @@ int Rows_log_event::get_data_size()
(m_rows_cur - m_rows_buf););
int data_size= 0;
- bool is_v2_event= get_type_code() > DELETE_ROWS_EVENT_V1;
+ Log_event_type type = get_type_code();
+ bool is_v2_event= LOG_EVENT_IS_ROW_V2(type);
if (is_v2_event)
{
data_size= ROWS_HEADER_LEN_V2 +
@@ -10388,6 +10780,24 @@ bool Rows_log_event::write_data_body()
return res;
}
+
+bool Rows_log_event::write_compressed()
+{
+ uchar *m_rows_buf_tmp = m_rows_buf;
+ uchar *m_rows_cur_tmp = m_rows_cur;
+ bool ret = true;
+ uint32 comlen = binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp);
+ m_rows_buf = (uchar *)my_malloc(comlen, MYF(MY_FAE));
+ if(m_rows_buf && !binlog_buf_compress((const char *)m_rows_buf_tmp, (char *)m_rows_buf, m_rows_cur_tmp - m_rows_buf_tmp, &comlen))
+ {
+ m_rows_cur = comlen + m_rows_buf;
+ ret = Log_event::write();
+ }
+ my_free(m_rows_buf);
+ m_rows_buf = m_rows_buf_tmp;
+ m_rows_cur = m_rows_cur_tmp;
+ return ret;
+}
#endif
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
@@ -11302,6 +11712,20 @@ Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
is_transactional, WRITE_ROWS_EVENT_V1)
{
}
+
+Write_rows_compressed_log_event::Write_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg,
+ ulong tid_arg,
+ bool is_transactional)
+ : Write_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional)
+{
+ //m_type = log_bin_use_v1_row_events ? WRITE_ROWS_COMPRESSED_EVENT_V1 : WRITE_ROWS_COMPRESSED_EVENT;
+ m_type = WRITE_ROWS_COMPRESSED_EVENT_V1;
+}
+
+bool Write_rows_compressed_log_event::write()
+{
+ return Rows_log_event::write_compressed();
+}
#endif
/*
@@ -11314,6 +11738,14 @@ Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
: Rows_log_event(buf, event_len, description_event)
{
}
+
+Write_rows_compressed_log_event::Write_rows_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+: Write_rows_log_event(buf, event_len, description_event)
+{
+ uncompress_buf();
+}
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
@@ -11805,6 +12237,23 @@ void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
{DBUG_SET("+d,simulate_my_b_fill_error");});
Rows_log_event::print_helper(file, print_event_info, "Write_rows");
}
+
+void Write_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
+{
+ char *new_buf;
+ ulong len;
+ if(!Row_log_event_uncompress(glob_description_event, checksum_alg,
+ temp_buf, &new_buf, &len))
+ {
+ free_temp_buf();
+ register_temp_buf(new_buf, true);
+ Rows_log_event::print_helper(file, print_event_info, "Write_compressed_rows");
+ }
+ else
+ {
+ my_b_printf(&print_event_info->head_cache, "ERROR: uncompress write_compressed_rows failed\n");
+ }
+}
#endif
@@ -12325,6 +12774,20 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
DELETE_ROWS_EVENT_V1)
{
}
+
+Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg,
+ ulong tid_arg,
+ bool is_transactional)
+ : Delete_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional)
+{
+ //m_type = log_bin_use_v1_row_events ? DELETE_ROWS_COMPRESSED_EVENT_V1 : DELETE_ROWS_COMPRESSED_EVENT;
+ m_type = DELETE_ROWS_COMPRESSED_EVENT_V1;
+}
+
+bool Delete_rows_compressed_log_event::write()
+{
+ return Rows_log_event::write_compressed();
+}
#endif /* #if !defined(MYSQL_CLIENT) */
/*
@@ -12337,6 +12800,14 @@ Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len,
: Rows_log_event(buf, event_len, description_event)
{
}
+
+Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+ : Delete_rows_log_event(buf, event_len, description_event)
+{
+ uncompress_buf();
+}
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
@@ -12434,6 +12905,24 @@ void Delete_rows_log_event::print(FILE *file,
{
Rows_log_event::print_helper(file, print_event_info, "Delete_rows");
}
+
+void Delete_rows_compressed_log_event::print(FILE *file,
+ PRINT_EVENT_INFO* print_event_info)
+{
+ char *new_buf;
+ ulong len;
+ if(!Row_log_event_uncompress(glob_description_event, checksum_alg,
+ temp_buf, &new_buf, &len))
+ {
+ free_temp_buf();
+ register_temp_buf(new_buf, true);
+ Rows_log_event::print_helper(file, print_event_info, "Delete_compressed_rows");
+ }
+ else
+ {
+ my_b_printf(&print_event_info->head_cache, "ERROR: uncompress delete_compressed_rows failed\n");
+ }
+}
#endif
@@ -12461,6 +12950,20 @@ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
init(tbl_arg->rpl_write_set);
}
+Update_rows_compressed_log_event::Update_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg,
+ ulong tid,
+ bool is_transactional)
+: Update_rows_log_event(thd_arg, tbl_arg, tid, is_transactional)
+{
+ //m_type = log_bin_use_v1_row_events ? UPDATE_ROWS_COMPRESSED_EVENT_V1 : UPDATE_ROWS_COMPRESSED_EVENT;
+ m_type = UPDATE_ROWS_COMPRESSED_EVENT_V1;
+}
+
+bool Update_rows_compressed_log_event::write()
+{
+ return Rows_log_event::write_compressed();
+}
+
void Update_rows_log_event::init(MY_BITMAP const *cols)
{
/* if my_bitmap_init fails, caught in is_valid() */
@@ -12499,6 +13002,14 @@ Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len,
: Rows_log_event(buf, event_len, description_event)
{
}
+
+Update_rows_compressed_log_event::Update_rows_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event
+ *description_event)
+ : Update_rows_log_event(buf, event_len, description_event)
+{
+ uncompress_buf();
+}
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
@@ -12651,6 +13162,23 @@ void Update_rows_log_event::print(FILE *file,
{
Rows_log_event::print_helper(file, print_event_info, "Update_rows");
}
+
+void Update_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
+{
+ char *new_buf;
+ ulong len;
+ if(!Row_log_event_uncompress(glob_description_event, checksum_alg,
+ temp_buf, &new_buf, &len))
+ {
+ free_temp_buf();
+ register_temp_buf(new_buf, true);
+ Rows_log_event::print_helper(file, print_event_info, "Update_compressed_rows");
+ }
+ else
+ {
+ my_b_printf(&print_event_info->head_cache, "ERROR: uncompress update_compressed_rows failed\n");
+ }
+}
#endif
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
@@ -12778,7 +13306,7 @@ err:
DBUG_ASSERT(error != 0);
sql_print_error("Error in Log_event::read_log_event(): "
"'%s', data_len: %d, event_type: %d",
- error,data_len,head[EVENT_TYPE_OFFSET]);
+ error,data_len,(uchar)head[EVENT_TYPE_OFFSET]);
}
(*arg_buf)+= data_len;
(*arg_buf_len)-= data_len;
diff --git a/sql/log_event.h b/sql/log_event.h
index 306f78ca4c9..14bfc54aed3 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -692,11 +692,33 @@ enum Log_event_type
START_ENCRYPTION_EVENT= 164,
+ /*
+ Compressed binlog event.
+ */
+ QUERY_COMPRESSED_EVENT = 165,
+ WRITE_ROWS_COMPRESSED_EVENT_V1 = 166,
+ UPDATE_ROWS_COMPRESSED_EVENT_V1 = 167,
+ DELETE_ROWS_COMPRESSED_EVENT_V1 = 168,
+ WRITE_ROWS_COMPRESSED_EVENT = 169,
+ UPDATE_ROWS_COMPRESSED_EVENT = 170,
+ DELETE_ROWS_COMPRESSED_EVENT = 171,
+
/* Add new MariaDB events here - right above this comment! */
ENUM_END_EVENT /* end marker */
};
+#define LOG_EVENT_IS_QUERY(type) (type == QUERY_EVENT || type == QUERY_COMPRESSED_EVENT)
+#define LOG_EVENT_IS_WRITE_ROW(type) (type == WRITE_ROWS_EVENT || type == WRITE_ROWS_EVENT_V1 || type == WRITE_ROWS_COMPRESSED_EVENT || type == WRITE_ROWS_COMPRESSED_EVENT_V1)
+#define LOG_EVENT_IS_UPDATE_ROW(type) (type == UPDATE_ROWS_EVENT || type == UPDATE_ROWS_EVENT_V1 || type == UPDATE_ROWS_COMPRESSED_EVENT || type == UPDATE_ROWS_COMPRESSED_EVENT_V1)
+#define LOG_EVENT_IS_DELETE_ROW(type) (type == DELETE_ROWS_EVENT || type == DELETE_ROWS_EVENT_V1 || type == DELETE_ROWS_COMPRESSED_EVENT || type == DELETE_ROWS_COMPRESSED_EVENT_V1)
+#define LOG_EVENT_IS_ROW_COMPRESSED(type) (type == WRITE_ROWS_COMPRESSED_EVENT || type == WRITE_ROWS_COMPRESSED_EVENT_V1 ||\
+ type == UPDATE_ROWS_COMPRESSED_EVENT || type == UPDATE_ROWS_COMPRESSED_EVENT_V1 ||\
+ type == DELETE_ROWS_COMPRESSED_EVENT || type == DELETE_ROWS_COMPRESSED_EVENT_V1)
+#define LOG_EVENT_IS_ROW_V2(type) (type >= WRITE_ROWS_EVENT && type <= DELETE_ROWS_EVENT || \
+ type >= WRITE_ROWS_COMPRESSED_EVENT && type <= DELETE_ROWS_COMPRESSED_EVENT )
+
+
/*
The number of types we handle in Format_description_log_event (UNKNOWN_EVENT
is not to be handled, it does not exist in binlogs, it does not have a
@@ -2045,9 +2067,37 @@ public: /* !!! Public in this patch to allow old usage */
!strncasecmp(query, "SAVEPOINT", 9) ||
!strncasecmp(query, "ROLLBACK", 8);
}
- bool is_begin() { return !strcmp(query, "BEGIN"); }
- bool is_commit() { return !strcmp(query, "COMMIT"); }
- bool is_rollback() { return !strcmp(query, "ROLLBACK"); }
+ virtual bool is_begin() { return !strcmp(query, "BEGIN"); }
+ virtual bool is_commit() { return !strcmp(query, "COMMIT"); }
+ virtual bool is_rollback() { return !strcmp(query, "ROLLBACK"); }
+};
+
+class Query_compressed_log_event:public Query_log_event{
+protected:
+ Log_event::Byte* query_buf; // point to the uncompressed query
+public:
+ Query_compressed_log_event(const char* buf, uint event_len,
+ const Format_description_log_event *description_event,
+ Log_event_type event_type);
+ ~Query_compressed_log_event()
+ {
+ if (query_buf)
+ my_free(query_buf);
+ }
+ Log_event_type get_type_code() { return QUERY_COMPRESSED_EVENT; }
+
+ /*
+ the min length of log_bin_compress_min_len is 10,
+ means that Begin/Commit/Rollback would never be compressed!
+ */
+ virtual bool is_begin() { return false; }
+ virtual bool is_commit() { return false; }
+ virtual bool is_rollback() { return false; }
+#ifdef MYSQL_SERVER
+ Query_compressed_log_event(THD* thd_arg, const char* query_arg, ulong query_length,
+ bool using_trans, bool direct, bool suppress_use, int error);
+ virtual bool write();
+#endif
};
@@ -4341,6 +4391,7 @@ public:
#ifdef MYSQL_SERVER
virtual bool write_data_header();
virtual bool write_data_body();
+ virtual bool write_compressed();
virtual const char *get_db() { return m_table->s->db.str; }
#endif
/*
@@ -4375,6 +4426,7 @@ protected:
#endif
Rows_log_event(const char *row_data, uint event_len,
const Format_description_log_event *description_event);
+ void uncompress_buf();
#ifdef MYSQL_CLIENT
void print_helper(FILE *, PRINT_EVENT_INFO *, char const *const name);
@@ -4587,6 +4639,23 @@ private:
#endif
};
+class Write_rows_compressed_log_event : public Write_rows_log_event
+{
+public:
+#if defined(MYSQL_SERVER)
+ Write_rows_compressed_log_event(THD*, TABLE*, ulong table_id,
+ bool is_transactional);
+ virtual bool write();
+#endif
+#ifdef HAVE_REPLICATION
+ Write_rows_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event);
+#endif
+private:
+#if defined(MYSQL_CLIENT)
+ void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
+#endif
+};
/**
@class Update_rows_log_event
@@ -4657,6 +4726,24 @@ protected:
#endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */
};
+class Update_rows_compressed_log_event : public Update_rows_log_event
+{
+public:
+#if defined(MYSQL_SERVER)
+ Update_rows_compressed_log_event(THD*, TABLE*, ulong table_id,
+ bool is_transactional);
+ virtual bool write();
+#endif
+#ifdef HAVE_REPLICATION
+ Update_rows_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event);
+#endif
+private:
+#if defined(MYSQL_CLIENT)
+ void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
+#endif
+};
+
/**
@class Delete_rows_log_event
@@ -4723,6 +4810,23 @@ protected:
#endif
};
+class Delete_rows_compressed_log_event : public Delete_rows_log_event
+{
+public:
+#if defined(MYSQL_SERVER)
+ Delete_rows_compressed_log_event(THD*, TABLE*, ulong, bool is_transactional);
+ virtual bool write();
+#endif
+#ifdef HAVE_REPLICATION
+ Delete_rows_compressed_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event);
+#endif
+private:
+#if defined(MYSQL_CLIENT)
+ void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
+#endif
+};
+
#include "log_event_old.h"
@@ -4964,4 +5068,17 @@ extern TYPELIB binlog_checksum_typelib;
@} (end of group Replication)
*/
+
+int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen);
+int binlog_buf_uncompress(const char *src, char *dst, uint32 len, uint32 *newlen);
+uint32 binlog_get_compress_len(uint32 len);
+uint32 binlog_get_uncompress_len(const char *buf);
+
+int query_event_uncompress(const Format_description_log_event *description_event,
+ bool contain_checksum, const char *src, char **dst, ulong *newlen);
+
+int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
+ const char *src, char **dst, ulong *newlen);
+
+
#endif /* _log_event_h */
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 310ccb047c4..8881e0423f5 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -389,6 +389,8 @@ static DYNAMIC_ARRAY all_options;
/* Global variables */
bool opt_bin_log, opt_bin_log_used=0, opt_ignore_builtin_innodb= 0;
+bool opt_bin_log_compress;
+uint opt_bin_log_compress_min_len;
my_bool opt_log, debug_assert_if_crashed_table= 0, opt_help= 0;
my_bool debug_assert_on_not_freed_memory= 0;
my_bool disable_log_notes;
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 294b463161b..02bbdf839c1 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -109,7 +109,8 @@ extern CHARSET_INFO *character_set_filesystem;
extern MY_BITMAP temp_pool;
extern bool opt_large_files;
-extern bool opt_update_log, opt_bin_log, opt_error_log;
+extern bool opt_update_log, opt_bin_log, opt_error_log, opt_bin_log_compress;
+extern uint opt_bin_log_compress_min_len;
extern my_bool opt_log, opt_bootstrap;
extern my_bool opt_backup_history_log;
extern my_bool opt_backup_progress_log;
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 9e2977c8bc5..f687f645171 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1732,6 +1732,12 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
case DELETE_ROWS_EVENT:
case UPDATE_ROWS_EVENT:
case WRITE_ROWS_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT:
+ case DELETE_ROWS_COMPRESSED_EVENT:
+ case UPDATE_ROWS_COMPRESSED_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT_V1:
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+ case DELETE_ROWS_COMPRESSED_EVENT_V1:
/*
After the last Rows event has been applied, the saved Annotate_rows
event (if any) is not needed anymore and can be deleted.
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index d42611bfebe..4d1925e5d84 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -7232,3 +7232,5 @@ ER_PARTITION_DEFAULT_ERROR
ukr "Припустимо мати тільки один DEFAULT розділ"
ER_REFERENCED_TRG_DOES_NOT_EXIST
eng "Referenced trigger '%s' for the given action time and event type does not exist"
+ER_BINLOG_UNCOMPRESS_ERROR
+ eng "Uncompress the compressed binlog failed"
diff --git a/sql/slave.cc b/sql/slave.cc
index 20bf68e6b6f..f7efb97e44d 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3769,7 +3769,7 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev)
}
/* Check for an event that starts or stops a transaction */
- if (typ == QUERY_EVENT)
+ if (LOG_EVENT_IS_QUERY(typ))
{
Query_log_event *qev= (Query_log_event*) ev;
/*
@@ -3909,7 +3909,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
*/
DBUG_EXECUTE_IF("incomplete_group_in_relay_log",
if ((typ == XID_EVENT) ||
- ((typ == QUERY_EVENT) &&
+ (LOG_EVENT_IS_QUERY(typ) &&
strcmp("COMMIT", ((Query_log_event *) ev)->query) == 0))
{
DBUG_ASSERT(thd->transaction.all.modified_non_trans_table);
@@ -5664,6 +5664,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
bool gtid_skip_enqueue= false;
bool got_gtid_event= false;
rpl_gtid event_gtid;
+ bool compressed_event = FALSE;
/*
FD_q must have been prepared for the first R_a event
@@ -5710,7 +5711,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
// Emulate the network corruption
DBUG_EXECUTE_IF("corrupt_queue_event",
- if (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
+ if ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
{
char *debug_event_buf_c = (char*) buf;
int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN);
@@ -6144,6 +6145,43 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
inc_pos= event_len;
}
break;
+ /*
+ Binlog compressed event should uncompress in IO thread
+ */
+ case QUERY_COMPRESSED_EVENT:
+ inc_pos= event_len;
+ if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len))
+ {
+ char llbuf[22];
+ error = ER_BINLOG_UNCOMPRESS_ERROR;
+ error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: "));
+ llstr(mi->master_log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ compressed_event = true;
+ goto default_action;
+
+ case WRITE_ROWS_COMPRESSED_EVENT:
+ case UPDATE_ROWS_COMPRESSED_EVENT:
+ case DELETE_ROWS_COMPRESSED_EVENT:
+ case WRITE_ROWS_COMPRESSED_EVENT_V1:
+ case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+ case DELETE_ROWS_COMPRESSED_EVENT_V1:
+ inc_pos = event_len;
+ {
+ if (Row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len))
+ {
+ char llbuf[22];
+ error = ER_BINLOG_UNCOMPRESS_ERROR;
+ error_msg.append(STRING_WITH_LEN("binlog uncompress error, master log_pos: "));
+ llstr(mi->master_log_pos, llbuf);
+ error_msg.append(llbuf, strlen(llbuf));
+ goto err;
+ }
+ }
+ compressed_event = true;
+ goto default_action;
#ifndef DBUG_OFF
case XID_EVENT:
@@ -6162,7 +6200,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
DBUG_EXECUTE_IF("kill_slave_io_after_2_events",
{
if (mi->dbug_do_disconnect &&
- (((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT) ||
+ (LOG_EVENT_IS_QUERY((uchar)buf[EVENT_TYPE_OFFSET]) ||
((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT))
&& (--mi->dbug_event_counter == 0))
{
@@ -6175,7 +6213,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
DBUG_EXECUTE_IF("kill_slave_io_before_commit",
{
if ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
- ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT &&
+ ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
Query_log_event::peek_is_commit_rollback(buf, event_len,
checksum_alg)))
{
@@ -6195,7 +6233,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
++mi->events_queued_since_last_gtid;
}
- inc_pos= event_len;
+ if (!compressed_event)
+ inc_pos= event_len;
+
break;
}
@@ -6286,8 +6326,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
/* everything is filtered out from non-master */
(s_id != mi->master_id ||
/* for the master meta information is necessary */
- (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT))) ||
/*
Check whether it needs to be filtered based on domain_id
@@ -6316,9 +6356,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
*/
if (!(s_id == global_system_variables.server_id &&
!mi->rli.replicate_same_server_id) ||
- (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
- buf[EVENT_TYPE_OFFSET] != STOP_EVENT))
+ ((uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET] != STOP_EVENT))
{
mi->master_log_pos+= inc_pos;
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
@@ -6359,7 +6399,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
buf[EVENT_TYPE_OFFSET])) ||
(!mi->last_queued_gtid_standalone &&
((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT ||
- ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT &&
+ ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
Query_log_event::peek_is_commit_rollback(buf, event_len,
checksum_alg))))))
{
@@ -6389,6 +6429,9 @@ err:
mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error),
error_msg.ptr());
+ if(compressed_event)
+ my_free((void *)buf);
+
DBUG_RETURN(error);
}
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 1af3b9a9cca..a741a0f6d01 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -6373,7 +6373,14 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
if (variables.option_bits & OPTION_GTID_BEGIN)
is_trans= 1;
- Rows_log_event* const ev=
+ Rows_log_event* ev;
+ if (binlog_should_compress(len))
+ ev =
+ binlog_prepare_pending_rows_event(table, variables.server_id,
+ len, is_trans,
+ static_cast<Write_rows_compressed_log_event*>(0));
+ else
+ ev =
binlog_prepare_pending_rows_event(table, variables.server_id,
len, is_trans,
static_cast<Write_rows_log_event*>(0));
@@ -6421,8 +6428,15 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
DBUG_DUMP("after_row", after_row, after_size);
#endif
- Rows_log_event* const ev=
- binlog_prepare_pending_rows_event(table, variables.server_id,
+ Rows_log_event* ev;
+ if(binlog_should_compress(before_size + after_size))
+ ev =
+ binlog_prepare_pending_rows_event(table, variables.server_id,
+ before_size + after_size, is_trans,
+ static_cast<Update_rows_compressed_log_event*>(0));
+ else
+ ev =
+ binlog_prepare_pending_rows_event(table, variables.server_id,
before_size + after_size, is_trans,
static_cast<Update_rows_log_event*>(0));
@@ -6474,8 +6488,15 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,
if (variables.option_bits & OPTION_GTID_BEGIN)
is_trans= 1;
- Rows_log_event* const ev=
- binlog_prepare_pending_rows_event(table, variables.server_id,
+ Rows_log_event* ev;
+ if(binlog_should_compress(len))
+ ev =
+ binlog_prepare_pending_rows_event(table, variables.server_id,
+ len, is_trans,
+ static_cast<Delete_rows_compressed_log_event*>(0));
+ else
+ ev =
+ binlog_prepare_pending_rows_event(table, variables.server_id,
len, is_trans,
static_cast<Delete_rows_log_event*>(0));
@@ -6940,15 +6961,28 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
flush the pending rows event if necessary.
*/
{
- Query_log_event qinfo(this, query_arg, query_len, is_trans, direct,
- suppress_use, errcode);
+ Log_event* ev = NULL;
+ int error = 0;
+
/*
Binlog table maps will be irrelevant after a Query_log_event
(they are just removed on the slave side) so after the query
log event is written to the binary log, we pretend that no
table maps were written.
- */
- int error= mysql_bin_log.write(&qinfo);
+ */
+ if(binlog_should_compress(query_len))
+ {
+ Query_compressed_log_event qinfo(this, query_arg, query_len, is_trans, direct,
+ suppress_use, errcode);
+ error= mysql_bin_log.write(&qinfo);
+ }
+ else
+ {
+ Query_log_event qinfo(this, query_arg, query_len, is_trans, direct,
+ suppress_use, errcode);
+ error= mysql_bin_log.write(&qinfo);
+ }
+
binlog_table_maps= 0;
DBUG_RETURN(error);
}
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 994a161a646..a63a3379ec1 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -5681,6 +5681,12 @@ void thd_exit_cond(MYSQL_THD thd, const PSI_stage_info *stage,
#define THD_EXIT_COND(P1, P2) \
thd_exit_cond(P1, P2, __func__, __FILE__, __LINE__)
+inline bool binlog_should_compress(ulong len)
+{
+ return opt_bin_log_compress &&
+ len >= opt_bin_log_compress_min_len;
+}
+
#endif /* MYSQL_SERVER */
#endif /* SQL_CLASS_INCLUDED */
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index bdf90f7caf6..c115ac5f0ec 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -1629,7 +1629,7 @@ is_until_reached(binlog_send_info *info, ulong *ev_offset,
break;
case GTID_UNTIL_STOP_AFTER_TRANSACTION:
if (event_type != XID_EVENT &&
- (event_type != QUERY_EVENT ||
+ (event_type != QUERY_EVENT || /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
!Query_log_event::peek_is_commit_rollback
(info->packet->ptr()+*ev_offset,
info->packet->length()-*ev_offset,
@@ -1863,7 +1863,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
return NULL;
case GTID_SKIP_TRANSACTION:
if (event_type == XID_EVENT ||
- (event_type == QUERY_EVENT &&
+ (event_type == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */
Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset,
len - ev_offset,
current_checksum_alg)))
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 3e6a06c3ab0..ab4429005ab 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1157,6 +1157,18 @@ static Sys_var_mybool Sys_log_bin(
"log_bin", "Whether the binary log is enabled",
READ_ONLY GLOBAL_VAR(opt_bin_log), NO_CMD_LINE, DEFAULT(FALSE));
+static Sys_var_mybool Sys_log_bin_compress(
+ "log_bin_compress", "Whether the binary log can be compressed",
+ GLOBAL_VAR(opt_bin_log_compress), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+
+/* the min length is 10, means that Begin/Commit/Rollback would never be compressed! */
+static Sys_var_uint Sys_log_bin_compress_min_len(
+ "log_bin_compress_min_len",
+ "Minimum length of sql statement(in statement mode) or record(in row mode)"
+ "that can be compressed.",
+ GLOBAL_VAR(opt_bin_log_compress_min_len),
+ CMD_LINE(OPT_ARG), VALID_RANGE(10, 1024), DEFAULT(256), BLOCK_SIZE(1));
+
static Sys_var_mybool Sys_trust_function_creators(
"log_bin_trust_function_creators",
"If set to FALSE (the default), then when --log-bin is used, creation "