summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSachin <A00279809@student.ait.ie>2020-10-24 13:37:53 +0100
committerSachin <A00279809@student.ait.ie>2020-10-24 13:37:53 +0100
commite3275963bddbcca7c5f99dbed27275b628e90deb (patch)
tree77261872a78131dc39651442728356d9c2664cb0
parentb4fb15ccd4f2864483f8644c0236e63c814c8beb (diff)
downloadmariadb-git-e3275963bddbcca7c5f99dbed27275b628e90deb.tar.gz
Something works
-rw-r--r--sql/queue.h86
-rw-r--r--sql/rpl_queue.h97
-rw-r--r--sql/slave.cc1
-rw-r--r--unittest/sql/CMakeLists.txt8
-rw-r--r--unittest/sql/rpl_queue-t.cc59
5 files changed, 251 insertions, 0 deletions
diff --git a/sql/queue.h b/sql/queue.h
new file mode 100644
index 00000000000..195b2e74346
--- /dev/null
+++ b/sql/queue.h
@@ -0,0 +1,86 @@
+#ifndef QUEUE_H
+#define QUEUE_H
+
+#include "my_global.h"
+#include "my_base.h"
+#include "my_pthread.h"
+#include "mysql/psi/mysql_thread.h"
+#include <cstring>
+using namespace::std;
+#define UNUSED_SPACE 0xFF
+
+
+/*
+ We will use N-1 to check whether buffer is full or not.
+ Comments
+ # Free Space
+ * Filled Space
+ H Head
+ T Tail
+*/
+template <typename Element_type>
+class circular_buffer_queue
+{
+ public:
+ uchar *buffer, *buffer_end;
+ //Total no of events currently in queue
+ ulong events;
+ ulong buffer_size;
+ mysql_mutex_t lock_queue;
+ ulong free_size()
+ {
+ if (head > tail)
+ return buffer_size - (head-tail)-1;
+ if (tail > head)
+ return tail-head-1;
+ return buffer_size - 1;
+ }
+ ulong used_buffer()
+ {
+ return buffer_size - free_size() -1;
+ }
+ uchar *head, *tail;
+ circular_buffer_queue(){};
+
+ int init(ulong buffer_size)
+ {
+ if (!(buffer= (uchar*)my_malloc(PSI_INSTRUMENT_ME, buffer_size,
+ MYF(MY_THREAD_SPECIFIC|MY_WME))))
+ return 1;
+ this->buffer_size= buffer_size;
+ buffer_end= buffer + buffer_size;
+ head= tail= buffer;
+ mysql_mutex_init(0, &lock_queue, MY_MUTEX_INIT_SLOW);
+ return 0;
+ }
+
+ /*
+ We want to write in continues memory.
+ */
+ int enqueue(Element_type *elem)
+ {
+ uint32 length= elem->total_length;
+ if (free_size() < length)
+ return 1;
+ mysql_mutex_lock(&lock_queue);
+ head= elem->write(head, buffer, buffer_end);
+ mysql_mutex_unlock(&lock_queue);
+ return 0;
+ };
+
+ Element_type *dequeue()
+ {
+ if (used_buffer() > 0)
+ {
+ mysql_mutex_lock(&lock_queue);
+ Element_type *el= new Element_type(tail, buffer, buffer_end);
+ tail= el->tail;
+ mysql_mutex_unlock(&lock_queue);
+ return el;
+ }
+ return NULL;
+ }
+};
+
+
+#endif /* QUEUE_H */
diff --git a/sql/rpl_queue.h b/sql/rpl_queue.h
new file mode 100644
index 00000000000..d16efd276e6
--- /dev/null
+++ b/sql/rpl_queue.h
@@ -0,0 +1,97 @@
+#ifndef RPL_QUEUE_H
+#define RPL_QUEUE_H
+#include "my_global.h"
+#include "queue.h"
+#include <cmath>
+#include <cstdlib>
+#include <cstring>
+#define EVENT_LEN_OFFSET 9
+class slave_queue_element
+{
+ public:
+ uchar *event, *tail;
+ bool malloced;
+ /*
+ Control flags will only in put when this event is start of new transaction
+ */
+ uchar flags;
+ //event_length + flags(1 byte)
+ uint total_length;
+ slave_queue_element(uchar* ptr, uchar* buffer_start, uchar* buffer_end)
+ {
+ //READ the EVENT_LENGTH;
+ uchar len[4];
+ ulong size= buffer_end - buffer_start;
+ ulong ptr_numeric= ptr - buffer_start;
+ ulong ev_length_start= (ptr_numeric + EVENT_LEN_OFFSET) %size;
+ ulong ev_length_end= (ptr_numeric + EVENT_LEN_OFFSET + 4) %size;
+ //EVENT_LEN_OFFSET is in continues memory
+ if( ev_length_start< ev_length_end)
+ {
+ total_length= uint4korr(buffer_start + ev_length_start);
+ }
+ else
+ {
+ int remainder= ev_length_end;
+ memcpy(len, ptr+ ev_length_start, 4 - remainder);
+ memcpy(len+4-remainder, buffer_start, remainder);
+ total_length= uint4korr(len);
+ }
+ //Event in continues memory chunk.
+ if (ptr_numeric < (ptr_numeric+total_length) % size)
+ {
+ event= ptr;
+ malloced= false;
+ tail= event+ total_length;
+ }
+ else
+ {
+ //malloc and memcpy to continues memory chunk
+ malloced= true;
+ //QTODO
+ event= (uchar *)my_malloc(0, total_length, MYF(MY_WME));
+ int remainder= (ptr_numeric + total_length) % size;
+ memcpy(event, ptr, total_length - remainder);
+ memcpy(event+total_length - remainder, buffer_start, remainder);
+ tail= buffer_start + remainder;
+ }
+ }
+ slave_queue_element(uchar *ev)
+ {
+ malloced= false;
+ total_length= uint4korr(ev+EVENT_LEN_OFFSET);
+ event= ev;
+ }
+ //We need to wrap arround in case we overstoot buffer end.
+ uchar* write(uchar *ptr, uchar *buffer_start, uchar * buffer_end)
+ {
+ uint32 t_len= MY_MIN(total_length, buffer_end - ptr);
+ memcpy(ptr, event, t_len);
+ if (t_len < total_length)
+ {
+ memcpy(ptr, event, t_len);
+ memcpy(buffer_start, event + t_len, total_length - t_len);
+ return buffer_start + total_length - t_len;
+ }
+ //NO wrapping needed
+ else
+ {
+ memcpy(ptr, event, total_length);
+ return ptr + total_length;
+ }
+ }
+ ~slave_queue_element()
+ {
+ if (malloced)
+ free(event);
+ }
+
+ //QTODO
+ //Control flags
+ uchar NEW_TRANSACTION= 1;
+ uchar RELAY_LOGGED= 2;
+ uchar COMMITTED= 4;
+ // Reserved for use by the circular queue
+ // UNUSED_SPACE = 0xFF
+};
+#endif //RPL_QUEUE_H
diff --git a/sql/slave.cc b/sql/slave.cc
index cc50638ae5b..b0907cae5e5 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -36,6 +36,7 @@
#include "rpl_filter.h"
#include "repl_failsafe.h"
#include "transaction.h"
+#include "rpl_queue.h"
#include <thr_alarm.h>
#include <my_dir.h>
#include <sql_common.h>
diff --git a/unittest/sql/CMakeLists.txt b/unittest/sql/CMakeLists.txt
index b7923511b97..419fdecf0b1 100644
--- a/unittest/sql/CMakeLists.txt
+++ b/unittest/sql/CMakeLists.txt
@@ -29,3 +29,11 @@ ADD_EXECUTABLE(mf_iocache-t mf_iocache-t.cc ../../sql/mf_iocache_encr.cc)
TARGET_LINK_LIBRARIES(mf_iocache-t mysys mytap mysys_ssl)
ADD_DEPENDENCIES(mf_iocache-t GenError)
MY_ADD_TEST(mf_iocache)
+
+INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/sql
+ ${CMAKE_SOURCE_DIR}/include
+ ${CMAKE_SOURCE_DIR}/unittest/mytap
+ ${CMAKE_SOURCE_DIR}/extra/yassl/include)
+ADD_EXECUTABLE(rpl_queue-t rpl_queue-t.cc )
+TARGET_LINK_LIBRARIES(rpl_queue-t sql mytap)
+MY_ADD_TEST(rpl_queue-t)
diff --git a/unittest/sql/rpl_queue-t.cc b/unittest/sql/rpl_queue-t.cc
new file mode 100644
index 00000000000..0f0a2ba2b24
--- /dev/null
+++ b/unittest/sql/rpl_queue-t.cc
@@ -0,0 +1,59 @@
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <tap.h>
+#include <my_global.h>
+#include <queue.h>
+#include <rpl_queue.h>
+
+uchar* create_dummy_event(char c, int size)
+{
+ uchar *data= (uchar *)malloc(size);
+ memset(data, c, size);
+ int4store(data+EVENT_LEN_OFFSET, size);
+ return data;
+}
+
+
+class dummy_queue:public circular_buffer_queue<slave_queue_element>
+{
+
+};
+int main(int argc __attribute__((unused)),char *argv[])
+{
+ dummy_queue *queue = new dummy_queue();
+ int counter= 0;
+ queue->init(80);
+ counter += queue->enqueue(new slave_queue_element(create_dummy_event('A', 25)))? 0 : 1;
+ counter += queue->enqueue(new slave_queue_element(create_dummy_event('B', 26)))? 0 : 1;
+ counter += queue->enqueue(new slave_queue_element(create_dummy_event('C', 25)))? 0 : 1;
+ counter += queue->enqueue(new slave_queue_element(create_dummy_event('D', 25)))? 0 : 1;
+ for(int i = 0; i < counter; i++)
+ {
+ slave_queue_element *el= queue->dequeue();
+ if (el)
+ {
+ fwrite(el->event, sizeof(char), el->total_length, stdout);
+ el->~slave_queue_element();
+ }
+ }
+
+ counter= 0;
+ counter += queue->enqueue(new slave_queue_element(create_dummy_event('A', 25)))? 0 : 1;
+ counter += queue->enqueue(new slave_queue_element(create_dummy_event('B', 26)))? 0 : 1;
+ counter += queue->enqueue(new slave_queue_element(create_dummy_event('C', 25)))? 0 : 1;
+ queue->dequeue();
+ counter += queue->enqueue(new slave_queue_element(create_dummy_event('D', 25)))? 0 : 1;
+ for(int i = 0; i < counter; i++)
+ {
+ slave_queue_element *el= queue->dequeue();
+ if (el)
+ {
+ fwrite(el->event, sizeof(char), el->total_length, stdout);
+ el->~slave_queue_element();
+ }
+ }
+ plan(1);
+ ok(true," ");
+ return exit_status();
+}