diff options
-rw-r--r-- | sql/log.cc | 44 | ||||
-rw-r--r-- | sql/log.h | 2 | ||||
-rw-r--r-- | sql/log_event.h | 16 | ||||
-rw-r--r-- | sql/log_event_server.cc | 4 | ||||
-rw-r--r-- | sql/rpl_circular_buffer.cpp | 139 | ||||
-rw-r--r-- | sql/rpl_circular_buffer.h | 56 | ||||
-rw-r--r-- | sql/slave.cc | 1 | ||||
-rw-r--r-- | sql/sql_circular_queue.cpp | 62 | ||||
-rw-r--r-- | sql/sql_circular_queue.h | 37 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 2 |
10 files changed, 350 insertions, 13 deletions
diff --git a/sql/log.cc b/sql/log.cc index 4f51a9a9c17..65b02ebd2e8 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -5740,7 +5740,7 @@ THD::binlog_start_trans_and_stmt() Gtid_log_event gtid_event(this, this->variables.gtid_seq_no, this->variables.gtid_domain_id, true, LOG_EVENT_SUPPRESS_USE_F, - true, 0); + true, 0, 0); gtid_event.server_id= this->variables.server_id; writer.write(>id_event); } @@ -6014,7 +6014,8 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, bool MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, - bool is_transactional, uint64 commit_id) + bool is_transactional, uint64 commit_id, + uint32 transaction_length) { rpl_gtid gtid; uint32 domain_id; @@ -6078,7 +6079,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone, LOG_EVENT_SUPPRESS_USE_F, is_transactional, - commit_id); + commit_id, transaction_length); /* Write the event to the binary log. */ DBUG_ASSERT(this == &mysql_bin_log); @@ -6273,7 +6274,27 @@ MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id, seq_no); } - +/* + Find the end_event size. +*/ +static uint32 get_end_event_size(Log_event *ev) +{ + Log_event_type ev_type= ev->get_type_code(); + if (ev_type == XID_EVENT) + return ev->get_data_size() + LOG_EVENT_HEADER_LEN; + if (ev_type == QUERY_EVENT) + { + /* + 2 type of data in query_log event + ROLLBACK and COMMIT + */ + //TODO NEED to find the size + //it is not a constant size + //for the time lets return 100 + return 100; + } + return 0; +} /** Write an event to the binary log. If with_annotate != NULL and *with_annotate = TRUE write also Annotate_rows before the event @@ -6394,7 +6415,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) commit_name.length); commit_id= entry->val_int(&null_value); }); - if (write_gtid_event(thd, true, using_trans, commit_id)) + if (write_gtid_event(thd, true, using_trans, commit_id, 0)) goto err; } else @@ -8198,19 +8219,26 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id) { binlog_cache_mngr *mngr= entry->cache_mngr; + bool using_stmt_cache= entry->using_stmt_cache && !mngr->stmt_cache.empty(); + bool using_trx_cache= entry->using_trx_cache && !mngr->trx_cache.empty(); + uint32 trans_size= get_end_event_size(entry->end_event); + if (using_stmt_cache) + trans_size+= my_b_bytes_in_cache(mngr->get_binlog_cache_log(FALSE)); + if (using_trx_cache) + trans_size+= my_b_bytes_in_cache(mngr->get_binlog_cache_log(TRUE)); DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_or_stmt"); - if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id)) + if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id, 0)) DBUG_RETURN(ER_ERROR_ON_WRITE); - if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && + if (using_stmt_cache && write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE))) { entry->error_cache= &mngr->stmt_cache.cache_log; DBUG_RETURN(ER_ERROR_ON_WRITE); } - if (entry->using_trx_cache && !mngr->trx_cache.empty()) + if (using_trx_cache) { DBUG_EXECUTE_IF("crash_before_writing_xid", { diff --git a/sql/log.h b/sql/log.h index bf1dbd30c6c..b572b4b6df7 100644 --- a/sql/log.h +++ b/sql/log.h @@ -874,7 +874,7 @@ public: void set_status_variables(THD *thd); bool is_xidlist_idle(); bool write_gtid_event(THD *thd, bool standalone, bool is_transactional, - uint64 commit_id); + uint64 commit_id, uint32 transaction_length); int read_state_from_file(); int write_state_to_file(); int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size); diff --git a/sql/log_event.h b/sql/log_event.h index 88a6e06c506..67f539d0bc3 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -3382,6 +3382,9 @@ public: uint64 seq_no; uint64 commit_id; uint32 domain_id; + uint64 transaction_size; + //TODO LEts keep it zero I have patch in second laptop I will use that patch + my_thread_id thread_id; uchar flags2; /* Flags2. */ @@ -3410,10 +3413,17 @@ public: static const uchar FL_WAITED= 16; /* FL_DDL is set for event group containing DDL. */ static const uchar FL_DDL= 32; + /* + FL_EXTRA_METADATA is for fields thread id and transaction size + First will be transaction size of 8bytes + Then thread id of 4bytes + */ + static const uchar FL_EXTRA_METADATA= 64; #ifdef MYSQL_SERVER Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone, - uint16 flags, bool is_transactional, uint64 commit_id); + uint16 flags, bool is_transactional, uint64 commit_id, + uint64 transaction_size); #ifdef HAVE_REPLICATION void pack_info(Protocol *protocol); virtual int do_apply_event(rpl_group_info *rgi); @@ -3430,7 +3440,9 @@ public: enum_logged_status logged_status() { return LOGGED_NO_DATA; } int get_data_size() { - return GTID_HEADER_LEN + ((flags2 & FL_GROUP_COMMIT_ID) ? 2 : 0); + //TODO thread id and transaction size please also take that into account + return GTID_HEADER_LEN + ((flags2 & FL_GROUP_COMMIT_ID) ? 2 : 0) + + ((flags2 & FL_EXTRA_METADATA)? 4+8 : 0); } bool is_valid() const { return seq_no != 0; } #ifdef MYSQL_SERVER diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 04cf70984a2..7f117513857 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -3196,9 +3196,11 @@ bool Binlog_checkpoint_log_event::write() Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, uint32 domain_id_arg, bool standalone, uint16 flags_arg, bool is_transactional, - uint64 commit_id_arg) + uint64 commit_id_arg, + uint64 transaction_size_arg = 0) : Log_event(thd_arg, flags_arg, is_transactional), seq_no(seq_no_arg), commit_id(commit_id_arg), domain_id(domain_id_arg), + transaction_size(transaction_size_arg), flags2((standalone ? FL_STANDALONE : 0) | (commit_id_arg ? FL_GROUP_COMMIT_ID : 0)) { cache_type= Log_event::EVENT_NO_CACHE; diff --git a/sql/rpl_circular_buffer.cpp b/sql/rpl_circular_buffer.cpp new file mode 100644 index 00000000000..a6a8a6d305f --- /dev/null +++ b/sql/rpl_circular_buffer.cpp @@ -0,0 +1,139 @@ +#include "rpl_circular_buffer.h" + + +int rpl_circular_buffer::init(MEM_ROOT* mem_root, uint64 size) +{ + if ((buffer= (uchar* )alloc_root(mem_root, size))) + return 1; + buffer_end= buffer + size - 1; + usable_free_space= size; + buffer_usable_ptr= buffer_end; + write_head= read_head= flush_head= buffer; + elements= 0; + return 0; +} + +uint64 rpl_circular_buffer::empty_space() +{ + if (write_head == read_head && !elements) + return 0; + /* + Empty space for this case + S= Buffer_start + F= Flush Pointer + W= write pointer + E= buffer end + U= Buffer usable ptr + S--F----W-----E + Empty space= E - W + F -S + + S--W-- F --- U--E + Empty space = F-W + */ + if (write_head > flush_head) + return (buffer_end - write_head) + (flush_head - buffer); + else + return flush_head - write_head; + +} + +uint64 rpl_circular_buffer::write(uchar *data, buffer_granularity write_type) +{ + //Will take very less time because there is only one write thread. + //write_lock.lock() + // Look for usable_free_space if it is more than transaction size then we will + // write other wise we will wait untill we have enough space. + //TODO FIND transaction size. + uint64 trans_size= 100; + /* + TS = Transaction size/EVENT Size + + if this case + S--F----W----E + if E-W > TS + then write data and W+= TS + + if E-W < TS + buffer_usable_ptr= write_head + write_head= 0 + It will become the second case + + + if this case + S--W----F--U--E + if TS < F -W + write + W+= TS + else + give error that buffer is full + and write into file + + */ + if (empty_space() < trans_size) + return 0; + if (write_head > flush_head) + { + if (buffer_end - write_head > trans_size) + { + //MEMORY_ORDER_RELEASE should be there for lock free + memcpy(write_head, data, trans_size); + write_head+= trans_size; + elements++; + return trans_size; + } + else + { + write_head= buffer; + if (empty_space() < trans_size) + return 0; + } + } + if ((write_head < flush_head) && ((flush_head - write_head) > trans_size)) + { + //MEMORY_ORDER_RELEASE should be there for lock free + memcpy(write_head, data, trans_size); + write_head+= trans_size; + elements++; + return trans_size; + } + return 0; +} + +uchar* rpl_circular_buffer::read(buffer_granularity granularity) +{ + uint64 trans_size= 100; + uchar* return_addr= NULL; + /* + Read from queue + lock the read mutex + R < W + S--R---W--E + + R+= TS/ES + unlock the mutex + return the old read ptr + + R > W + S--W---R--U--E + R += TS/ES + if R == U + R= 0 + unlock the mutex + return the old ptr + */ + read_lock.lock(); + return_addr= read_head; + if (read_head < write_head) + { + read_head+= trans_size; + read_lock.unlock(); + return return_addr; + } + else + { + read_head+= trans_size; + if (read_head == buffer_usable_ptr) + read_head= 0; + return return_addr; + } +} diff --git a/sql/rpl_circular_buffer.h b/sql/rpl_circular_buffer.h new file mode 100644 index 00000000000..d6e0dac69d3 --- /dev/null +++ b/sql/rpl_circular_buffer.h @@ -0,0 +1,56 @@ +#ifndef RPL_CIRCULAR_BUFFER_INCLUDED +#define RPL_CIRCULAR_BUFFER_INCLUDED + +#include <mutex> +class rpl_circular_buffer +{ +public: + enum buffer_granularity + { + ONE_EVENT = 1, + ONE_TRANSACTION, + }; + int init(MEM_ROOT *mem_root, uint64 size); + uint64 buffer_size(){ return size;}; + uint64 empty_space(); + uint64 end_unused_space(); + uchar* read(buffer_granularity read_type); + /* + It will always be written on continues memory, So suppose if we reach near + buffer end and we dont have enough space for next event/transaction then we + will write from starting (if we have already flushed the data) + In stort our granularity will be either full transaction or full event. + */ + uint64 write(uchar* data, buffer_granularity write_type); + /* + Move the flush pointer to given ptr + */ + void flush(uchar *ptr){flush_head= ptr;}; + bool is_full() + { + if(write_head == read_head && elements) + return TRUE; + return FALSE; + } +private: + uchar* buffer; + uchar* buffer_end; + uint64 size; + uint elements; + /* + Some time we can have empty space in end if transaction/event is big to fit + in continues space. + */ + uchar* buffer_usable_ptr; + // Actual free space + uint64 usable_free_space; + uchar* write_head; + uchar* read_head; + uchar* flush_head; + std::mutex read_lock; + std::mutex write_lock; +}; + + + +#endif //RPL_CIRCULAR_BUFFER_INCLUDED diff --git a/sql/slave.cc b/sql/slave.cc index da87ab44b67..e85dacbe7d9 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -53,6 +53,7 @@ // Create_file_log_event, // Format_description_log_event #include "wsrep_mysqld.h" +#include "rpl_circular_buffer.cpp" #ifdef WITH_WSREP #include "wsrep_trans_observer.h" #endif diff --git a/sql/sql_circular_queue.cpp b/sql/sql_circular_queue.cpp new file mode 100644 index 00000000000..9eb474535c1 --- /dev/null +++ b/sql/sql_circular_queue.cpp @@ -0,0 +1,62 @@ +#include "sql_circular_queue.h" +#include "log_event.h" + + +inline int circular_queue::init(MEM_ROOT *mem_root, size_t length) +{ + if ((buffer= (uchar *)alloc_root(mem_root, length))) + return 1; + buffer_end= buffer + length; + return 0; +} + +inline uint circular_queue::next_gtid_log_event_pos(uint64 &start_position) +{ + uint64 event_length= start_position; + DBUG_ASSERT(*(buffer+start_position+EVENT_TYPE_OFFSET) == + GTID_LOG_EVENT); + do + { + start_position+= *(buffer+start_position+EVENT_LEN_OFFSET); + } + while(*(buffer+start_position+EVENT_TYPE_OFFSET) != GTID_LOG_EVENT); + return start_position - event_length; +} +void * circular_queue::read(int64 length) +{ + uint64 start_position= read_ptr_cached.load(std::memory_order_relaxed); + if (length == READ_ONE_TRANSACTION) + { + uint transaction_end_pos; + while(1) + if (*(buffer+start_position+EVENT_TYPE_OFFSET) == GTID_LOG_EVENT) + { + do + transaction_end_pos= next_gtid_log_event_pos(buffer, start_position); + while(read_ptr_cached.compare_exchange_weak(start_position, start_position + + transaction_end_pos)); + return buffer + (start_position - transaction_end_pos); + } + } + else if(length == READ_ONE_EVENT) + { + uint event_end_pos; + while(1) + { + do + { + event_end_pos= *(buffer+start_position+EVENT_LEN_OFFSET); + DBUG_ASSERT(event_end_pos <= buffer_read_end); + if (event_end_pos == buffer_read_end) + { + start_position= 0; + } + } + while(read_ptr_cached.compare_exchange_weak(start_position, start_position + + event_end_pos)); + return buffer + (start_position - event_end_pos); + } + } + return NULL; +} + diff --git a/sql/sql_circular_queue.h b/sql/sql_circular_queue.h new file mode 100644 index 00000000000..dd5df310322 --- /dev/null +++ b/sql/sql_circular_queue.h @@ -0,0 +1,37 @@ +#ifndef SQL_CIRCULAR_QUEUE_INCLUDED +#define SQL_CIRCULAR_QUEUE_INCLUDED +#include <atomic> + +#define READ_ONE_EVENT 0 +#define READ_ONE_TRANSACTION -1 + + +/* + In memory lock free circular queue +*/ + +class circular_queue +{ +public: + int init(MEM_ROOT *mem_root, size_t length); + void * read(int64 length); + size_t write(const void* data, size_t length); + int reset_queue(); + int delete_queue(); +private: + uchar *buffer; + uchar *buffer_end; + uint next_gtid_event_len_pos(uint64 &start_position); + char pad1[CPU_LEVEL1_DCACHE_LINESIZE]; + std::atomic <uint64> read_ptr_cached; + char pad2[CPU_LEVEL1_DCACHE_LINESIZE]; + std::atomic <uint64> read_ptr_flush; + char pad3[CPU_LEVEL1_DCACHE_LINESIZE]; + std::atomic <uint64> write_ptr_cached; + char pad4[CPU_LEVEL1_DCACHE_LINESIZE]; + std::atomic <uint64> write_ptr; + char pad5[CPU_LEVEL1_DCACHE_LINESIZE]; +}; + + +#endif /* SQL_CIRCULAR_QUEUE_INCLUDED */ diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index a6f9d3a6b17..cb9a923ef31 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -1429,7 +1429,7 @@ int wsrep_to_buf_helper( Gtid_log_event gtid_event(thd, thd->variables.gtid_seq_no, thd->variables.gtid_domain_id, true, LOG_EVENT_SUPPRESS_USE_F, - true, 0); + true, 0, 0); gtid_event.server_id= thd->variables.server_id; if (!gtid_event.is_valid()) ret= 0; ret= writer.write(>id_event); |