author | Sergei Petrunia <psergey@askmonty.org> | 2016-09-23 14:18:29 +0300
committer | Vicențiu Ciorbaru <vicentiu@mariadb.org> | 2016-09-24 15:12:34 +0200
commit | 047963922c0c89c76f82cd14eb05ec56c19a91e9 (patch)
tree | b16d867934146b3c38961830f4e43ca08d890982
parent | 6e4015727a60c5b98bdc9c4590adc687dacc4876 (diff)
download | mariadb-git-047963922c0c89c76f82cd14eb05ec56c19a91e9.tar.gz
MDEV-9736: Window functions: multiple cursors to read filesort result
Add support for multiple IO_CACHEs with type=READ_CACHE that share
the file they are reading from.
Each IO_CACHE keeps its own in-memory buffer. When doing a read or seek
operation on the file, it notifies other IO_CACHEs that the file position
has been changed.
Make Rowid_seq_cursor use a cloned IO_CACHE when reading the filesort result.
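The real consumer of this API is Rowid_seq_cursor in sql/sql_window.cc below. As a rough, non-authoritative sketch (not part of the patch), the calls declared in include/my_sys.h combine like this; the helper name and the fixed-size rowid buffer are invented for the example, and error handling is omitted:

```c
/*
  Illustrative only: two independent read positions over one temp file,
  using init_slave_io_cache(), seek_io_cache() and end_slave_io_cache()
  as declared in include/my_sys.h by this patch.
*/
#include "my_global.h"
#include "my_sys.h"

static void two_cursors_over_one_file(File fd, size_t ref_length)
{
  IO_CACHE master, slave;
  uchar rowid[64];                          /* assumes ref_length <= 64 */

  /* Buffered reader over the sorted rowid file (the "master") */
  init_io_cache(&master, fd, 0, READ_CACHE, 0L, 0, MYF(MY_WME));

  /* Clone it: the slave gets its own buffer and its own file position */
  init_slave_io_cache(&master, &slave);

  /* Both caches now advance independently over the same file */
  my_b_read(&master, rowid, ref_length);    /* master reads row 0 */
  my_b_read(&slave,  rowid, ref_length);    /* slave also reads row 0 */
  my_b_read(&slave,  rowid, ref_length);    /* slave moves on to row 1 */

  /* Random positioning by byte offset (row_number * ref_length) */
  seek_io_cache(&slave, 10 * ref_length);
  my_b_read(&slave, rowid, ref_length);     /* slave now returns row 10 */

  end_slave_io_cache(&slave);               /* frees only the cloned buffer */
  end_io_cache(&master);
}
```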
-rw-r--r-- | include/my_sys.h | 7
-rw-r--r-- | mysql-test/r/win_big.result | 93
-rw-r--r-- | mysql-test/t/win_big.test | 104
-rw-r--r-- | mysys/mf_iocache.c | 148
-rw-r--r-- | sql/sql_window.cc | 152
5 files changed, 447 insertions, 57 deletions
diff --git a/include/my_sys.h b/include/my_sys.h
index 25554701a8c..528950f4e22 100644
--- a/include/my_sys.h
+++ b/include/my_sys.h
@@ -472,6 +472,8 @@ typedef struct st_io_cache    /* Used when cacheing files */
   const char *dir;
   char prefix[3];
   File file; /* file descriptor */
+
+  struct st_io_cache *next_file_user;
   /*
     seek_not_done is set by my_b_seek() to inform the upcoming read/write
     operation that a seek needs to be preformed prior to the actual I/O
@@ -802,6 +804,11 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type,
 extern void setup_io_cache(IO_CACHE* info);
 extern void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
                                 IO_CACHE *write_cache, uint num_threads);
+
+extern int init_slave_io_cache(IO_CACHE *master, IO_CACHE *slave);
+void end_slave_io_cache(IO_CACHE *cache);
+void seek_io_cache(IO_CACHE *cache, my_off_t needed_offset);
+
 extern void remove_io_thread(IO_CACHE *info);
 extern int _my_b_async_read(IO_CACHE *info,uchar *Buffer,size_t Count);
 extern int my_b_append(IO_CACHE *info,const uchar *Buffer,size_t Count);
diff --git a/mysql-test/r/win_big.result b/mysql-test/r/win_big.result
new file mode 100644
index 00000000000..7ea044e702c
--- /dev/null
+++ b/mysql-test/r/win_big.result
@@ -0,0 +1,93 @@
+create table t0 (a int);
+insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
+create table t1(a int);
+insert into t1 select A.a + B.a* 10 + C.a * 100 from t0 A, t0 B, t0 C;
+create table t10 (a int, b int, c int);
+insert into t10
+select
+A.a + 1000*B.a,
+A.a + 1000*B.a,
+A.a + 1000*B.a
+from t1 A, t0 B
+order by A.a+1000*B.a;
+#################################################################
+## Try a basic example
+flush status;
+create table t21 as
+select
+sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name	Value
+Sort_merge_passes	0
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name	Value
+Sort_merge_passes	35
+include/diff_tables.inc [t21, t22]
+drop table t21, t22;
+#################################################################
+# Try many cursors
+set sort_buffer_size=default;
+flush status;
+create table t21 as
+select
+sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
+sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
+sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name	Value
+Sort_merge_passes	0
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
+sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
+sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name	Value
+Sort_merge_passes	35
+include/diff_tables.inc [t21, t22]
+drop table t21, t22;
+#################################################################
+# Try having cursors pointing at different IO_CACHE pages
+# in the IO_CACHE
+set sort_buffer_size=default;
+flush status;
+create table t21 as
+select
+a,
+sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name	Value
+Sort_merge_passes	0
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+a,
+sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name	Value
+Sort_merge_passes	35
+include/diff_tables.inc [t21, t22]
+drop table t21, t22;
+#################################################################
+drop table t10;
+drop table t0,t1;
diff --git a/mysql-test/t/win_big.test b/mysql-test/t/win_big.test
new file mode 100644
index 00000000000..a0398126eb3
--- /dev/null
+++ b/mysql-test/t/win_big.test
@@ -0,0 +1,104 @@
+#
+#  Tests for window functions over big datasets.
+#  "Big" here is "big enough so that filesort result doesn't fit in a
+#  memory buffer".
+#
+#
+
+create table t0 (a int);
+insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
+
+create table t1(a int);
+insert into t1 select A.a + B.a* 10 + C.a * 100 from t0 A, t0 B, t0 C;
+
+create table t10 (a int, b int, c int);
+insert into t10
+select
+  A.a + 1000*B.a,
+  A.a + 1000*B.a,
+  A.a + 1000*B.a
+from t1 A, t0 B
+order by A.a+1000*B.a;
+
+--echo #################################################################
+--echo ## Try a basic example
+flush status;
+create table t21 as
+select
+  sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
+from
+  t10;
+show status like 'Sort_merge_passes';
+
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+  sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
+from
+  t10;
+show status like 'Sort_merge_passes';
+
+let $diff_tables= t21, t22;
+source include/diff_tables.inc;
+drop table t21, t22;
+
+--echo #################################################################
+--echo # Try many cursors
+set sort_buffer_size=default;
+flush status;
+create table t21 as
+select
+  sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
+  sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
+  sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
+from
+  t10;
+show status like 'Sort_merge_passes';
+
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+  sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
+  sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
+  sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
+from
+  t10;
+show status like 'Sort_merge_passes';
+
+let $diff_tables= t21, t22;
+source include/diff_tables.inc;
+drop table t21, t22;
+
+--echo #################################################################
+--echo # Try having cursors pointing at different IO_CACHE pages
+--echo # in the IO_CACHE
+set sort_buffer_size=default;
+flush status;
+create table t21 as
+select
+  a,
+  sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
+from
+  t10;
+show status like 'Sort_merge_passes';
+
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+  a,
+  sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
+from
+  t10;
+show status like 'Sort_merge_passes';
+
+let $diff_tables= t21, t22;
+source include/diff_tables.inc;
+drop table t21, t22;
+--echo #################################################################
+
+drop table t10;
+drop table t0,t1;
+
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index 635e544d367..77581a51d75 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -193,6 +193,7 @@ int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
   info->alloced_buffer = 0;
   info->buffer=0;
   info->seek_not_done= 0;
+  info->next_file_user= NULL;
 
   if (file >= 0)
   {
@@ -328,6 +329,101 @@ int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
   DBUG_RETURN(0);
 }						/* init_io_cache */
 
+
+
+/*
+  Initialize the slave IO_CACHE to read the same file (and data)
+  as master does.
+
+  One can create multiple slaves from a single master. Every slave and master
+  will have independent file positions.
+
+  The master must be a non-shared READ_CACHE.
+  It is assumed that no more reads are done after a master and/or a slave
+  has been freed (this limitation can be easily lifted).
+*/
+
+int init_slave_io_cache(IO_CACHE *master, IO_CACHE *slave)
+{
+  uchar *slave_buf;
+  DBUG_ASSERT(master->type == READ_CACHE);
+  DBUG_ASSERT(!master->share);
+  DBUG_ASSERT(master->alloced_buffer);
+
+  if (!(slave_buf= (uchar*)my_malloc(master->buffer_length, MYF(0))))
+  {
+    return 1;
+  }
+  memcpy(slave, master, sizeof(IO_CACHE));
+  slave->buffer= slave_buf;
+
+  memcpy(slave->buffer, master->buffer, master->buffer_length);
+  slave->read_pos= slave->buffer + (master->read_pos - master->buffer);
+  slave->read_end= slave->buffer + (master->read_end - master->buffer);
+
+  DBUG_ASSERT(master->current_pos == &master->read_pos);
+  slave->current_pos= &slave->read_pos;
+  DBUG_ASSERT(master->current_end == &master->read_end);
+  slave->current_end= &slave->read_end;
+
+  if (master->next_file_user)
+  {
+    IO_CACHE *p;
+    for (p= master->next_file_user;
+         p->next_file_user !=master;
+         p= p->next_file_user)
+    {}
+
+    p->next_file_user= slave;
+    slave->next_file_user= master;
+  }
+  else
+  {
+    slave->next_file_user= master;
+    master->next_file_user= slave;
+  }
+  return 0;
+}
+
+
+void end_slave_io_cache(IO_CACHE *cache)
+{
+  my_free(cache->buffer);
+}
+
+/*
+  Seek a read io cache to a given offset
+*/
+void seek_io_cache(IO_CACHE *cache, my_off_t needed_offset)
+{
+  my_off_t cached_data_start= cache->pos_in_file;
+  my_off_t cached_data_end= cache->pos_in_file + (cache->read_pos -
+                                                  cache->buffer);
+  if (needed_offset >= cached_data_start &&
+      needed_offset < cached_data_end)
+  {
+    /*
+      The offset we're seeking to is in the buffer.
+      Move buffer's read position accordingly
+    */
+    cache->read_pos= cache->buffer + (needed_offset - cached_data_start);
+  }
+  else
+  {
+    if (needed_offset > cache->end_of_file)
+      needed_offset= cache->end_of_file;
+    /*
+      The offset we're seeking to is not in the buffer.
+      - Set the buffer to be exhausted.
+      - Make the next read to a mysql_file_seek() call to the required
+        offset (but still use aligned reads).
+    */
+    cache->read_pos= cache->read_end;
+    cache->seek_not_done= 1;
+    cache->pos_in_file= (needed_offset / IO_SIZE) * IO_SIZE;
+  }
+}
+
 	/* Wait until current request is ready */
 
 #ifdef HAVE_AIOWAIT
@@ -583,6 +679,17 @@ int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count)
     {
       /* No error, reset seek_not_done flag. */
       info->seek_not_done= 0;
+
+      if (info->next_file_user)
+      {
+        IO_CACHE *c;
+        for (c= info->next_file_user;
+             c!= info;
+             c= c->next_file_user)
+        {
+          c->seek_not_done= 1;
+        }
+      }
     }
     else
     {
@@ -671,22 +778,35 @@ int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count)
       DBUG_RETURN(0);			/* EOF */
     }
   }
-  else if ((length= mysql_file_read(info->file,info->buffer, max_length,
+  else
+  {
+    if (info->next_file_user)
+    {
+      IO_CACHE *c;
+      for (c= info->next_file_user;
+           c!= info;
+           c= c->next_file_user)
+      {
+        c->seek_not_done= 1;
+      }
+    }
+    if ((length= mysql_file_read(info->file,info->buffer, max_length,
                             info->myflags)) < Count ||
 	   length == (size_t) -1)
-  {
-    /*
-      We got an read error, or less than requested (end of file).
-      If not a read error, copy, what we got.
-    */
-    if (length != (size_t) -1)
-      memcpy(Buffer, info->buffer, length);
-    info->pos_in_file= pos_in_file;
-    /* For a read error, return -1, otherwise, what we got in total. */
-    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
-    info->read_pos=info->read_end=info->buffer;
-    info->seek_not_done=1;
-    DBUG_RETURN(1);
+    {
+      /*
+        We got an read error, or less than requested (end of file).
+        If not a read error, copy, what we got.
+      */
+      if (length != (size_t) -1)
+        memcpy(Buffer, info->buffer, length);
+      info->pos_in_file= pos_in_file;
+      /* For a read error, return -1, otherwise, what we got in total. */
+      info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
+      info->read_pos=info->read_end=info->buffer;
+      info->seek_not_done=1;
+      DBUG_RETURN(1);
+    }
   }
   /*
     Count is the remaining number of bytes requested.
diff --git a/sql/sql_window.cc b/sql/sql_window.cc
index a2fada64a83..c40ffeb5fb1 100644
--- a/sql/sql_window.cc
+++ b/sql/sql_window.cc
@@ -515,17 +515,6 @@ void order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list)
 // note: make rr_from_pointers static again when not need it here anymore
 int rr_from_pointers(READ_RECORD *info);
 
-/*
-  A temporary way to clone READ_RECORD structures until Monty provides the real
-  one.
-*/
-bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst)
-{
-  //DBUG_ASSERT(src->table->sort.record_pointers);
-  DBUG_ASSERT(src->read_record == rr_from_pointers);
-  memcpy(dst, src, sizeof(READ_RECORD));
-  return false;
-}
 
 /////////////////////////////////////////////////////////////////////////////
 
@@ -540,68 +529,145 @@ bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst)
 class Rowid_seq_cursor
 {
 public:
-  virtual ~Rowid_seq_cursor() {}
+  Rowid_seq_cursor() : io_cache(NULL), ref_buffer(0) {}
+  virtual ~Rowid_seq_cursor()
+  {
+    if (ref_buffer)
+      my_free(ref_buffer);
+    if (io_cache)
+    {
+      end_slave_io_cache(io_cache);
+      my_free(io_cache);
+      io_cache= NULL;
+    }
+  }
+
+private:
+  /* Length of one rowid element */
+  size_t ref_length;
+
+  /* If io_cache=!NULL, use it */
+  IO_CACHE *io_cache;
+  uchar *ref_buffer;  /* Buffer for the last returned rowid */
+  uint rownum;        /* Number of the rowid that is about to be returned */
+  bool cache_eof;     /* whether we've reached EOF */
+
+  /* The following are used when we are reading from an array of pointers */
+  uchar *cache_start;
+  uchar *cache_pos;
+  uchar *cache_end;
+public:
   void init(READ_RECORD *info)
   {
-    cache_start= info->cache_pos;
-    cache_pos= info->cache_pos;
-    cache_end= info->cache_end;
     ref_length= info->ref_length;
+    if (info->read_record == rr_from_pointers)
+    {
+      io_cache= NULL;
+      cache_start= info->cache_pos;
+      cache_pos= info->cache_pos;
+      cache_end= info->cache_end;
+    }
+    else
+    {
+      //DBUG_ASSERT(info->read_record == rr_from_tempfile);
+      rownum= 0;
+      cache_eof= false;
+      io_cache= (IO_CACHE*)my_malloc(sizeof(IO_CACHE), MYF(0));
+      init_slave_io_cache(info->io_cache, io_cache);
+
+      ref_buffer= (uchar*)my_malloc(ref_length, MYF(0));
+    }
   }
 
   virtual int next()
   {
-    /* Allow multiple next() calls in EOF state. */
-    if (cache_pos == cache_end)
-      return -1;
-
-    cache_pos+= ref_length;
-    DBUG_ASSERT(cache_pos <= cache_end);
+    if (io_cache)
+    {
+      if (cache_eof)
+        return 1;
+      if (my_b_read(io_cache,ref_buffer,ref_length))
+      {
+        cache_eof= 1; // TODO: remove cache_eof
+        return -1;
+      }
+      rownum++;
+      return 0;
+    }
+    else
+    {
+      /* Allow multiple next() calls in EOF state. */
+      if (cache_pos == cache_end)
+        return -1;
+      cache_pos+= ref_length;
+      DBUG_ASSERT(cache_pos <= cache_end);
+    }
     return 0;
   }
 
   virtual int prev()
   {
-    /* Allow multiple prev() calls when positioned at the start. */
-    if (cache_pos == cache_start)
-      return -1;
-    cache_pos-= ref_length;
-    DBUG_ASSERT(cache_pos >= cache_start);
+    if (io_cache)
+    {
+      if (rownum == 0)
+        return -1;
 
-    return 0;
+      move_to(rownum - 1);
+      return 0;
+    }
+    else
+    {
+      /* Allow multiple prev() calls when positioned at the start. */
+      if (cache_pos == cache_start)
+        return -1;
+      cache_pos-= ref_length;
+      DBUG_ASSERT(cache_pos >= cache_start);
+      return 0;
+    }
   }
 
   ha_rows get_rownum() const
   {
-    return (cache_pos - cache_start) / ref_length;
+    if (io_cache)
+      return rownum;
+    else
+      return (cache_pos - cache_start) / ref_length;
   }
 
   void move_to(ha_rows row_number)
   {
-    cache_pos= MY_MIN(cache_end, cache_start + row_number * ref_length);
-    DBUG_ASSERT(cache_pos <= cache_end);
+    if (io_cache)
+    {
+      seek_io_cache(io_cache, row_number * ref_length);
+      rownum= row_number;
+      next();
+    }
+    else
+    {
+      cache_pos= MY_MIN(cache_end, cache_start + row_number * ref_length);
+      DBUG_ASSERT(cache_pos <= cache_end);
+    }
   }
 
 protected:
-  bool at_eof() { return (cache_pos == cache_end); }
-
-  uchar *get_prev_rowid()
+  bool at_eof()
   {
-    if (cache_pos == cache_start)
-      return NULL;
+    if (io_cache)
+    {
+      return cache_eof;
+    }
     else
-      return cache_pos - ref_length;
+      return (cache_pos == cache_end);
   }
 
-  uchar *get_curr_rowid() { return cache_pos; }
-
-private:
-  uchar *cache_start;
-  uchar *cache_pos;
-  uchar *cache_end;
-  uint ref_length;
+  uchar *get_curr_rowid()
+  {
+    if (io_cache)
+      return ref_buffer;
+    else
+      return cache_pos;
+  }
 };
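To make the notification scheme in the mysys/mf_iocache.c hunks easier to picture, here is a tiny standalone model (not MariaDB code; the struct and function names are invented for illustration): readers of one file form a ring through next_file_user, and whichever reader actually moves the OS file position flags all the others so they re-seek before their next buffer refill.

```c
#include <stdio.h>

/* Simplified stand-in for IO_CACHE: only the fields the scheme needs */
struct reader
{
  long pos_in_file;              /* offset this reader's buffer starts at */
  int  seek_not_done;            /* set when someone else moved the file offset */
  struct reader *next_file_user; /* circular list of readers sharing the file */
};

/* Called just before a reader issues a real seek+read on the shared fd */
static void notify_other_readers(struct reader *self)
{
  struct reader *r;
  for (r= self->next_file_user; r != self; r= r->next_file_user)
    r->seek_not_done= 1;         /* forces them to seek back to their own spot */
}

int main(void)
{
  struct reader a= { 0, 0, NULL };
  struct reader b= { 4096, 0, NULL };
  a.next_file_user= &b;          /* ring: a -> b -> a */
  b.next_file_user= &a;

  notify_other_readers(&a);      /* a is about to refill its buffer */
  printf("b.seek_not_done = %d\n", b.seek_not_done);   /* prints 1 */
  return 0;
}
```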