summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergei Petrunia <psergey@askmonty.org>2016-09-23 14:18:29 +0300
committerVicențiu Ciorbaru <vicentiu@mariadb.org>2016-09-24 15:12:34 +0200
commit047963922c0c89c76f82cd14eb05ec56c19a91e9 (patch)
treeb16d867934146b3c38961830f4e43ca08d890982
parent6e4015727a60c5b98bdc9c4590adc687dacc4876 (diff)
downloadmariadb-git-047963922c0c89c76f82cd14eb05ec56c19a91e9.tar.gz
MDEV-9736: Window functions: multiple cursors to read filesort result
Add support for having multiple IO_CACHEs with type=READ_CACHE to share the file they are reading from. Each IO_CACHE keeps its own in-memory buffer. When doing a read or seek operation on the file, it notifies other IO_CACHEs that the file position has been changed. Make Rowid_seq_cursor use cloned IO_CACHE when reading filesort result.
-rw-r--r--include/my_sys.h7
-rw-r--r--mysql-test/r/win_big.result93
-rw-r--r--mysql-test/t/win_big.test104
-rw-r--r--mysys/mf_iocache.c148
-rw-r--r--sql/sql_window.cc152
5 files changed, 447 insertions, 57 deletions
diff --git a/include/my_sys.h b/include/my_sys.h
index 25554701a8c..528950f4e22 100644
--- a/include/my_sys.h
+++ b/include/my_sys.h
@@ -472,6 +472,8 @@ typedef struct st_io_cache /* Used when cacheing files */
const char *dir;
char prefix[3];
File file; /* file descriptor */
+
+ struct st_io_cache *next_file_user;
/*
seek_not_done is set by my_b_seek() to inform the upcoming read/write
operation that a seek needs to be preformed prior to the actual I/O
@@ -802,6 +804,11 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type,
extern void setup_io_cache(IO_CACHE* info);
extern void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
IO_CACHE *write_cache, uint num_threads);
+
+extern int init_slave_io_cache(IO_CACHE *master, IO_CACHE *slave);
+void end_slave_io_cache(IO_CACHE *cache);
+void seek_io_cache(IO_CACHE *cache, my_off_t needed_offset);
+
extern void remove_io_thread(IO_CACHE *info);
extern int _my_b_async_read(IO_CACHE *info,uchar *Buffer,size_t Count);
extern int my_b_append(IO_CACHE *info,const uchar *Buffer,size_t Count);
diff --git a/mysql-test/r/win_big.result b/mysql-test/r/win_big.result
new file mode 100644
index 00000000000..7ea044e702c
--- /dev/null
+++ b/mysql-test/r/win_big.result
@@ -0,0 +1,93 @@
+create table t0 (a int);
+insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
+create table t1(a int);
+insert into t1 select A.a + B.a* 10 + C.a * 100 from t0 A, t0 B, t0 C;
+create table t10 (a int, b int, c int);
+insert into t10
+select
+A.a + 1000*B.a,
+A.a + 1000*B.a,
+A.a + 1000*B.a
+from t1 A, t0 B
+order by A.a+1000*B.a;
+#################################################################
+## Try a basic example
+flush status;
+create table t21 as
+select
+sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name Value
+Sort_merge_passes 0
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name Value
+Sort_merge_passes 35
+include/diff_tables.inc [t21, t22]
+drop table t21, t22;
+#################################################################
+# Try many cursors
+set sort_buffer_size=default;
+flush status;
+create table t21 as
+select
+sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
+sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
+sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name Value
+Sort_merge_passes 0
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
+sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
+sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name Value
+Sort_merge_passes 35
+include/diff_tables.inc [t21, t22]
+drop table t21, t22;
+#################################################################
+# Try having cursors pointing at different IO_CACHE pages
+# in the IO_CACHE
+set sort_buffer_size=default;
+flush status;
+create table t21 as
+select
+a,
+sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name Value
+Sort_merge_passes 0
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+a,
+sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name Value
+Sort_merge_passes 35
+include/diff_tables.inc [t21, t22]
+drop table t21, t22;
+#################################################################
+drop table t10;
+drop table t0,t1;
diff --git a/mysql-test/t/win_big.test b/mysql-test/t/win_big.test
new file mode 100644
index 00000000000..a0398126eb3
--- /dev/null
+++ b/mysql-test/t/win_big.test
@@ -0,0 +1,104 @@
+#
+# Tests for window functions over big datasets.
+# "Big" here is "big enough so that filesort result doesn't fit in a
+# memory buffer".
+#
+#
+
+create table t0 (a int);
+insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
+
+create table t1(a int);
+insert into t1 select A.a + B.a* 10 + C.a * 100 from t0 A, t0 B, t0 C;
+
+create table t10 (a int, b int, c int);
+insert into t10
+select
+ A.a + 1000*B.a,
+ A.a + 1000*B.a,
+ A.a + 1000*B.a
+from t1 A, t0 B
+order by A.a+1000*B.a;
+
+--echo #################################################################
+--echo ## Try a basic example
+flush status;
+create table t21 as
+select
+ sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
+from
+ t10;
+show status like 'Sort_merge_passes';
+
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+ sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
+from
+ t10;
+show status like 'Sort_merge_passes';
+
+let $diff_tables= t21, t22;
+source include/diff_tables.inc;
+drop table t21, t22;
+
+--echo #################################################################
+--echo # Try many cursors
+set sort_buffer_size=default;
+flush status;
+create table t21 as
+select
+ sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
+ sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
+ sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
+from
+ t10;
+show status like 'Sort_merge_passes';
+
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+ sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
+ sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
+ sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
+from
+ t10;
+show status like 'Sort_merge_passes';
+
+let $diff_tables= t21, t22;
+source include/diff_tables.inc;
+drop table t21, t22;
+
+--echo #################################################################
+--echo # Try having cursors pointing at different IO_CACHE pages
+--echo # in the IO_CACHE
+set sort_buffer_size=default;
+flush status;
+create table t21 as
+select
+ a,
+ sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
+from
+ t10;
+show status like 'Sort_merge_passes';
+
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+ a,
+ sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
+from
+ t10;
+show status like 'Sort_merge_passes';
+
+let $diff_tables= t21, t22;
+source include/diff_tables.inc;
+drop table t21, t22;
+--echo #################################################################
+
+drop table t10;
+drop table t0,t1;
+
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index 635e544d367..77581a51d75 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -193,6 +193,7 @@ int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
info->alloced_buffer = 0;
info->buffer=0;
info->seek_not_done= 0;
+ info->next_file_user= NULL;
if (file >= 0)
{
@@ -328,6 +329,101 @@ int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
DBUG_RETURN(0);
} /* init_io_cache */
+
+
+/*
+ Initialize the slave IO_CACHE to read the same file (and data)
+ as master does.
+
+ One can create multiple slaves from a single master. Every slave and master
+ will have independent file positions.
+
+ The master must be a non-shared READ_CACHE.
+ It is assumed that no more reads are done after a master and/or a slave
+ has been freed (this limitation can be easily lifted).
+*/
+
+int init_slave_io_cache(IO_CACHE *master, IO_CACHE *slave)
+{
+ uchar *slave_buf;
+ DBUG_ASSERT(master->type == READ_CACHE);
+ DBUG_ASSERT(!master->share);
+ DBUG_ASSERT(master->alloced_buffer);
+
+ if (!(slave_buf= (uchar*)my_malloc(master->buffer_length, MYF(0))))
+ {
+ return 1;
+ }
+ memcpy(slave, master, sizeof(IO_CACHE));
+ slave->buffer= slave_buf;
+
+ memcpy(slave->buffer, master->buffer, master->buffer_length);
+ slave->read_pos= slave->buffer + (master->read_pos - master->buffer);
+ slave->read_end= slave->buffer + (master->read_end - master->buffer);
+
+ DBUG_ASSERT(master->current_pos == &master->read_pos);
+ slave->current_pos= &slave->read_pos;
+ DBUG_ASSERT(master->current_end == &master->read_end);
+ slave->current_end= &slave->read_end;
+
+ if (master->next_file_user)
+ {
+ IO_CACHE *p;
+ for (p= master->next_file_user;
+ p->next_file_user !=master;
+ p= p->next_file_user)
+ {}
+
+ p->next_file_user= slave;
+ slave->next_file_user= master;
+ }
+ else
+ {
+ slave->next_file_user= master;
+ master->next_file_user= slave;
+ }
+ return 0;
+}
+
+
+void end_slave_io_cache(IO_CACHE *cache)
+{
+ my_free(cache->buffer);
+}
+
+/*
+ Seek a read io cache to a given offset
+*/
+void seek_io_cache(IO_CACHE *cache, my_off_t needed_offset)
+{
+ my_off_t cached_data_start= cache->pos_in_file;
+ my_off_t cached_data_end= cache->pos_in_file + (cache->read_pos -
+ cache->buffer);
+ if (needed_offset >= cached_data_start &&
+ needed_offset < cached_data_end)
+ {
+ /*
+ The offset we're seeking to is in the buffer.
+ Move buffer's read position accordingly
+ */
+ cache->read_pos= cache->buffer + (needed_offset - cached_data_start);
+ }
+ else
+ {
+ if (needed_offset > cache->end_of_file)
+ needed_offset= cache->end_of_file;
+ /*
+ The offset we're seeking to is not in the buffer.
+ - Set the buffer to be exhausted.
+ - Make the next read to a mysql_file_seek() call to the required
+ offset (but still use aligned reads).
+ */
+ cache->read_pos= cache->read_end;
+ cache->seek_not_done= 1;
+ cache->pos_in_file= (needed_offset / IO_SIZE) * IO_SIZE;
+ }
+}
+
/* Wait until current request is ready */
#ifdef HAVE_AIOWAIT
@@ -583,6 +679,17 @@ int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count)
{
/* No error, reset seek_not_done flag. */
info->seek_not_done= 0;
+
+ if (info->next_file_user)
+ {
+ IO_CACHE *c;
+ for (c= info->next_file_user;
+ c!= info;
+ c= c->next_file_user)
+ {
+ c->seek_not_done= 1;
+ }
+ }
}
else
{
@@ -671,22 +778,35 @@ int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count)
DBUG_RETURN(0); /* EOF */
}
}
- else if ((length= mysql_file_read(info->file,info->buffer, max_length,
+ else
+ {
+ if (info->next_file_user)
+ {
+ IO_CACHE *c;
+ for (c= info->next_file_user;
+ c!= info;
+ c= c->next_file_user)
+ {
+ c->seek_not_done= 1;
+ }
+ }
+ if ((length= mysql_file_read(info->file,info->buffer, max_length,
info->myflags)) < Count ||
length == (size_t) -1)
- {
- /*
- We got an read error, or less than requested (end of file).
- If not a read error, copy, what we got.
- */
- if (length != (size_t) -1)
- memcpy(Buffer, info->buffer, length);
- info->pos_in_file= pos_in_file;
- /* For a read error, return -1, otherwise, what we got in total. */
- info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
- info->read_pos=info->read_end=info->buffer;
- info->seek_not_done=1;
- DBUG_RETURN(1);
+ {
+ /*
+ We got an read error, or less than requested (end of file).
+ If not a read error, copy, what we got.
+ */
+ if (length != (size_t) -1)
+ memcpy(Buffer, info->buffer, length);
+ info->pos_in_file= pos_in_file;
+ /* For a read error, return -1, otherwise, what we got in total. */
+ info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
+ info->read_pos=info->read_end=info->buffer;
+ info->seek_not_done=1;
+ DBUG_RETURN(1);
+ }
}
/*
Count is the remaining number of bytes requested.
diff --git a/sql/sql_window.cc b/sql/sql_window.cc
index a2fada64a83..c40ffeb5fb1 100644
--- a/sql/sql_window.cc
+++ b/sql/sql_window.cc
@@ -515,17 +515,6 @@ void order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list)
// note: make rr_from_pointers static again when not need it here anymore
int rr_from_pointers(READ_RECORD *info);
-/*
- A temporary way to clone READ_RECORD structures until Monty provides the real
- one.
-*/
-bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst)
-{
- //DBUG_ASSERT(src->table->sort.record_pointers);
- DBUG_ASSERT(src->read_record == rr_from_pointers);
- memcpy(dst, src, sizeof(READ_RECORD));
- return false;
-}
/////////////////////////////////////////////////////////////////////////////
@@ -540,68 +529,145 @@ bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst)
class Rowid_seq_cursor
{
public:
- virtual ~Rowid_seq_cursor() {}
+ Rowid_seq_cursor() : io_cache(NULL), ref_buffer(0) {}
+ virtual ~Rowid_seq_cursor()
+ {
+ if (ref_buffer)
+ my_free(ref_buffer);
+ if (io_cache)
+ {
+ end_slave_io_cache(io_cache);
+ my_free(io_cache);
+ io_cache= NULL;
+ }
+ }
+
+private:
+ /* Length of one rowid element */
+ size_t ref_length;
+
+ /* If io_cache=!NULL, use it */
+ IO_CACHE *io_cache;
+ uchar *ref_buffer; /* Buffer for the last returned rowid */
+ uint rownum; /* Number of the rowid that is about to be returned */
+ bool cache_eof; /* whether we've reached EOF */
+
+ /* The following are used when we are reading from an array of pointers */
+ uchar *cache_start;
+ uchar *cache_pos;
+ uchar *cache_end;
+public:
void init(READ_RECORD *info)
{
- cache_start= info->cache_pos;
- cache_pos= info->cache_pos;
- cache_end= info->cache_end;
ref_length= info->ref_length;
+ if (info->read_record == rr_from_pointers)
+ {
+ io_cache= NULL;
+ cache_start= info->cache_pos;
+ cache_pos= info->cache_pos;
+ cache_end= info->cache_end;
+ }
+ else
+ {
+ //DBUG_ASSERT(info->read_record == rr_from_tempfile);
+ rownum= 0;
+ cache_eof= false;
+ io_cache= (IO_CACHE*)my_malloc(sizeof(IO_CACHE), MYF(0));
+ init_slave_io_cache(info->io_cache, io_cache);
+
+ ref_buffer= (uchar*)my_malloc(ref_length, MYF(0));
+ }
}
virtual int next()
{
- /* Allow multiple next() calls in EOF state. */
- if (cache_pos == cache_end)
- return -1;
-
- cache_pos+= ref_length;
- DBUG_ASSERT(cache_pos <= cache_end);
+ if (io_cache)
+ {
+ if (cache_eof)
+ return 1;
+ if (my_b_read(io_cache,ref_buffer,ref_length))
+ {
+ cache_eof= 1; // TODO: remove cache_eof
+ return -1;
+ }
+ rownum++;
+ return 0;
+ }
+ else
+ {
+ /* Allow multiple next() calls in EOF state. */
+ if (cache_pos == cache_end)
+ return -1;
+ cache_pos+= ref_length;
+ DBUG_ASSERT(cache_pos <= cache_end);
+ }
return 0;
}
virtual int prev()
{
- /* Allow multiple prev() calls when positioned at the start. */
- if (cache_pos == cache_start)
- return -1;
- cache_pos-= ref_length;
- DBUG_ASSERT(cache_pos >= cache_start);
+ if (io_cache)
+ {
+ if (rownum == 0)
+ return -1;
- return 0;
+ move_to(rownum - 1);
+ return 0;
+ }
+ else
+ {
+ /* Allow multiple prev() calls when positioned at the start. */
+ if (cache_pos == cache_start)
+ return -1;
+ cache_pos-= ref_length;
+ DBUG_ASSERT(cache_pos >= cache_start);
+ return 0;
+ }
}
ha_rows get_rownum() const
{
- return (cache_pos - cache_start) / ref_length;
+ if (io_cache)
+ return rownum;
+ else
+ return (cache_pos - cache_start) / ref_length;
}
void move_to(ha_rows row_number)
{
- cache_pos= MY_MIN(cache_end, cache_start + row_number * ref_length);
- DBUG_ASSERT(cache_pos <= cache_end);
+ if (io_cache)
+ {
+ seek_io_cache(io_cache, row_number * ref_length);
+ rownum= row_number;
+ next();
+ }
+ else
+ {
+ cache_pos= MY_MIN(cache_end, cache_start + row_number * ref_length);
+ DBUG_ASSERT(cache_pos <= cache_end);
+ }
}
protected:
- bool at_eof() { return (cache_pos == cache_end); }
-
- uchar *get_prev_rowid()
+ bool at_eof()
{
- if (cache_pos == cache_start)
- return NULL;
+ if (io_cache)
+ {
+ return cache_eof;
+ }
else
- return cache_pos - ref_length;
+ return (cache_pos == cache_end);
}
- uchar *get_curr_rowid() { return cache_pos; }
-
-private:
- uchar *cache_start;
- uchar *cache_pos;
- uchar *cache_end;
- uint ref_length;
+ uchar *get_curr_rowid()
+ {
+ if (io_cache)
+ return ref_buffer;
+ else
+ return cache_pos;
+ }
};