diff options
author | Vicențiu Ciorbaru <vicentiu@mariadb.org> | 2016-09-09 14:46:47 +0300 |
---|---|---|
committer | Vicențiu Ciorbaru <vicentiu@mariadb.org> | 2016-09-09 18:33:29 +0300 |
commit | dfd3be928d98d123e86b8ad3ae307e27153ab11c (patch) | |
tree | 212b3b4f7ab92966bc6df662e88e5281dce9c0b5 /sql | |
parent | ffed20c5635a58bc1465efd5d936650b1672da74 (diff) | |
download | mariadb-git-dfd3be928d98d123e86b8ad3ae307e27153ab11c.tar.gz |
Make cursor implementation uniform
Cursors now report their current row number as the boundary of the
partition. This is used by Frame_scan_cursor to compute aggregate
functions that do not support removal.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/item_sum.h | 23 | ||||
-rw-r--r-- | sql/sql_window.cc | 505 |
2 files changed, 387 insertions, 141 deletions
diff --git a/sql/item_sum.h b/sql/item_sum.h index 9fd6f7867db..16835441125 100644 --- a/sql/item_sum.h +++ b/sql/item_sum.h @@ -543,7 +543,8 @@ public: virtual void clear()= 0; virtual bool add()= 0; virtual bool setup(THD *thd) { return false; } - + + virtual bool supports_removal() const { return false; } virtual void remove() { DBUG_ASSERT(0); } virtual void cleanup(); @@ -771,6 +772,11 @@ public: Item *get_copy(THD *thd, MEM_ROOT *mem_root) { return get_item_copy<Item_sum_sum>(thd, mem_root, this); } + bool supports_removal() const + { + return true; + } + private: void add_helper(bool perform_removal); ulonglong count; @@ -829,6 +835,11 @@ class Item_sum_count :public Item_sum_int Item *copy_or_same(THD* thd); Item *get_copy(THD *thd, MEM_ROOT *mem_root) { return get_item_copy<Item_sum_count>(thd, mem_root, this); } + + bool supports_removal() const + { + return true; + } }; @@ -878,6 +889,11 @@ public: } Item *get_copy(THD *thd, MEM_ROOT *mem_root) { return get_item_copy<Item_sum_avg>(thd, mem_root, this); } + + bool supports_removal() const + { + return true; + } }; @@ -1089,6 +1105,11 @@ public: DBUG_ASSERT(0); } + bool supports_removal() const + { + return true; + } + protected: static const int NUM_BIT_COUNTERS= 64; ulonglong reset_bits,bits; diff --git a/sql/sql_window.cc b/sql/sql_window.cc index a2e793f9582..7192c8a492e 100644 --- a/sql/sql_window.cc +++ b/sql/sql_window.cc @@ -4,6 +4,7 @@ #include "filesort.h" #include "sql_base.h" #include "sql_window.h" +#include "my_dbug.h" bool @@ -549,29 +550,44 @@ public: ref_length= info->ref_length; } - virtual int get_next() + virtual int next() { - /* Allow multiple get_next() calls in EOF state*/ + /* Allow multiple next() calls in EOF state. */ if (cache_pos == cache_end) return -1; + cache_pos+= ref_length; + DBUG_ASSERT(cache_pos <= cache_end); + + return 0; + } + + virtual int prev() + { + /* Allow multiple prev() calls when positioned at the start. */ + if (cache_pos == cache_start) + return -1; + cache_pos-= ref_length; + DBUG_ASSERT(cache_pos >= cache_start); + return 0; } - ha_rows get_rownum() + ha_rows get_rownum() const { return (cache_pos - cache_start) / ref_length; } void move_to(ha_rows row_number) { - cache_pos= cache_start + row_number * ref_length; + cache_pos= MY_MIN(cache_end, cache_start + row_number * ref_length); + DBUG_ASSERT(cache_pos <= cache_end); } protected: bool at_eof() { return (cache_pos == cache_end); } - uchar *get_last_rowid() + uchar *get_prev_rowid() { if (cache_pos == cache_start) return NULL; @@ -601,30 +617,25 @@ public: void init(READ_RECORD *info) { Rowid_seq_cursor::init(info); - read_record= info; + table= info->table; + record= info->record; } - virtual int get_next() + virtual int fetch() { if (at_eof()) return -1; uchar* curr_rowid= get_curr_rowid(); - int res= Rowid_seq_cursor::get_next(); - if (!res) - { - res= read_record->table->file->ha_rnd_pos(read_record->record, - curr_rowid); - } - return res; + return table->file->ha_rnd_pos(record, curr_rowid); } - bool restore_last_row() + bool fetch_prev_row() { uchar *p; - if ((p= get_last_rowid())) + if ((p= get_prev_rowid())) { - int rc= read_record->table->file->ha_rnd_pos(read_record->record, p); + int rc= table->file->ha_rnd_pos(record, p); if (!rc) return true; // restored ok } @@ -632,11 +643,10 @@ public: } private: - /* - Note: we don't own *read_record, somebody else is using it. - We only look at the constant part of it, e.g. table, record buffer, etc. - */ - READ_RECORD *read_record; + /* The table that is acccesed by this cursor. */ + TABLE *table; + /* Buffer where to store the table's record data. */ + uchar *record; // TODO(spetrunia): should move_to() also read row here? }; @@ -645,6 +655,8 @@ private: /* A cursor which only moves within a partition. The scan stops at the partition end, and it needs an explicit command to move to the next partition. + + This cursor can not move backwards. */ class Partition_read_cursor : public Table_read_cursor @@ -669,39 +681,36 @@ public: void on_next_partition(ha_rows rownum) { /* Remember the sort key value from the new partition */ + move_to(rownum); bound_tracker.check_if_next_group(); end_of_partition= false; - } - /* - Moves to a new row. The row is assumed to be within the current partition. - */ - void move_to(ha_rows rownum) { Table_read_cursor::move_to(rownum); } + } /* This returns -1 when end of partition was reached. */ - int get_next() + int next() { int res; if (end_of_partition) return -1; - if ((res= Table_read_cursor::get_next())) + + if ((res= Table_read_cursor::next()) || + (res= fetch())) return res; if (bound_tracker.compare_with_cache()) { + /* This row is part of a new partition, don't move + forward any more untill we get informed of a new partition. */ + Table_read_cursor::prev(); end_of_partition= true; return -1; } return 0; } - bool restore_last_row() - { - return Table_read_cursor::restore_last_row(); - } - private: Group_bound_tracker bound_tracker; bool end_of_partition; @@ -783,6 +792,8 @@ public: virtual void pre_next_row() {}; virtual void next_row()=0; + virtual bool is_outside_computation_bounds() const { return false; }; + virtual ~Frame_cursor() {} /* @@ -796,7 +807,7 @@ public: } /* Retrieves the row number that this cursor currently points at. */ - virtual ha_rows get_curr_rownum()= 0; + virtual ha_rows get_curr_rownum() const= 0; protected: inline void add_value_to_items() @@ -962,7 +973,6 @@ public: void next_partition(ha_rows rownum) { - cursor.move_to(rownum); walk_till_non_peer(); } @@ -982,26 +992,39 @@ public: (prev_row + n) >= R We need to check about the current row. */ - if (cursor.restore_last_row()) - { - if (order_direction * range_expr->cmp_read_only() <= 0) - return; - remove_value_from_items(); - } walk_till_non_peer(); } - ha_rows get_curr_rownum() + ha_rows get_curr_rownum() const { return cursor.get_rownum(); } + bool is_outside_computation_bounds() const + { + if (end_of_partition) + return true; + return false; + } + private: void walk_till_non_peer() { + if (cursor.fetch()) // ERROR + return; + // Current row is not a peer. + if (order_direction * range_expr->cmp_read_only() <= 0) + return; + remove_value_from_items(); + int res; - while (!(res= cursor.get_next())) + while (!(res= cursor.next())) { + /* Note, no need to fetch the value explicitly here. The partition + read cursor will fetch it to check if the partition has changed. + TODO(cvicentiu) make this piece of information not necessary by + reimplementing Partition_read_cursor. + */ if (order_direction * range_expr->cmp_read_only() <= 0) break; remove_value_from_items(); @@ -1009,6 +1032,7 @@ private: if (res) end_of_partition= true; } + }; @@ -1049,7 +1073,7 @@ public: SQL_I_List<ORDER> *order_list, bool is_preceding_arg, Item *n_val_arg) : cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL), - is_preceding(is_preceding_arg) + is_preceding(is_preceding_arg), added_values(false) { DBUG_ASSERT(order_list->elements == 1); Item *src_expr= order_list->first->item[0]; @@ -1085,6 +1109,7 @@ public: cursor.on_next_partition(rownum); end_of_partition= false; + added_values= false; } void next_partition(ha_rows rownum) @@ -1109,25 +1134,38 @@ public: (prev_row + n) >= R We need to check about the current row. */ - if (cursor.restore_last_row()) - { - if (order_direction * range_expr->cmp_read_only() < 0) - return; - add_value_to_items(); - } walk_till_non_peer(); } - ha_rows get_curr_rownum() + bool is_outside_computation_bounds() const { - return cursor.get_rownum(); + if (!added_values) + return true; + return false; + } + + ha_rows get_curr_rownum() const + { + if (end_of_partition) + return cursor.get_rownum(); // Cursor does not pass over partition bound. + else + return cursor.get_rownum() - 1; // Cursor is placed on first non peer. } private: + bool added_values; + void walk_till_non_peer() { + cursor.fetch(); + // Current row is not a peer. + if (order_direction * range_expr->cmp_read_only() < 0) + return; + + add_value_to_items(); // Add current row. + added_values= true; int res; - while (!(res= cursor.get_next())) + while (!(res= cursor.next())) { if (order_direction * range_expr->cmp_read_only() < 0) break; @@ -1179,11 +1217,8 @@ public: // Save the value of the current_row peer_tracker.check_if_next_group(); cursor.on_next_partition(rownum); - if (rownum != 0) - { - // Add the current row now because our cursor has already seen it - add_value_to_items(); - } + // Add the current row now because our cursor has already seen it + add_value_to_items(); } void next_partition(ha_rows rownum) @@ -1194,8 +1229,6 @@ public: void pre_next_row() { dont_move= !peer_tracker.check_if_next_group(); - if (!dont_move) - add_value_to_items(); } void next_row() @@ -1213,7 +1246,7 @@ public: walk_till_non_peer(); } - ha_rows get_curr_rownum() + ha_rows get_curr_rownum() const { return cursor.get_rownum(); } @@ -1225,10 +1258,14 @@ private: Walk forward until we've met first row that's not a peer of the current row */ - while (!cursor.get_next()) + while (!cursor.next()) { if (peer_tracker.compare_with_cache()) + { + cursor.prev(); // Move to our peer. break; + } + add_value_to_items(); } } @@ -1278,7 +1315,7 @@ public: { // Fetch the value from the first row peer_tracker.check_if_next_group(); - cursor.move_to(rownum+1); + cursor.move_to(rownum); } void next_partition(ha_rows rownum) {} @@ -1298,17 +1335,17 @@ public: Our cursor is pointing at the first row that was a peer of the previous current row. Or, it was the first row in the partition. */ - if (cursor.restore_last_row()) - { - // todo: need the following check ? - if (!peer_tracker.compare_with_cache()) - return; - remove_value_from_items(); - } + if (cursor.fetch()) + return; + + // todo: need the following check ? + if (!peer_tracker.compare_with_cache()) + return; + remove_value_from_items(); do { - if (cursor.get_next()) + if (cursor.next() || cursor.fetch()) return; if (!peer_tracker.compare_with_cache()) return; @@ -1318,7 +1355,7 @@ public: } } - ha_rows get_curr_rownum() + ha_rows get_curr_rownum() const { return cursor.get_rownum(); } @@ -1357,7 +1394,7 @@ public: /* Do nothing, UNBOUNDED PRECEDING frame end doesn't move. */ } - ha_rows get_curr_rownum() + ha_rows get_curr_rownum() const { return curr_rownum; } @@ -1394,16 +1431,12 @@ public: void next_partition(ha_rows rownum) { - if (!rownum) - { - /* Read the first row */ - if (cursor.get_next()) - return; - } + /* Activate the first row */ + cursor.fetch(); add_value_to_items(); /* Walk to the end of the partition, updating the SUM function */ - while (!cursor.get_next()) + while (!cursor.next()) { add_value_to_items(); } @@ -1414,7 +1447,7 @@ public: /* Do nothing, UNBOUNDED FOLLOWING frame end doesn't move */ } - ha_rows get_curr_rownum() + ha_rows get_curr_rownum() const { return cursor.get_rownum(); } @@ -1432,16 +1465,12 @@ public: void next_partition(ha_rows rownum) { ha_rows num_rows_in_partition= 0; - if (!rownum) - { - /* Read the first row */ - if (cursor.get_next()) - return; - } + if (cursor.fetch()) + return; num_rows_in_partition++; /* Walk to the end of the partition, find how many rows there are. */ - while (!cursor.get_next()) + while (!cursor.next()) num_rows_in_partition++; List_iterator_fast<Item_sum> it(sum_functions); @@ -1454,7 +1483,7 @@ public: } } - ha_rows get_curr_rownum() + ha_rows get_curr_rownum() const { return cursor.get_rownum(); } @@ -1474,12 +1503,12 @@ class Frame_n_rows_preceding : public Frame_cursor const ha_rows n_rows; /* Number of rows that we need to skip before our cursor starts moving */ - ha_rows n_rows_to_skip; + ha_rows n_rows_behind; Table_read_cursor cursor; public: Frame_n_rows_preceding(bool is_top_bound_arg, ha_rows n_rows_arg) : - is_top_bound(is_top_bound_arg), n_rows(n_rows_arg) + is_top_bound(is_top_bound_arg), n_rows(n_rows_arg), n_rows_behind(0) {} void init(READ_RECORD *info) @@ -1493,8 +1522,9 @@ public: Position our cursor to point at the first row in the new partition (for rownum=0, it is already there, otherwise, it lags behind) */ - if (rownum != 0) - cursor.move_to(rownum); + cursor.move_to(rownum); + /* Cursor is in the same spot as current row. */ + n_rows_behind= 0; /* Suppose the bound is ROWS 2 PRECEDING, and current row is row#n: @@ -1508,37 +1538,65 @@ public: - bottom bound should add row #(n-2) into the window function - top bound should remove row (#n-3) from the window function. */ - n_rows_to_skip= n_rows + (is_top_bound? 1:0) - 1; + move_cursor_if_possible(); - /* Bottom bound "ROWS 0 PRECEDING" is a special case: */ - if (n_rows_to_skip == ha_rows(-1)) - { - cursor.get_next(); - add_value_to_items(); - n_rows_to_skip= 0; - } } void next_row() { - if (n_rows_to_skip) + n_rows_behind++; + move_cursor_if_possible(); + } + + bool is_outside_computation_bounds() const + { + /* As a bottom boundary, rows have not yet been added. */ + if (!is_top_bound && n_rows - n_rows_behind) + return true; + return false; + } + + ha_rows get_curr_rownum() const + { + return cursor.get_rownum(); + } + +private: + void move_cursor_if_possible() + { + int rows_difference= n_rows - n_rows_behind; + if (rows_difference > 0) /* We still have to wait. */ + return; + + /* The cursor points to the first row in the frame. */ + if (rows_difference == 0) { - n_rows_to_skip--; + if (!is_top_bound) + { + cursor.fetch(); + add_value_to_items(); + } + /* For top bound we don't have to remove anything as nothing was added. */ return; } - if (cursor.get_next()) - return; // this is not expected to happen. + /* We need to catch up by one row. */ + DBUG_ASSERT(rows_difference == -1); - if (is_top_bound) // this is frame start endpoint + if (is_top_bound) + { + cursor.fetch(); remove_value_from_items(); + cursor.next(); + } else + { + cursor.next(); + cursor.fetch(); add_value_to_items(); - } - - ha_rows get_curr_rownum() - { - return cursor.get_rownum(); + } + /* We've advanced one row. We are no longer behind. */ + n_rows_behind--; } }; @@ -1559,6 +1617,7 @@ public: void pre_next_partition(ha_rows rownum) { add_value_to_items(); + curr_rownum= rownum; } void next_partition(ha_rows rownum) {} @@ -1574,7 +1633,7 @@ public: curr_rownum++; }; - ha_rows get_curr_rownum() + ha_rows get_curr_rownum() const { return curr_rownum; } @@ -1643,56 +1702,197 @@ public: at_partition_end= false; cursor.on_next_partition(rownum); + } - if (rownum != 0) - { - // This is only needed for "FOLLOWING 1". It is one row behind - cursor.move_to(rownum+1); + /* Move our cursor to be n_rows ahead. */ + void next_partition(ha_rows rownum) + { + if (is_top_bound) + next_part_top(rownum); + else + next_part_bottom(rownum); + } - // Current row points at the first row in the partition - if (is_top_bound) // this is frame top endpoint - remove_value_from_items(); - else - add_value_to_items(); + void next_row() + { + if (is_top_bound) + next_row_top(); + else + next_row_bottom(); + } + + bool is_outside_computation_bounds() const + { + /* + The top bound can go over the current partition. In this case, + the sum function has 0 values added to it. + */ + if (at_partition_end && is_top_bound) + return true; + return false; + } + + ha_rows get_curr_rownum() const + { + return cursor.get_rownum(); + } + +private: + void next_part_top(ha_rows rownum) + { + for (ha_rows i= 0; i < n_rows; i++) + { + if (cursor.fetch()) + break; + remove_value_from_items(); + if (cursor.next()) + at_partition_end= true; } } - /* Move our cursor to be n_rows ahead. */ - void next_partition(ha_rows rownum) + void next_part_bottom(ha_rows rownum) { - ha_rows i_end= n_rows + ((rownum==0)?1:0)- is_top_bound; - for (ha_rows i= 0; i < i_end; i++) + if (cursor.fetch()) + return; + add_value_to_items(); + + for (ha_rows i= 0; i < n_rows; i++) { - if (next_row_intern()) + if (cursor.next()) + { + at_partition_end= true; break; + } + add_value_to_items(); } + return; } - void next_row() + void next_row_top() + { + if (cursor.fetch()) // PART END OR FAILURE + { + at_partition_end= true; + return; + } + remove_value_from_items(); + if (cursor.next()) + { + at_partition_end= true; + return; + } + } + + void next_row_bottom() { if (at_partition_end) return; - next_row_intern(); + + if (cursor.next()) + { + at_partition_end= true; + return; + } + + add_value_to_items(); + + } +}; + +/* + A cursor that performs a table scan between two indices. The indices + are provided by the two cursors representing the top and bottom bound + of the window function's frame definition. + + Each scan clears the sum function. + + NOTE: + The cursor does not alter the top and bottom cursors. + This type of cursor is expensive computational wise. This is only to be + used when the sum functions do not support removal. +*/ +class Frame_scan_cursor : public Frame_cursor +{ +public: + Frame_scan_cursor(const Frame_cursor &top_bound, + const Frame_cursor &bottom_bound) : + top_bound(top_bound), bottom_bound(bottom_bound) {} + + void init(READ_RECORD *info) + { + cursor.init(info); } - ha_rows get_curr_rownum() + void pre_next_partition(ha_rows rownum) { - return cursor.get_rownum(); + /* TODO(cvicentiu) Sum functions get cleared on next partition anyway during + the window function computation algorithm. Either perform this only in + cursors, or remove it from pre_next_partition. + */ + curr_rownum= rownum; + clear_sum_functions(); + } + + void next_partition(ha_rows rownum) + { + compute_values_for_current_row(); + } + + void pre_next_row() + { + clear_sum_functions(); + } + + void next_row() + { + curr_rownum++; + compute_values_for_current_row(); + } + + ha_rows get_curr_rownum() const + { + return curr_rownum; } private: - bool next_row_intern() + const Frame_cursor &top_bound; + const Frame_cursor &bottom_bound; + Table_read_cursor cursor; + ha_rows curr_rownum; + + /* Clear all sum functions handled by this cursor. */ + void clear_sum_functions() { - if (!cursor.get_next()) + List_iterator_fast<Item_sum> iter_sum_func(sum_functions); + Item_sum *sum_func; + while ((sum_func= iter_sum_func++)) { - if (is_top_bound) // this is frame start endpoint - remove_value_from_items(); - else - add_value_to_items(); + sum_func->clear(); + } + } + + /* Scan the rows between the top bound and bottom bound. Add all the values + between them, top bound row and bottom bound row inclusive. */ + void compute_values_for_current_row() + { + if (top_bound.is_outside_computation_bounds() || + bottom_bound.is_outside_computation_bounds()) + return; + + ha_rows start_rownum= top_bound.get_curr_rownum(); + ha_rows bottom_rownum= bottom_bound.get_curr_rownum(); + DBUG_PRINT("info", ("COMPUTING (%llu %llu)", start_rownum, bottom_rownum)); + + cursor.move_to(start_rownum); + + for (ha_rows idx= start_rownum; idx <= bottom_rownum; idx++) + { + if (cursor.fetch()) //EOF + break; + add_value_to_items(); + if (cursor.next()) // EOF + break; } - else - at_partition_end= true; - return at_partition_end; } }; @@ -1836,6 +2036,20 @@ void add_extra_frame_cursors(THD *thd, Cursor_manager *cursor_manager, } +static bool is_computed_with_remove(Item_sum::Sumfunctype sum_func) +{ + switch (sum_func) + { + case Item_sum::CUME_DIST_FUNC: + case Item_sum::ROW_NUMBER_FUNC: + case Item_sum::RANK_FUNC: + case Item_sum::DENSE_RANK_FUNC: + case Item_sum::NTILE_FUNC: + return false; + default: + return true; + } +} /* Create required frame cursors for the list of window functions. Register all functions to their appropriate cursors. @@ -1896,6 +2110,17 @@ void get_window_functions_required_cursors( */ cursor_manager->add_cursor(frame_bottom); cursor_manager->add_cursor(frame_top); + if (is_computed_with_remove(sum_func->sum_func()) && + !sum_func->supports_removal()) + { + frame_bottom->set_no_action(); + frame_top->set_no_action(); + Frame_cursor *scan_cursor= new Frame_scan_cursor(*frame_top, + *frame_bottom); + scan_cursor->add_sum_func(sum_func); + cursor_manager->add_cursor(scan_cursor); + + } cursor_managers->push_back(cursor_manager); } } |