From e3275963bddbcca7c5f99dbed27275b628e90deb Mon Sep 17 00:00:00 2001 From: Sachin Date: Sat, 24 Oct 2020 13:37:53 +0100 Subject: Something works --- sql/queue.h | 86 ++++++++++++++++++++++++++++++++++++++++ sql/rpl_queue.h | 97 +++++++++++++++++++++++++++++++++++++++++++++ sql/slave.cc | 1 + unittest/sql/CMakeLists.txt | 8 ++++ unittest/sql/rpl_queue-t.cc | 59 +++++++++++++++++++++++++++ 5 files changed, 251 insertions(+) create mode 100644 sql/queue.h create mode 100644 sql/rpl_queue.h create mode 100644 unittest/sql/rpl_queue-t.cc diff --git a/sql/queue.h b/sql/queue.h new file mode 100644 index 00000000000..195b2e74346 --- /dev/null +++ b/sql/queue.h @@ -0,0 +1,86 @@ +#ifndef QUEUE_H +#define QUEUE_H + +#include "my_global.h" +#include "my_base.h" +#include "my_pthread.h" +#include "mysql/psi/mysql_thread.h" +#include +using namespace::std; +#define UNUSED_SPACE 0xFF + + +/* + We will use N-1 to check whether buffer is full or not. + Comments + # Free Space + * Filled Space + H Head + T Tail +*/ +template +class circular_buffer_queue +{ + public: + uchar *buffer, *buffer_end; + //Total no of events currently in queue + ulong events; + ulong buffer_size; + mysql_mutex_t lock_queue; + ulong free_size() + { + if (head > tail) + return buffer_size - (head-tail)-1; + if (tail > head) + return tail-head-1; + return buffer_size - 1; + } + ulong used_buffer() + { + return buffer_size - free_size() -1; + } + uchar *head, *tail; + circular_buffer_queue(){}; + + int init(ulong buffer_size) + { + if (!(buffer= (uchar*)my_malloc(PSI_INSTRUMENT_ME, buffer_size, + MYF(MY_THREAD_SPECIFIC|MY_WME)))) + return 1; + this->buffer_size= buffer_size; + buffer_end= buffer + buffer_size; + head= tail= buffer; + mysql_mutex_init(0, &lock_queue, MY_MUTEX_INIT_SLOW); + return 0; + } + + /* + We want to write in continues memory. + */ + int enqueue(Element_type *elem) + { + uint32 length= elem->total_length; + if (free_size() < length) + return 1; + mysql_mutex_lock(&lock_queue); + head= elem->write(head, buffer, buffer_end); + mysql_mutex_unlock(&lock_queue); + return 0; + }; + + Element_type *dequeue() + { + if (used_buffer() > 0) + { + mysql_mutex_lock(&lock_queue); + Element_type *el= new Element_type(tail, buffer, buffer_end); + tail= el->tail; + mysql_mutex_unlock(&lock_queue); + return el; + } + return NULL; + } +}; + + +#endif /* QUEUE_H */ diff --git a/sql/rpl_queue.h b/sql/rpl_queue.h new file mode 100644 index 00000000000..d16efd276e6 --- /dev/null +++ b/sql/rpl_queue.h @@ -0,0 +1,97 @@ +#ifndef RPL_QUEUE_H +#define RPL_QUEUE_H +#include "my_global.h" +#include "queue.h" +#include +#include +#include +#define EVENT_LEN_OFFSET 9 +class slave_queue_element +{ + public: + uchar *event, *tail; + bool malloced; + /* + Control flags will only in put when this event is start of new transaction + */ + uchar flags; + //event_length + flags(1 byte) + uint total_length; + slave_queue_element(uchar* ptr, uchar* buffer_start, uchar* buffer_end) + { + //READ the EVENT_LENGTH; + uchar len[4]; + ulong size= buffer_end - buffer_start; + ulong ptr_numeric= ptr - buffer_start; + ulong ev_length_start= (ptr_numeric + EVENT_LEN_OFFSET) %size; + ulong ev_length_end= (ptr_numeric + EVENT_LEN_OFFSET + 4) %size; + //EVENT_LEN_OFFSET is in continues memory + if( ev_length_start< ev_length_end) + { + total_length= uint4korr(buffer_start + ev_length_start); + } + else + { + int remainder= ev_length_end; + memcpy(len, ptr+ ev_length_start, 4 - remainder); + memcpy(len+4-remainder, buffer_start, remainder); + total_length= uint4korr(len); + } + //Event in continues memory chunk. + if (ptr_numeric < (ptr_numeric+total_length) % size) + { + event= ptr; + malloced= false; + tail= event+ total_length; + } + else + { + //malloc and memcpy to continues memory chunk + malloced= true; + //QTODO + event= (uchar *)my_malloc(0, total_length, MYF(MY_WME)); + int remainder= (ptr_numeric + total_length) % size; + memcpy(event, ptr, total_length - remainder); + memcpy(event+total_length - remainder, buffer_start, remainder); + tail= buffer_start + remainder; + } + } + slave_queue_element(uchar *ev) + { + malloced= false; + total_length= uint4korr(ev+EVENT_LEN_OFFSET); + event= ev; + } + //We need to wrap arround in case we overstoot buffer end. + uchar* write(uchar *ptr, uchar *buffer_start, uchar * buffer_end) + { + uint32 t_len= MY_MIN(total_length, buffer_end - ptr); + memcpy(ptr, event, t_len); + if (t_len < total_length) + { + memcpy(ptr, event, t_len); + memcpy(buffer_start, event + t_len, total_length - t_len); + return buffer_start + total_length - t_len; + } + //NO wrapping needed + else + { + memcpy(ptr, event, total_length); + return ptr + total_length; + } + } + ~slave_queue_element() + { + if (malloced) + free(event); + } + + //QTODO + //Control flags + uchar NEW_TRANSACTION= 1; + uchar RELAY_LOGGED= 2; + uchar COMMITTED= 4; + // Reserved for use by the circular queue + // UNUSED_SPACE = 0xFF +}; +#endif //RPL_QUEUE_H diff --git a/sql/slave.cc b/sql/slave.cc index cc50638ae5b..b0907cae5e5 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -36,6 +36,7 @@ #include "rpl_filter.h" #include "repl_failsafe.h" #include "transaction.h" +#include "rpl_queue.h" #include #include #include diff --git a/unittest/sql/CMakeLists.txt b/unittest/sql/CMakeLists.txt index b7923511b97..419fdecf0b1 100644 --- a/unittest/sql/CMakeLists.txt +++ b/unittest/sql/CMakeLists.txt @@ -29,3 +29,11 @@ ADD_EXECUTABLE(mf_iocache-t mf_iocache-t.cc ../../sql/mf_iocache_encr.cc) TARGET_LINK_LIBRARIES(mf_iocache-t mysys mytap mysys_ssl) ADD_DEPENDENCIES(mf_iocache-t GenError) MY_ADD_TEST(mf_iocache) + +INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/sql + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_SOURCE_DIR}/unittest/mytap + ${CMAKE_SOURCE_DIR}/extra/yassl/include) +ADD_EXECUTABLE(rpl_queue-t rpl_queue-t.cc ) +TARGET_LINK_LIBRARIES(rpl_queue-t sql mytap) +MY_ADD_TEST(rpl_queue-t) diff --git a/unittest/sql/rpl_queue-t.cc b/unittest/sql/rpl_queue-t.cc new file mode 100644 index 00000000000..0f0a2ba2b24 --- /dev/null +++ b/unittest/sql/rpl_queue-t.cc @@ -0,0 +1,59 @@ +#include +#include +#include +#include +#include +#include +#include + +uchar* create_dummy_event(char c, int size) +{ + uchar *data= (uchar *)malloc(size); + memset(data, c, size); + int4store(data+EVENT_LEN_OFFSET, size); + return data; +} + + +class dummy_queue:public circular_buffer_queue +{ + +}; +int main(int argc __attribute__((unused)),char *argv[]) +{ + dummy_queue *queue = new dummy_queue(); + int counter= 0; + queue->init(80); + counter += queue->enqueue(new slave_queue_element(create_dummy_event('A', 25)))? 0 : 1; + counter += queue->enqueue(new slave_queue_element(create_dummy_event('B', 26)))? 0 : 1; + counter += queue->enqueue(new slave_queue_element(create_dummy_event('C', 25)))? 0 : 1; + counter += queue->enqueue(new slave_queue_element(create_dummy_event('D', 25)))? 0 : 1; + for(int i = 0; i < counter; i++) + { + slave_queue_element *el= queue->dequeue(); + if (el) + { + fwrite(el->event, sizeof(char), el->total_length, stdout); + el->~slave_queue_element(); + } + } + + counter= 0; + counter += queue->enqueue(new slave_queue_element(create_dummy_event('A', 25)))? 0 : 1; + counter += queue->enqueue(new slave_queue_element(create_dummy_event('B', 26)))? 0 : 1; + counter += queue->enqueue(new slave_queue_element(create_dummy_event('C', 25)))? 0 : 1; + queue->dequeue(); + counter += queue->enqueue(new slave_queue_element(create_dummy_event('D', 25)))? 0 : 1; + for(int i = 0; i < counter; i++) + { + slave_queue_element *el= queue->dequeue(); + if (el) + { + fwrite(el->event, sizeof(char), el->total_length, stdout); + el->~slave_queue_element(); + } + } + plan(1); + ok(true," "); + return exit_status(); +} -- cgit v1.2.1