summaryrefslogtreecommitdiff
path: root/sql/sql_window.cc
diff options
context:
space:
mode:
authorVicențiu Ciorbaru <vicentiu@mariadb.org>2016-09-09 14:46:47 +0300
committerVicențiu Ciorbaru <vicentiu@mariadb.org>2016-09-09 18:33:29 +0300
commitdfd3be928d98d123e86b8ad3ae307e27153ab11c (patch)
tree212b3b4f7ab92966bc6df662e88e5281dce9c0b5 /sql/sql_window.cc
parentffed20c5635a58bc1465efd5d936650b1672da74 (diff)
downloadmariadb-git-dfd3be928d98d123e86b8ad3ae307e27153ab11c.tar.gz
Make cursor implementation uniform
Cursors now report their current row number as the boundary of the partition. This is used by Frame_scan_cursor to compute aggregate functions that do not support removal.
Diffstat (limited to 'sql/sql_window.cc')
-rw-r--r--sql/sql_window.cc505
1 files changed, 365 insertions, 140 deletions
diff --git a/sql/sql_window.cc b/sql/sql_window.cc
index a2e793f9582..7192c8a492e 100644
--- a/sql/sql_window.cc
+++ b/sql/sql_window.cc
@@ -4,6 +4,7 @@
#include "filesort.h"
#include "sql_base.h"
#include "sql_window.h"
+#include "my_dbug.h"
bool
@@ -549,29 +550,44 @@ public:
ref_length= info->ref_length;
}
- virtual int get_next()
+ virtual int next()
{
- /* Allow multiple get_next() calls in EOF state*/
+ /* Allow multiple next() calls in EOF state. */
if (cache_pos == cache_end)
return -1;
+
cache_pos+= ref_length;
+ DBUG_ASSERT(cache_pos <= cache_end);
+
+ return 0;
+ }
+
+ virtual int prev()
+ {
+ /* Allow multiple prev() calls when positioned at the start. */
+ if (cache_pos == cache_start)
+ return -1;
+ cache_pos-= ref_length;
+ DBUG_ASSERT(cache_pos >= cache_start);
+
return 0;
}
- ha_rows get_rownum()
+ ha_rows get_rownum() const
{
return (cache_pos - cache_start) / ref_length;
}
void move_to(ha_rows row_number)
{
- cache_pos= cache_start + row_number * ref_length;
+ cache_pos= MY_MIN(cache_end, cache_start + row_number * ref_length);
+ DBUG_ASSERT(cache_pos <= cache_end);
}
protected:
bool at_eof() { return (cache_pos == cache_end); }
- uchar *get_last_rowid()
+ uchar *get_prev_rowid()
{
if (cache_pos == cache_start)
return NULL;
@@ -601,30 +617,25 @@ public:
void init(READ_RECORD *info)
{
Rowid_seq_cursor::init(info);
- read_record= info;
+ table= info->table;
+ record= info->record;
}
- virtual int get_next()
+ virtual int fetch()
{
if (at_eof())
return -1;
uchar* curr_rowid= get_curr_rowid();
- int res= Rowid_seq_cursor::get_next();
- if (!res)
- {
- res= read_record->table->file->ha_rnd_pos(read_record->record,
- curr_rowid);
- }
- return res;
+ return table->file->ha_rnd_pos(record, curr_rowid);
}
- bool restore_last_row()
+ bool fetch_prev_row()
{
uchar *p;
- if ((p= get_last_rowid()))
+ if ((p= get_prev_rowid()))
{
- int rc= read_record->table->file->ha_rnd_pos(read_record->record, p);
+ int rc= table->file->ha_rnd_pos(record, p);
if (!rc)
return true; // restored ok
}
@@ -632,11 +643,10 @@ public:
}
private:
- /*
- Note: we don't own *read_record, somebody else is using it.
- We only look at the constant part of it, e.g. table, record buffer, etc.
- */
- READ_RECORD *read_record;
+ /* The table that is accessed by this cursor. */
+ TABLE *table;
+ /* Buffer where to store the table's record data. */
+ uchar *record;
// TODO(spetrunia): should move_to() also read row here?
};
@@ -645,6 +655,8 @@ private:
/*
A cursor which only moves within a partition. The scan stops at the partition
end, and it needs an explicit command to move to the next partition.
+
+ This cursor can not move backwards.
*/
class Partition_read_cursor : public Table_read_cursor
@@ -669,39 +681,36 @@ public:
void on_next_partition(ha_rows rownum)
{
/* Remember the sort key value from the new partition */
+ move_to(rownum);
bound_tracker.check_if_next_group();
end_of_partition= false;
- }
- /*
- Moves to a new row. The row is assumed to be within the current partition.
- */
- void move_to(ha_rows rownum) { Table_read_cursor::move_to(rownum); }
+ }
/*
This returns -1 when end of partition was reached.
*/
- int get_next()
+ int next()
{
int res;
if (end_of_partition)
return -1;
- if ((res= Table_read_cursor::get_next()))
+
+ if ((res= Table_read_cursor::next()) ||
+ (res= fetch()))
return res;
if (bound_tracker.compare_with_cache())
{
+ /* This row is part of a new partition, don't move
+ forward any more until we get informed of a new partition. */
+ Table_read_cursor::prev();
end_of_partition= true;
return -1;
}
return 0;
}
- bool restore_last_row()
- {
- return Table_read_cursor::restore_last_row();
- }
-
private:
Group_bound_tracker bound_tracker;
bool end_of_partition;
@@ -783,6 +792,8 @@ public:
virtual void pre_next_row() {};
virtual void next_row()=0;
+ virtual bool is_outside_computation_bounds() const { return false; };
+
virtual ~Frame_cursor() {}
/*
@@ -796,7 +807,7 @@ public:
}
/* Retrieves the row number that this cursor currently points at. */
- virtual ha_rows get_curr_rownum()= 0;
+ virtual ha_rows get_curr_rownum() const= 0;
protected:
inline void add_value_to_items()
@@ -962,7 +973,6 @@ public:
void next_partition(ha_rows rownum)
{
- cursor.move_to(rownum);
walk_till_non_peer();
}
@@ -982,26 +992,39 @@ public:
(prev_row + n) >= R
We need to check about the current row.
*/
- if (cursor.restore_last_row())
- {
- if (order_direction * range_expr->cmp_read_only() <= 0)
- return;
- remove_value_from_items();
- }
walk_till_non_peer();
}
- ha_rows get_curr_rownum()
+ ha_rows get_curr_rownum() const
{
return cursor.get_rownum();
}
+ bool is_outside_computation_bounds() const
+ {
+ if (end_of_partition)
+ return true;
+ return false;
+ }
+
private:
void walk_till_non_peer()
{
+ if (cursor.fetch()) // ERROR
+ return;
+ // Current row is not a peer.
+ if (order_direction * range_expr->cmp_read_only() <= 0)
+ return;
+ remove_value_from_items();
+
int res;
- while (!(res= cursor.get_next()))
+ while (!(res= cursor.next()))
{
+ /* Note, no need to fetch the value explicitly here. The partition
+ read cursor will fetch it to check if the partition has changed.
+ TODO(cvicentiu) make this piece of information not necessary by
+ reimplementing Partition_read_cursor.
+ */
if (order_direction * range_expr->cmp_read_only() <= 0)
break;
remove_value_from_items();
@@ -1009,6 +1032,7 @@ private:
if (res)
end_of_partition= true;
}
+
};
@@ -1049,7 +1073,7 @@ public:
SQL_I_List<ORDER> *order_list,
bool is_preceding_arg, Item *n_val_arg) :
cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL),
- is_preceding(is_preceding_arg)
+ is_preceding(is_preceding_arg), added_values(false)
{
DBUG_ASSERT(order_list->elements == 1);
Item *src_expr= order_list->first->item[0];
@@ -1085,6 +1109,7 @@ public:
cursor.on_next_partition(rownum);
end_of_partition= false;
+ added_values= false;
}
void next_partition(ha_rows rownum)
@@ -1109,25 +1134,38 @@ public:
(prev_row + n) >= R
We need to check about the current row.
*/
- if (cursor.restore_last_row())
- {
- if (order_direction * range_expr->cmp_read_only() < 0)
- return;
- add_value_to_items();
- }
walk_till_non_peer();
}
- ha_rows get_curr_rownum()
+ bool is_outside_computation_bounds() const
{
- return cursor.get_rownum();
+ if (!added_values)
+ return true;
+ return false;
+ }
+
+ ha_rows get_curr_rownum() const
+ {
+ if (end_of_partition)
+ return cursor.get_rownum(); // Cursor does not pass over partition bound.
+ else
+ return cursor.get_rownum() - 1; // Cursor is placed on first non peer.
}
private:
+ bool added_values;
+
void walk_till_non_peer()
{
+ cursor.fetch();
+ // Current row is not a peer.
+ if (order_direction * range_expr->cmp_read_only() < 0)
+ return;
+
+ add_value_to_items(); // Add current row.
+ added_values= true;
int res;
- while (!(res= cursor.get_next()))
+ while (!(res= cursor.next()))
{
if (order_direction * range_expr->cmp_read_only() < 0)
break;
@@ -1179,11 +1217,8 @@ public:
// Save the value of the current_row
peer_tracker.check_if_next_group();
cursor.on_next_partition(rownum);
- if (rownum != 0)
- {
- // Add the current row now because our cursor has already seen it
- add_value_to_items();
- }
+ // Add the current row now because our cursor has already seen it
+ add_value_to_items();
}
void next_partition(ha_rows rownum)
@@ -1194,8 +1229,6 @@ public:
void pre_next_row()
{
dont_move= !peer_tracker.check_if_next_group();
- if (!dont_move)
- add_value_to_items();
}
void next_row()
@@ -1213,7 +1246,7 @@ public:
walk_till_non_peer();
}
- ha_rows get_curr_rownum()
+ ha_rows get_curr_rownum() const
{
return cursor.get_rownum();
}
@@ -1225,10 +1258,14 @@ private:
Walk forward until we've met first row that's not a peer of the current
row
*/
- while (!cursor.get_next())
+ while (!cursor.next())
{
if (peer_tracker.compare_with_cache())
+ {
+ cursor.prev(); // Move to our peer.
break;
+ }
+
add_value_to_items();
}
}
@@ -1278,7 +1315,7 @@ public:
{
// Fetch the value from the first row
peer_tracker.check_if_next_group();
- cursor.move_to(rownum+1);
+ cursor.move_to(rownum);
}
void next_partition(ha_rows rownum) {}
@@ -1298,17 +1335,17 @@ public:
Our cursor is pointing at the first row that was a peer of the previous
current row. Or, it was the first row in the partition.
*/
- if (cursor.restore_last_row())
- {
- // todo: need the following check ?
- if (!peer_tracker.compare_with_cache())
- return;
- remove_value_from_items();
- }
+ if (cursor.fetch())
+ return;
+
+ // todo: need the following check ?
+ if (!peer_tracker.compare_with_cache())
+ return;
+ remove_value_from_items();
do
{
- if (cursor.get_next())
+ if (cursor.next() || cursor.fetch())
return;
if (!peer_tracker.compare_with_cache())
return;
@@ -1318,7 +1355,7 @@ public:
}
}
- ha_rows get_curr_rownum()
+ ha_rows get_curr_rownum() const
{
return cursor.get_rownum();
}
@@ -1357,7 +1394,7 @@ public:
/* Do nothing, UNBOUNDED PRECEDING frame end doesn't move. */
}
- ha_rows get_curr_rownum()
+ ha_rows get_curr_rownum() const
{
return curr_rownum;
}
@@ -1394,16 +1431,12 @@ public:
void next_partition(ha_rows rownum)
{
- if (!rownum)
- {
- /* Read the first row */
- if (cursor.get_next())
- return;
- }
+ /* Activate the first row */
+ cursor.fetch();
add_value_to_items();
/* Walk to the end of the partition, updating the SUM function */
- while (!cursor.get_next())
+ while (!cursor.next())
{
add_value_to_items();
}
@@ -1414,7 +1447,7 @@ public:
/* Do nothing, UNBOUNDED FOLLOWING frame end doesn't move */
}
- ha_rows get_curr_rownum()
+ ha_rows get_curr_rownum() const
{
return cursor.get_rownum();
}
@@ -1432,16 +1465,12 @@ public:
void next_partition(ha_rows rownum)
{
ha_rows num_rows_in_partition= 0;
- if (!rownum)
- {
- /* Read the first row */
- if (cursor.get_next())
- return;
- }
+ if (cursor.fetch())
+ return;
num_rows_in_partition++;
/* Walk to the end of the partition, find how many rows there are. */
- while (!cursor.get_next())
+ while (!cursor.next())
num_rows_in_partition++;
List_iterator_fast<Item_sum> it(sum_functions);
@@ -1454,7 +1483,7 @@ public:
}
}
- ha_rows get_curr_rownum()
+ ha_rows get_curr_rownum() const
{
return cursor.get_rownum();
}
@@ -1474,12 +1503,12 @@ class Frame_n_rows_preceding : public Frame_cursor
const ha_rows n_rows;
/* Number of rows that we need to skip before our cursor starts moving */
- ha_rows n_rows_to_skip;
+ ha_rows n_rows_behind;
Table_read_cursor cursor;
public:
Frame_n_rows_preceding(bool is_top_bound_arg, ha_rows n_rows_arg) :
- is_top_bound(is_top_bound_arg), n_rows(n_rows_arg)
+ is_top_bound(is_top_bound_arg), n_rows(n_rows_arg), n_rows_behind(0)
{}
void init(READ_RECORD *info)
@@ -1493,8 +1522,9 @@ public:
Position our cursor to point at the first row in the new partition
(for rownum=0, it is already there, otherwise, it lags behind)
*/
- if (rownum != 0)
- cursor.move_to(rownum);
+ cursor.move_to(rownum);
+ /* Cursor is in the same spot as current row. */
+ n_rows_behind= 0;
/*
Suppose the bound is ROWS 2 PRECEDING, and current row is row#n:
@@ -1508,37 +1538,65 @@ public:
- bottom bound should add row #(n-2) into the window function
- top bound should remove row (#n-3) from the window function.
*/
- n_rows_to_skip= n_rows + (is_top_bound? 1:0) - 1;
+ move_cursor_if_possible();
- /* Bottom bound "ROWS 0 PRECEDING" is a special case: */
- if (n_rows_to_skip == ha_rows(-1))
- {
- cursor.get_next();
- add_value_to_items();
- n_rows_to_skip= 0;
- }
}
void next_row()
{
- if (n_rows_to_skip)
+ n_rows_behind++;
+ move_cursor_if_possible();
+ }
+
+ bool is_outside_computation_bounds() const
+ {
+ /* As a bottom boundary, rows have not yet been added. */
+ if (!is_top_bound && n_rows - n_rows_behind)
+ return true;
+ return false;
+ }
+
+ ha_rows get_curr_rownum() const
+ {
+ return cursor.get_rownum();
+ }
+
+private:
+ void move_cursor_if_possible()
+ {
+ int rows_difference= n_rows - n_rows_behind;
+ if (rows_difference > 0) /* We still have to wait. */
+ return;
+
+ /* The cursor points to the first row in the frame. */
+ if (rows_difference == 0)
{
- n_rows_to_skip--;
+ if (!is_top_bound)
+ {
+ cursor.fetch();
+ add_value_to_items();
+ }
+ /* For top bound we don't have to remove anything as nothing was added. */
return;
}
- if (cursor.get_next())
- return; // this is not expected to happen.
+ /* We need to catch up by one row. */
+ DBUG_ASSERT(rows_difference == -1);
- if (is_top_bound) // this is frame start endpoint
+ if (is_top_bound)
+ {
+ cursor.fetch();
remove_value_from_items();
+ cursor.next();
+ }
else
+ {
+ cursor.next();
+ cursor.fetch();
add_value_to_items();
- }
-
- ha_rows get_curr_rownum()
- {
- return cursor.get_rownum();
+ }
+ /* We've advanced one row. We are no longer behind. */
+ n_rows_behind--;
}
};
@@ -1559,6 +1617,7 @@ public:
void pre_next_partition(ha_rows rownum)
{
add_value_to_items();
+ curr_rownum= rownum;
}
void next_partition(ha_rows rownum) {}
@@ -1574,7 +1633,7 @@ public:
curr_rownum++;
};
- ha_rows get_curr_rownum()
+ ha_rows get_curr_rownum() const
{
return curr_rownum;
}
@@ -1643,56 +1702,197 @@ public:
at_partition_end= false;
cursor.on_next_partition(rownum);
+ }
- if (rownum != 0)
- {
- // This is only needed for "FOLLOWING 1". It is one row behind
- cursor.move_to(rownum+1);
+ /* Move our cursor to be n_rows ahead. */
+ void next_partition(ha_rows rownum)
+ {
+ if (is_top_bound)
+ next_part_top(rownum);
+ else
+ next_part_bottom(rownum);
+ }
- // Current row points at the first row in the partition
- if (is_top_bound) // this is frame top endpoint
- remove_value_from_items();
- else
- add_value_to_items();
+ void next_row()
+ {
+ if (is_top_bound)
+ next_row_top();
+ else
+ next_row_bottom();
+ }
+
+ bool is_outside_computation_bounds() const
+ {
+ /*
+ The top bound can go over the current partition. In this case,
+ the sum function has 0 values added to it.
+ */
+ if (at_partition_end && is_top_bound)
+ return true;
+ return false;
+ }
+
+ ha_rows get_curr_rownum() const
+ {
+ return cursor.get_rownum();
+ }
+
+private:
+ void next_part_top(ha_rows rownum)
+ {
+ for (ha_rows i= 0; i < n_rows; i++)
+ {
+ if (cursor.fetch())
+ break;
+ remove_value_from_items();
+ if (cursor.next())
+ at_partition_end= true;
}
}
- /* Move our cursor to be n_rows ahead. */
- void next_partition(ha_rows rownum)
+ void next_part_bottom(ha_rows rownum)
{
- ha_rows i_end= n_rows + ((rownum==0)?1:0)- is_top_bound;
- for (ha_rows i= 0; i < i_end; i++)
+ if (cursor.fetch())
+ return;
+ add_value_to_items();
+
+ for (ha_rows i= 0; i < n_rows; i++)
{
- if (next_row_intern())
+ if (cursor.next())
+ {
+ at_partition_end= true;
break;
+ }
+ add_value_to_items();
}
+ return;
}
- void next_row()
+ void next_row_top()
+ {
+ if (cursor.fetch()) // PART END OR FAILURE
+ {
+ at_partition_end= true;
+ return;
+ }
+ remove_value_from_items();
+ if (cursor.next())
+ {
+ at_partition_end= true;
+ return;
+ }
+ }
+
+ void next_row_bottom()
{
if (at_partition_end)
return;
- next_row_intern();
+
+ if (cursor.next())
+ {
+ at_partition_end= true;
+ return;
+ }
+
+ add_value_to_items();
+
+ }
+};
+
+/*
+ A cursor that performs a table scan between two indices. The indices
+ are provided by the two cursors representing the top and bottom bound
+ of the window function's frame definition.
+
+ Each scan clears the sum function.
+
+ NOTE:
+ The cursor does not alter the top and bottom cursors.
+ This type of cursor is computationally expensive. It is only to be
+ used when the sum functions do not support removal.
+*/
+class Frame_scan_cursor : public Frame_cursor
+{
+public:
+ Frame_scan_cursor(const Frame_cursor &top_bound,
+ const Frame_cursor &bottom_bound) :
+ top_bound(top_bound), bottom_bound(bottom_bound) {}
+
+ void init(READ_RECORD *info)
+ {
+ cursor.init(info);
}
- ha_rows get_curr_rownum()
+ void pre_next_partition(ha_rows rownum)
{
- return cursor.get_rownum();
+ /* TODO(cvicentiu) Sum functions get cleared on next partition anyway during
+ the window function computation algorithm. Either perform this only in
+ cursors, or remove it from pre_next_partition.
+ */
+ curr_rownum= rownum;
+ clear_sum_functions();
+ }
+
+ void next_partition(ha_rows rownum)
+ {
+ compute_values_for_current_row();
+ }
+
+ void pre_next_row()
+ {
+ clear_sum_functions();
+ }
+
+ void next_row()
+ {
+ curr_rownum++;
+ compute_values_for_current_row();
+ }
+
+ ha_rows get_curr_rownum() const
+ {
+ return curr_rownum;
}
private:
- bool next_row_intern()
+ const Frame_cursor &top_bound;
+ const Frame_cursor &bottom_bound;
+ Table_read_cursor cursor;
+ ha_rows curr_rownum;
+
+ /* Clear all sum functions handled by this cursor. */
+ void clear_sum_functions()
{
- if (!cursor.get_next())
+ List_iterator_fast<Item_sum> iter_sum_func(sum_functions);
+ Item_sum *sum_func;
+ while ((sum_func= iter_sum_func++))
{
- if (is_top_bound) // this is frame start endpoint
- remove_value_from_items();
- else
- add_value_to_items();
+ sum_func->clear();
+ }
+ }
+
+ /* Scan the rows between the top bound and bottom bound. Add all the values
+ between them, top bound row and bottom bound row inclusive. */
+ void compute_values_for_current_row()
+ {
+ if (top_bound.is_outside_computation_bounds() ||
+ bottom_bound.is_outside_computation_bounds())
+ return;
+
+ ha_rows start_rownum= top_bound.get_curr_rownum();
+ ha_rows bottom_rownum= bottom_bound.get_curr_rownum();
+ DBUG_PRINT("info", ("COMPUTING (%llu %llu)", start_rownum, bottom_rownum));
+
+ cursor.move_to(start_rownum);
+
+ for (ha_rows idx= start_rownum; idx <= bottom_rownum; idx++)
+ {
+ if (cursor.fetch()) //EOF
+ break;
+ add_value_to_items();
+ if (cursor.next()) // EOF
+ break;
}
- else
- at_partition_end= true;
- return at_partition_end;
}
};
@@ -1836,6 +2036,20 @@ void add_extra_frame_cursors(THD *thd, Cursor_manager *cursor_manager,
}
+static bool is_computed_with_remove(Item_sum::Sumfunctype sum_func)
+{
+ switch (sum_func)
+ {
+ case Item_sum::CUME_DIST_FUNC:
+ case Item_sum::ROW_NUMBER_FUNC:
+ case Item_sum::RANK_FUNC:
+ case Item_sum::DENSE_RANK_FUNC:
+ case Item_sum::NTILE_FUNC:
+ return false;
+ default:
+ return true;
+ }
+}
/*
Create required frame cursors for the list of window functions.
Register all functions to their appropriate cursors.
@@ -1896,6 +2110,17 @@ void get_window_functions_required_cursors(
*/
cursor_manager->add_cursor(frame_bottom);
cursor_manager->add_cursor(frame_top);
+ if (is_computed_with_remove(sum_func->sum_func()) &&
+ !sum_func->supports_removal())
+ {
+ frame_bottom->set_no_action();
+ frame_top->set_no_action();
+ Frame_cursor *scan_cursor= new Frame_scan_cursor(*frame_top,
+ *frame_bottom);
+ scan_cursor->add_sum_func(sum_func);
+ cursor_manager->add_cursor(scan_cursor);
+
+ }
cursor_managers->push_back(cursor_manager);
}
}