author    Nikita Malyavin <nikitamalyavin@gmail.com>    2021-08-22 05:51:01 +0300
committer Nikita Malyavin <nikitamalyavin@gmail.com>    2021-08-27 12:20:02 +1000
commit    47bd795b35c0e6b259dc16521966232cbdd7c190 (patch)
tree      fb223111cf12b162720c35909af052d75fbb6a7d /io_cache_test/ring_buffer.hpp
parent    98ab2af187c79f8a05421b2e252583dfc8850265 (diff)
download  mariadb-git-MDEV-24676_cpp.tar.gz
fix wraparound race condition (MDEV-24676_cpp)

- also improve locking granularity regarding flush
- remove atomic on bool, since it is already protected under lock
- add more volatile specs
- add comments, improve structure packing
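
The race being fixed is ABA-like: between a writer marking its slot finished
(while still under the shared flush_rw_lock) and re-taking _buffer_lock in
release, a flush may clear the slot and a new writer may re-occupy it at the
very same buffer position, so the stale release would advance _write_pos over
data it no longer owns. Below is a minimal, single-threaded sketch of the
guard this commit adds; the names mirror the patch (version, wrap_version,
vacant), but the structure is simplified for illustration and is not the
patch's code.

#include <cassert>

/* Hypothetical sketch: one slot, no locks or threads, just the check. */
struct Slot {
  bool vacant= true;
  long long wrap_version= 0;    /* copied from the buffer on acquire */
};

struct Buffer {
  long long version= 1;         /* incremented on every wraparound */
  Slot slot;

  void acquire() {
    assert(slot.vacant);
    slot.vacant= false;
    slot.wrap_version= version; /* snapshot; taken under the buffer
                                   lock in the real code */
  }
  void wrap() { version++; }    /* the write position crossed the end */

  /* A release may advance the write position only if the slot still
     belongs to the reservation the caller made. */
  bool may_release(long long snapshot) const {
    return snapshot == slot.wrap_version;
  }
};

int main() {
  Buffer b;
  b.acquire();
  long long snap= b.slot.wrap_version;  /* read before marking finished */
  b.wrap();                             /* the buffer wrapped meanwhile */
  b.slot.vacant= true;                  /* a flush cleared the slot...  */
  b.acquire();                          /* ...and a new writer took it  */
  assert(!b.may_release(snap));         /* the stale release is caught  */
  return 0;
}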
Diffstat (limited to 'io_cache_test/ring_buffer.hpp')
-rw-r--r--    io_cache_test/ring_buffer.hpp    111
1 file changed, 73 insertions(+), 38 deletions(-)
diff --git a/io_cache_test/ring_buffer.hpp b/io_cache_test/ring_buffer.hpp
index 9566c6132c5..7ca8fc313ec 100644
--- a/io_cache_test/ring_buffer.hpp
+++ b/io_cache_test/ring_buffer.hpp
@@ -3,6 +3,7 @@
#include <mysys_priv.h>
#include <semaphore.h>
#include <atomic>
+#include <array>
class RingBuffer {
public:
@@ -14,15 +15,15 @@ public:
private:
struct cache_slot_t {
- std::atomic<bool> vacant{true};
+ volatile bool vacant= true;
- volatile bool finished = false;
+ std::atomic<bool> finished {false};
volatile int next = -1;
uchar* pos_write_first = nullptr;
- /* For wrapping case in curricular write buffer*/
+  /** For the wrapping case in the circular write buffer */
uchar* pos_write_second = nullptr;
uchar* pos_end = nullptr;
@@ -30,17 +31,27 @@ private:
volatile size_t count_first = 0;
volatile size_t count_second = 0;
+
+ /**
+    Each time the buffer wraps, its version increases. It is then compared
+    with the slot's version to avoid the race when the slot was cleared by
+    another thread and then re-occupied by a new writer (i.e. vacant == false)
+    at the same place (i.e. write_pos and length are the same).
+ */
+ volatile longlong wrap_version;
};
static const int count_thread_for_slots = 4;
cache_slot_t _slots[count_thread_for_slots];
- /* Semaphore for predict overflow */
- sem_t semaphore;
+ /** Last used slot */
+ volatile int last_slot= -1;
- /* Last used slot */
- int last_slot = -1;
+ volatile longlong version= 1;
+
+  /** This semaphore prevents the slot pool from overflowing */
+ sem_t semaphore;
mysql_rwlock_t flush_rw_lock;
@@ -72,6 +83,8 @@ private:
/*
Maximum of the actual end of file and
the position represented by read_end.
+
+    Protected by flush_rw_lock.
*/
my_off_t _end_of_file;
@@ -112,7 +125,7 @@ private:
RingBuffer::RingBuffer(char* filename, size_t cachesize) {
_total_size = 0;
- sem_init(&semaphore, 0, count_thread_for_slots);
+ sem_init(&semaphore, 0, count_thread_for_slots-1);
mysql_rwlock_init(0, &flush_rw_lock);
_file = my_open(filename,O_CREAT | O_RDWR,MYF(MY_WME));
if (_file >= 0) {
@@ -162,6 +175,7 @@ RingBuffer::RingBuffer(char* filename, size_t cachesize) {
mysql_mutex_init(key_IO_CACHE_append_buffer_lock,
&_buffer_lock, MY_MUTEX_INIT_FAST);
}
+
RingBuffer::~RingBuffer() {
sem_destroy(&semaphore);
mysql_rwlock_destroy(&flush_rw_lock);
@@ -178,26 +192,62 @@ int RingBuffer::_slot_acquire(uchar *&From, size_t &Count) {
sem_wait(&semaphore);
int i;
mysql_mutex_lock(&_buffer_lock);
- for (i = 0; i < count_thread_for_slots; ++i) {
+ for (i= 0; i < count_thread_for_slots; ++i)
+ {
auto &vacant= _slots[i].vacant;
- if(vacant.load(std::memory_order_relaxed)
- && vacant.exchange(false, std::memory_order_acquire))
+ if (vacant)
+ {
+ vacant= false;
+ _slots[i].wrap_version= version;
break;
+ }
}
+ assert(i != count_thread_for_slots);
if(Count > (_buffer_length - _total_size)) {
+ /*
+ Buffer is full, flush to disk.
+      1. Wait for all writes to finish by taking wlock(flush_rw_lock).
+      2. Re-initialize the slots and increase the version.
+      3. Unlock buffer_lock to let other writers exit (or else they'll wait
+         in release() until flushing is done).
+      4. Fill the rest of the buffer under the exclusive lock and write to file.
+ */
+
+ mysql_rwlock_wrlock(&flush_rw_lock);
+
+ uchar *save_write_new_pos= _write_new_pos;
+
+  DBUG_ASSERT(_append_read_pos);
+  _write_new_pos= _write_buffer;
+  _write_pos= _write_buffer;
+  _total_size= 0;
+
+ for (int j = 0; j < count_thread_for_slots; j++) {
+ if(j == i)
+ continue;
+ if(!_slots[j].vacant)
+ sem_post(&semaphore);
+ _slots[j].finished= false;
+ _slots[j].vacant= true;
+ _slots[j].next= -1;
+ _slots[j].pos_write_first= nullptr;
+ _slots[j].pos_write_second= nullptr;
+ _slots[j].pos_end= nullptr;
+ }
+ last_slot = -1;
- if(_write_new_pos < _append_read_pos) {
- size_t rest_length = _append_read_pos - _write_new_pos;
- memcpy(_write_new_pos, From, rest_length);
+ if(save_write_new_pos < _append_read_pos) {
+ size_t rest_length = _append_read_pos - save_write_new_pos;
+ memcpy(save_write_new_pos, From, rest_length);
_total_size += rest_length;
Count -= rest_length;
From += rest_length;
- _write_pos = _write_new_pos + rest_length;
+ _write_pos = save_write_new_pos + rest_length;
}
else {
- size_t rest_length_to_right_border = _write_end - _write_new_pos;
- memcpy(_write_new_pos, From, rest_length_to_right_border);
+ size_t rest_length_to_right_border = _write_end - save_write_new_pos;
+ memcpy(save_write_new_pos, From, rest_length_to_right_border);
_total_size += rest_length_to_right_border;
Count -= rest_length_to_right_border;
From += rest_length_to_right_border;
@@ -210,8 +260,8 @@ int RingBuffer::_slot_acquire(uchar *&From, size_t &Count) {
From += rest_length_to_read_border;
_write_pos = _write_buffer + rest_length_to_read_border;
}
- mysql_rwlock_wrlock(&flush_rw_lock);
_flush_io_buffer(i);
+ _append_read_pos= _write_buffer;
mysql_rwlock_unlock(&flush_rw_lock);
}
@@ -231,6 +281,7 @@ int RingBuffer::_slot_acquire(uchar *&From, size_t &Count) {
rest_length_to_right_border = _write_end - _write_new_pos;
if(Count > rest_length_to_right_border) {
+ version++;
_slots[i].count_first = rest_length_to_right_border;
_slots[i].pos_write_second = _write_buffer;
_slots[i].count_second = Count - rest_length_to_right_border;
@@ -249,10 +300,13 @@ int RingBuffer::_slot_acquire(uchar *&From, size_t &Count) {
bool RingBuffer::_slot_release(int slot_id) {
+ auto version_here= _slots[slot_id].wrap_version;
_slots[slot_id].finished = true;
mysql_rwlock_unlock(&flush_rw_lock);
+ DEBUG_SYNC(nullptr, "slot_release");
mysql_mutex_lock(&_buffer_lock);
- if(last_slot != -1 && _write_pos == _slots[slot_id].pos_write_first) {
+ if (last_slot != -1 && version_here == _slots[slot_id].wrap_version
+ && _write_pos == _slots[slot_id].pos_write_first) {
do {
_write_pos = _slots[slot_id].pos_end;
@@ -312,27 +366,8 @@ int RingBuffer::_flush_io_buffer(int not_released) {
}
_end_of_file+= _total_size;
- _write_new_pos = _append_read_pos= _write_buffer;
-
DBUG_ASSERT(_end_of_file == mysql_file_tell(_file, MYF(0)));
- _write_pos= _write_buffer;
- _total_size = 0;
-
- for (int i = 0; i < count_thread_for_slots; i++) {
- if(i == not_released)
- continue;
- if(!_slots[i].vacant)
- sem_post(&semaphore);
- _slots[i].finished= false;
- _slots[i].vacant= true;
- _slots[i].next= -1;
- _slots[i].pos_write_first= nullptr;
- _slots[i].pos_write_second= nullptr;
- _slots[i].pos_end= nullptr;
- }
- last_slot = -1;
-
return _error;
}
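
For context on the locking-granularity change: the flush procedure documented
in _slot_acquire relies on the rwlock acting as a barrier. Writers hold
flush_rw_lock shared while they copy, and the flusher's wrlock() blocks until
every in-flight copy has drained. A hypothetical, simplified C++17 sketch of
that split follows; the names (RingSketch, write, flush) and the std locking
primitives are mine, not the patch's MariaDB APIs.

#include <cstdio>
#include <cstring>
#include <mutex>
#include <shared_mutex>
#include <vector>

class RingSketch {
  std::mutex buffer_lock;           /* serializes position reservation   */
  std::shared_mutex flush_rw_lock;  /* writers shared, flusher exclusive */
  std::vector<unsigned char> buf;
  size_t write_pos= 0;

public:
  explicit RingSketch(size_t n) : buf(n) {}

  /* Sketch precondition: the caller guarantees len fits in the buffer. */
  void write(const unsigned char *src, size_t len) {
    std::shared_lock<std::shared_mutex> copying(flush_rw_lock);
    size_t at;
    {
      std::lock_guard<std::mutex> g(buffer_lock);
      at= write_pos;                /* reserve [at, at + len) */
      write_pos+= len;
    }
    /* The bulk copy runs outside buffer_lock, so writers overlap freely;
       only a concurrent flush is excluded by the shared lock above. */
    std::memcpy(buf.data() + at, src, len);
  }

  void flush(FILE *f) {
    /* Exclusive acquisition waits for every shared holder, i.e. for
       every unfinished write, before the buffer is touched. */
    std::unique_lock<std::shared_mutex> g(flush_rw_lock);
    fwrite(buf.data(), 1, write_pos, f);
    write_pos= 0;                   /* the buffer restarts after a flush */
  }
};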