summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/log.cc44
-rw-r--r--sql/log.h2
-rw-r--r--sql/log_event.h16
-rw-r--r--sql/log_event_server.cc4
-rw-r--r--sql/rpl_circular_buffer.cpp139
-rw-r--r--sql/rpl_circular_buffer.h56
-rw-r--r--sql/slave.cc1
-rw-r--r--sql/sql_circular_queue.cpp62
-rw-r--r--sql/sql_circular_queue.h37
-rw-r--r--sql/wsrep_mysqld.cc2
10 files changed, 350 insertions, 13 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 4f51a9a9c17..65b02ebd2e8 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -5740,7 +5740,7 @@ THD::binlog_start_trans_and_stmt()
Gtid_log_event gtid_event(this, this->variables.gtid_seq_no,
this->variables.gtid_domain_id,
true, LOG_EVENT_SUPPRESS_USE_F,
- true, 0);
+ true, 0, 0);
gtid_event.server_id= this->variables.server_id;
writer.write(&gtid_event);
}
@@ -6014,7 +6014,8 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
bool
MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
- bool is_transactional, uint64 commit_id)
+ bool is_transactional, uint64 commit_id,
+ uint32 transaction_length)
{
rpl_gtid gtid;
uint32 domain_id;
@@ -6078,7 +6079,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone,
LOG_EVENT_SUPPRESS_USE_F, is_transactional,
- commit_id);
+ commit_id, transaction_length);
/* Write the event to the binary log. */
DBUG_ASSERT(this == &mysql_bin_log);
@@ -6273,7 +6274,27 @@ MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id,
seq_no);
}
-
+/*
+ Find the end_event size.
+*/
+static uint32 get_end_event_size(Log_event *ev)
+{
+ Log_event_type ev_type= ev->get_type_code();
+ if (ev_type == XID_EVENT)
+ return ev->get_data_size() + LOG_EVENT_HEADER_LEN;
+ if (ev_type == QUERY_EVENT)
+ {
+ /*
+ 2 type of data in query_log event
+ ROLLBACK and COMMIT
+ */
+ //TODO NEED to find the size
+ //it is not a constant size
+ //for the time lets return 100
+ return 100;
+ }
+ return 0;
+}
/**
Write an event to the binary log. If with_annotate != NULL and
*with_annotate = TRUE write also Annotate_rows before the event
@@ -6394,7 +6415,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
commit_name.length);
commit_id= entry->val_int(&null_value);
});
- if (write_gtid_event(thd, true, using_trans, commit_id))
+ if (write_gtid_event(thd, true, using_trans, commit_id, 0))
goto err;
}
else
@@ -8198,19 +8219,26 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
uint64 commit_id)
{
binlog_cache_mngr *mngr= entry->cache_mngr;
+ bool using_stmt_cache= entry->using_stmt_cache && !mngr->stmt_cache.empty();
+ bool using_trx_cache= entry->using_trx_cache && !mngr->trx_cache.empty();
+ uint32 trans_size= get_end_event_size(entry->end_event);
+ if (using_stmt_cache)
+ trans_size+= my_b_bytes_in_cache(mngr->get_binlog_cache_log(FALSE));
+ if (using_trx_cache)
+ trans_size+= my_b_bytes_in_cache(mngr->get_binlog_cache_log(TRUE));
DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_or_stmt");
- if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id))
+ if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id, 0))
DBUG_RETURN(ER_ERROR_ON_WRITE);
- if (entry->using_stmt_cache && !mngr->stmt_cache.empty() &&
+ if (using_stmt_cache &&
write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE)))
{
entry->error_cache= &mngr->stmt_cache.cache_log;
DBUG_RETURN(ER_ERROR_ON_WRITE);
}
- if (entry->using_trx_cache && !mngr->trx_cache.empty())
+ if (using_trx_cache)
{
DBUG_EXECUTE_IF("crash_before_writing_xid",
{
diff --git a/sql/log.h b/sql/log.h
index bf1dbd30c6c..b572b4b6df7 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -874,7 +874,7 @@ public:
void set_status_variables(THD *thd);
bool is_xidlist_idle();
bool write_gtid_event(THD *thd, bool standalone, bool is_transactional,
- uint64 commit_id);
+ uint64 commit_id, uint32 transaction_length);
int read_state_from_file();
int write_state_to_file();
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
diff --git a/sql/log_event.h b/sql/log_event.h
index 88a6e06c506..67f539d0bc3 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3382,6 +3382,9 @@ public:
uint64 seq_no;
uint64 commit_id;
uint32 domain_id;
+ uint64 transaction_size;
+ //TODO LEts keep it zero I have patch in second laptop I will use that patch
+ my_thread_id thread_id;
uchar flags2;
/* Flags2. */
@@ -3410,10 +3413,17 @@ public:
static const uchar FL_WAITED= 16;
/* FL_DDL is set for event group containing DDL. */
static const uchar FL_DDL= 32;
+ /*
+ FL_EXTRA_METADATA is for fields thread id and transaction size
+ First will be transaction size of 8bytes
+ Then thread id of 4bytes
+ */
+ static const uchar FL_EXTRA_METADATA= 64;
#ifdef MYSQL_SERVER
Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone,
- uint16 flags, bool is_transactional, uint64 commit_id);
+ uint16 flags, bool is_transactional, uint64 commit_id,
+ uint64 transaction_size);
#ifdef HAVE_REPLICATION
void pack_info(Protocol *protocol);
virtual int do_apply_event(rpl_group_info *rgi);
@@ -3430,7 +3440,9 @@ public:
enum_logged_status logged_status() { return LOGGED_NO_DATA; }
int get_data_size()
{
- return GTID_HEADER_LEN + ((flags2 & FL_GROUP_COMMIT_ID) ? 2 : 0);
+ //TODO thread id and transaction size please also take that into account
+ return GTID_HEADER_LEN + ((flags2 & FL_GROUP_COMMIT_ID) ? 2 : 0) +
+ ((flags2 & FL_EXTRA_METADATA)? 4+8 : 0);
}
bool is_valid() const { return seq_no != 0; }
#ifdef MYSQL_SERVER
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index 04cf70984a2..7f117513857 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -3196,9 +3196,11 @@ bool Binlog_checkpoint_log_event::write()
Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
uint32 domain_id_arg, bool standalone,
uint16 flags_arg, bool is_transactional,
- uint64 commit_id_arg)
+ uint64 commit_id_arg,
+ uint64 transaction_size_arg = 0)
: Log_event(thd_arg, flags_arg, is_transactional),
seq_no(seq_no_arg), commit_id(commit_id_arg), domain_id(domain_id_arg),
+ transaction_size(transaction_size_arg),
flags2((standalone ? FL_STANDALONE : 0) | (commit_id_arg ? FL_GROUP_COMMIT_ID : 0))
{
cache_type= Log_event::EVENT_NO_CACHE;
diff --git a/sql/rpl_circular_buffer.cpp b/sql/rpl_circular_buffer.cpp
new file mode 100644
index 00000000000..a6a8a6d305f
--- /dev/null
+++ b/sql/rpl_circular_buffer.cpp
@@ -0,0 +1,139 @@
+#include "rpl_circular_buffer.h"
+
+
+int rpl_circular_buffer::init(MEM_ROOT* mem_root, uint64 size)
+{
+ if ((buffer= (uchar* )alloc_root(mem_root, size)))
+ return 1;
+ buffer_end= buffer + size - 1;
+ usable_free_space= size;
+ buffer_usable_ptr= buffer_end;
+ write_head= read_head= flush_head= buffer;
+ elements= 0;
+ return 0;
+}
+
+uint64 rpl_circular_buffer::empty_space()
+{
+ if (write_head == read_head && !elements)
+ return 0;
+ /*
+ Empty space for this case
+ S= Buffer_start
+ F= Flush Pointer
+ W= write pointer
+ E= buffer end
+ U= Buffer usable ptr
+ S--F----W-----E
+ Empty space= E - W + F -S
+
+ S--W-- F --- U--E
+ Empty space = F-W
+ */
+ if (write_head > flush_head)
+ return (buffer_end - write_head) + (flush_head - buffer);
+ else
+ return flush_head - write_head;
+
+}
+
+uint64 rpl_circular_buffer::write(uchar *data, buffer_granularity write_type)
+{
+ //Will take very less time because there is only one write thread.
+ //write_lock.lock()
+ // Look for usable_free_space if it is more than transaction size then we will
+ // write other wise we will wait untill we have enough space.
+ //TODO FIND transaction size.
+ uint64 trans_size= 100;
+ /*
+ TS = Transaction size/EVENT Size
+
+ if this case
+ S--F----W----E
+ if E-W > TS
+ then write data and W+= TS
+
+ if E-W < TS
+ buffer_usable_ptr= write_head
+ write_head= 0
+ It will become the second case
+
+
+ if this case
+ S--W----F--U--E
+ if TS < F -W
+ write
+ W+= TS
+ else
+ give error that buffer is full
+ and write into file
+
+ */
+ if (empty_space() < trans_size)
+ return 0;
+ if (write_head > flush_head)
+ {
+ if (buffer_end - write_head > trans_size)
+ {
+ //MEMORY_ORDER_RELEASE should be there for lock free
+ memcpy(write_head, data, trans_size);
+ write_head+= trans_size;
+ elements++;
+ return trans_size;
+ }
+ else
+ {
+ write_head= buffer;
+ if (empty_space() < trans_size)
+ return 0;
+ }
+ }
+ if ((write_head < flush_head) && ((flush_head - write_head) > trans_size))
+ {
+ //MEMORY_ORDER_RELEASE should be there for lock free
+ memcpy(write_head, data, trans_size);
+ write_head+= trans_size;
+ elements++;
+ return trans_size;
+ }
+ return 0;
+}
+
+uchar* rpl_circular_buffer::read(buffer_granularity granularity)
+{
+ uint64 trans_size= 100;
+ uchar* return_addr= NULL;
+ /*
+ Read from queue
+ lock the read mutex
+ R < W
+ S--R---W--E
+
+ R+= TS/ES
+ unlock the mutex
+ return the old read ptr
+
+ R > W
+ S--W---R--U--E
+ R += TS/ES
+ if R == U
+ R= 0
+ unlock the mutex
+ return the old ptr
+ */
+ read_lock.lock();
+ return_addr= read_head;
+ if (read_head < write_head)
+ {
+ read_head+= trans_size;
+ read_lock.unlock();
+ return return_addr;
+ }
+ else
+ {
+ read_head+= trans_size;
+ if (read_head == buffer_usable_ptr)
+ read_head= 0;
+ return return_addr;
+ }
+}
diff --git a/sql/rpl_circular_buffer.h b/sql/rpl_circular_buffer.h
new file mode 100644
index 00000000000..d6e0dac69d3
--- /dev/null
+++ b/sql/rpl_circular_buffer.h
@@ -0,0 +1,56 @@
+#ifndef RPL_CIRCULAR_BUFFER_INCLUDED
+#define RPL_CIRCULAR_BUFFER_INCLUDED
+
+#include <mutex>
+class rpl_circular_buffer
+{
+public:
+ enum buffer_granularity
+ {
+ ONE_EVENT = 1,
+ ONE_TRANSACTION,
+ };
+ int init(MEM_ROOT *mem_root, uint64 size);
+ uint64 buffer_size(){ return size;};
+ uint64 empty_space();
+ uint64 end_unused_space();
+ uchar* read(buffer_granularity read_type);
+ /*
+ It will always be written on continues memory, So suppose if we reach near
+ buffer end and we dont have enough space for next event/transaction then we
+ will write from starting (if we have already flushed the data)
+ In stort our granularity will be either full transaction or full event.
+ */
+ uint64 write(uchar* data, buffer_granularity write_type);
+ /*
+ Move the flush pointer to given ptr
+ */
+ void flush(uchar *ptr){flush_head= ptr;};
+ bool is_full()
+ {
+ if(write_head == read_head && elements)
+ return TRUE;
+ return FALSE;
+ }
+private:
+ uchar* buffer;
+ uchar* buffer_end;
+ uint64 size;
+ uint elements;
+ /*
+ Some time we can have empty space in end if transaction/event is big to fit
+ in continues space.
+ */
+ uchar* buffer_usable_ptr;
+ // Actual free space
+ uint64 usable_free_space;
+ uchar* write_head;
+ uchar* read_head;
+ uchar* flush_head;
+ std::mutex read_lock;
+ std::mutex write_lock;
+};
+
+
+
+#endif //RPL_CIRCULAR_BUFFER_INCLUDED
diff --git a/sql/slave.cc b/sql/slave.cc
index da87ab44b67..e85dacbe7d9 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -53,6 +53,7 @@
// Create_file_log_event,
// Format_description_log_event
#include "wsrep_mysqld.h"
+#include "rpl_circular_buffer.cpp"
#ifdef WITH_WSREP
#include "wsrep_trans_observer.h"
#endif
diff --git a/sql/sql_circular_queue.cpp b/sql/sql_circular_queue.cpp
new file mode 100644
index 00000000000..9eb474535c1
--- /dev/null
+++ b/sql/sql_circular_queue.cpp
@@ -0,0 +1,62 @@
+#include "sql_circular_queue.h"
+#include "log_event.h"
+
+
+inline int circular_queue::init(MEM_ROOT *mem_root, size_t length)
+{
+ if ((buffer= (uchar *)alloc_root(mem_root, length)))
+ return 1;
+ buffer_end= buffer + length;
+ return 0;
+}
+
+inline uint circular_queue::next_gtid_log_event_pos(uint64 &start_position)
+{
+ uint64 event_length= start_position;
+ DBUG_ASSERT(*(buffer+start_position+EVENT_TYPE_OFFSET) ==
+ GTID_LOG_EVENT);
+ do
+ {
+ start_position+= *(buffer+start_position+EVENT_LEN_OFFSET);
+ }
+ while(*(buffer+start_position+EVENT_TYPE_OFFSET) != GTID_LOG_EVENT);
+ return start_position - event_length;
+}
+void * circular_queue::read(int64 length)
+{
+ uint64 start_position= read_ptr_cached.load(std::memory_order_relaxed);
+ if (length == READ_ONE_TRANSACTION)
+ {
+ uint transaction_end_pos;
+ while(1)
+ if (*(buffer+start_position+EVENT_TYPE_OFFSET) == GTID_LOG_EVENT)
+ {
+ do
+ transaction_end_pos= next_gtid_log_event_pos(buffer, start_position);
+ while(read_ptr_cached.compare_exchange_weak(start_position, start_position
+ + transaction_end_pos));
+ return buffer + (start_position - transaction_end_pos);
+ }
+ }
+ else if(length == READ_ONE_EVENT)
+ {
+ uint event_end_pos;
+ while(1)
+ {
+ do
+ {
+ event_end_pos= *(buffer+start_position+EVENT_LEN_OFFSET);
+ DBUG_ASSERT(event_end_pos <= buffer_read_end);
+ if (event_end_pos == buffer_read_end)
+ {
+ start_position= 0;
+ }
+ }
+ while(read_ptr_cached.compare_exchange_weak(start_position, start_position
+ + event_end_pos));
+ return buffer + (start_position - event_end_pos);
+ }
+ }
+ return NULL;
+}
+
diff --git a/sql/sql_circular_queue.h b/sql/sql_circular_queue.h
new file mode 100644
index 00000000000..dd5df310322
--- /dev/null
+++ b/sql/sql_circular_queue.h
@@ -0,0 +1,37 @@
+#ifndef SQL_CIRCULAR_QUEUE_INCLUDED
+#define SQL_CIRCULAR_QUEUE_INCLUDED
+#include <atomic>
+
+#define READ_ONE_EVENT 0
+#define READ_ONE_TRANSACTION -1
+
+
+/*
+ In memory lock free circular queue
+*/
+
+class circular_queue
+{
+public:
+ int init(MEM_ROOT *mem_root, size_t length);
+ void * read(int64 length);
+ size_t write(const void* data, size_t length);
+ int reset_queue();
+ int delete_queue();
+private:
+ uchar *buffer;
+ uchar *buffer_end;
+ uint next_gtid_event_len_pos(uint64 &start_position);
+ char pad1[CPU_LEVEL1_DCACHE_LINESIZE];
+ std::atomic <uint64> read_ptr_cached;
+ char pad2[CPU_LEVEL1_DCACHE_LINESIZE];
+ std::atomic <uint64> read_ptr_flush;
+ char pad3[CPU_LEVEL1_DCACHE_LINESIZE];
+ std::atomic <uint64> write_ptr_cached;
+ char pad4[CPU_LEVEL1_DCACHE_LINESIZE];
+ std::atomic <uint64> write_ptr;
+ char pad5[CPU_LEVEL1_DCACHE_LINESIZE];
+};
+
+
+#endif /* SQL_CIRCULAR_QUEUE_INCLUDED */
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index a6f9d3a6b17..cb9a923ef31 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -1429,7 +1429,7 @@ int wsrep_to_buf_helper(
Gtid_log_event gtid_event(thd, thd->variables.gtid_seq_no,
thd->variables.gtid_domain_id,
true, LOG_EVENT_SUPPRESS_USE_F,
- true, 0);
+ true, 0, 0);
gtid_event.server_id= thd->variables.server_id;
if (!gtid_event.is_valid()) ret= 0;
ret= writer.write(&gtid_event);