diff options
author | Vicențiu Ciorbaru <vicentiu@mariadb.org> | 2016-09-01 18:10:15 +0300 |
---|---|---|
committer | Vicențiu Ciorbaru <vicentiu@mariadb.org> | 2016-09-09 18:32:35 +0300 |
commit | 23e8b508a00a23653da436519371943487ad6fe4 (patch) | |
tree | 39640892aea08485ea2eb8f9005472b1808d500c /sql/sql_window.cc | |
parent | 19d24f011cc16efd0edb4ca8cc9ce8c720842961 (diff) | |
download | mariadb-git-23e8b508a00a23653da436519371943487ad6fe4.tar.gz |
MDEV-10059: Compute window functions with same sorting criteria simultaneously
Perform only one table scan for each window function present. We do this
by keeping cursors for each window function frame bound and
running them for each function for every row.
Diffstat (limited to 'sql/sql_window.cc')
-rw-r--r-- | sql/sql_window.cc | 884 |
1 files changed, 529 insertions, 355 deletions
diff --git a/sql/sql_window.cc b/sql/sql_window.cc index 4c5ef53d854..a862821bd56 100644 --- a/sql/sql_window.cc +++ b/sql/sql_window.cc @@ -37,12 +37,12 @@ Window_spec::check_window_names(List_iterator_fast<Window_spec> &it) if (win_spec->order_list->elements && order_list->elements) { my_error(ER_ORDER_LIST_IN_REFERENCING_WINDOW_SPEC, MYF(0), ref_name); - return true; + return true; } - if (win_spec->window_frame) + if (win_spec->window_frame) { my_error(ER_WINDOW_FRAME_IN_REFERENCED_WINDOW_SPEC, MYF(0), ref_name); - return true; + return true; } referenced_win_spec= win_spec; if (partition_list->elements == 0) @@ -54,7 +54,7 @@ Window_spec::check_window_names(List_iterator_fast<Window_spec> &it) if (ref_name && !referenced_win_spec) { my_error(ER_WRONG_WINDOW_SPEC_NAME, MYF(0), ref_name); - return true; + return true; } window_names_are_checked= true; return false; @@ -73,7 +73,7 @@ Window_frame::check_frame_bounds() top_bound->precedence_type == Window_frame_bound::FOLLOWING)) { my_error(ER_BAD_COMBINATION_OF_WINDOW_FRAME_BOUND_SPECS, MYF(0)); - return true; + return true; } return false; @@ -86,7 +86,7 @@ Window_frame::check_frame_bounds() int setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables, - List<Item> &fields, List<Item> &all_fields, + List<Item> &fields, List<Item> &all_fields, List<Window_spec> &win_specs, List<Item_window_func> &win_funcs) { Window_spec *win_spec; @@ -116,7 +116,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables, it.rewind(); List_iterator_fast<Window_spec> itp(win_specs); - + while ((win_spec= it++)) { bool hidden_group_fields; @@ -131,7 +131,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables, { DBUG_RETURN(1); } - + if (win_spec->window_frame && win_spec->window_frame->exclusion != Window_frame::EXCL_NONE) { @@ -188,7 +188,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables, } } } - + /* "ROWS PRECEDING|FOLLOWING $n" 
must have a numeric $n */ if (win_spec->window_frame && win_spec->window_frame->units == Window_frame::UNITS_ROWS) @@ -219,7 +219,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables, { win_func_item->update_used_tables(); } - + DBUG_RETURN(0); } @@ -445,7 +445,7 @@ typedef int (*Item_window_func_cmp)(Item_window_func *f1, @brief Sort window functions so that those that can be computed together are adjacent. - + @detail Sort window functions by their - required sorting order, @@ -498,48 +498,15 @@ void order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list) } else if (win_spec_prev->window_frame != win_spec_curr->window_frame) curr->marker|= FRAME_CHANGE_FLAG; - - prev= curr; - } + + prev= curr; + } } ///////////////////////////////////////////////////////////////////////////// -/* - Do a pass over sorted table and compute window function values. - - This function is for handling window functions that can be computed on the - fly. Examples are RANK() and ROW_NUMBER(). -*/ -bool compute_window_func_values(Item_window_func *item_win, - TABLE *tbl, READ_RECORD *info) -{ - int err; - while (!(err=info->read_record(info))) - { - store_record(tbl,record[1]); - - /* - This will cause window function to compute its value for the - current row : - */ - item_win->advance_window(); - - /* - Put the new value into temptable's field - TODO: Should this use item_win->update_field() call? - Regular aggegate function implementations seem to implement it. 
- */ - item_win->save_in_field(item_win->result_field, true); - err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]); - if (err && err != HA_ERR_RECORD_IS_THE_SAME) - return true; - } - return false; -} - ///////////////////////////////////////////////////////////////////////////// // Window Frames support ///////////////////////////////////////////////////////////////////////////// @@ -571,11 +538,6 @@ bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst) class Rowid_seq_cursor { - uchar *cache_start; - uchar *cache_pos; - uchar *cache_end; - uint ref_length; - public: virtual ~Rowid_seq_cursor() {} @@ -595,17 +557,17 @@ public: cache_pos+= ref_length; return 0; } - + ha_rows get_rownum() { return (cache_pos - cache_start) / ref_length; } - // will be called by ROWS n FOLLOWING to catch up. void move_to(ha_rows row_number) { cache_pos= cache_start + row_number * ref_length; } + protected: bool at_eof() { return (cache_pos == cache_end); } @@ -618,6 +580,12 @@ protected: } uchar *get_curr_rowid() { return cache_pos; } + +private: + uchar *cache_start; + uchar *cache_pos; + uchar *cache_end; + uint ref_length; }; @@ -627,11 +595,6 @@ protected: class Table_read_cursor : public Rowid_seq_cursor { - /* - Note: we don't own *read_record, somebody else is using it. - We only look at the constant part of it, e.g. table, record buffer, etc. - */ - READ_RECORD *read_record; public: virtual ~Table_read_cursor() {} @@ -668,7 +631,14 @@ public: return false; // didn't restore } - // todo: should move_to() also read row here? +private: + /* + Note: we don't own *read_record, somebody else is using it. + We only look at the constant part of it, e.g. table, record buffer, etc. + */ + READ_RECORD *read_record; + + // TODO(spetrunia): should move_to() also read row here? 
}; @@ -679,14 +649,14 @@ public: class Partition_read_cursor { - Table_read_cursor tbl_cursor; - Group_bound_tracker bound_tracker; - bool end_of_partition; public: - void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list) + Partition_read_cursor(THD *thd, SQL_I_List<ORDER> *partition_list) : + bound_tracker(thd, partition_list) {} + + void init(READ_RECORD *info) { tbl_cursor.init(info); - bound_tracker.init(thd, partition_list); + bound_tracker.init(); end_of_partition= false; } @@ -704,7 +674,7 @@ public: } /* - Moves to a new row. The row is assumed to be within the current partition + Moves to a new row. The row is assumed to be within the current partition. */ void move_to(ha_rows rownum) { tbl_cursor.move_to(rownum); } @@ -731,14 +701,18 @@ public: { return tbl_cursor.restore_last_row(); } + +private: + Table_read_cursor tbl_cursor; + Group_bound_tracker bound_tracker; + bool end_of_partition; }; ///////////////////////////////////////////////////////////////////////////// - /* Window frame bound cursor. Abstract interface. - + @detail The cursor moves within the partition that the current row is in. It may be ahead or behind the current row. @@ -775,11 +749,12 @@ public: class Frame_cursor : public Sql_alloc { public: - virtual void init(THD *thd, READ_RECORD *info, - SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) - {} + virtual void init(READ_RECORD *info) {}; + bool add_sum_func(Item_sum* item) + { + return sum_functions.push_back(item); + } /* Current row has moved to the next partition and is positioned on the first row there. Position the frame bound accordingly. @@ -796,20 +771,95 @@ public: - The callee may move tbl->file and tbl->record[0] to point to some other row. 
*/ - virtual void pre_next_partition(ha_rows rownum, Item_sum* item){}; - virtual void next_partition(ha_rows rownum, Item_sum* item)=0; - + virtual void pre_next_partition(ha_rows rownum) {}; + virtual void next_partition(ha_rows rownum)=0; + /* The current row has moved one row forward. Move this frame bound accordingly, and update the value of aggregate function as necessary. */ - virtual void pre_next_row(Item_sum* item){}; - virtual void next_row(Item_sum* item)=0; - - virtual ~Frame_cursor(){} + virtual void pre_next_row() {}; + virtual void next_row()=0; + + virtual ~Frame_cursor() {} + +protected: + inline void add_value_to_items() + { + List_iterator_fast<Item_sum> it(sum_functions); + Item_sum *item_sum; + while ((item_sum= it++)) + { + item_sum->add(); + } + } + inline void remove_value_from_items() + { + List_iterator_fast<Item_sum> it(sum_functions); + Item_sum *item_sum; + while ((item_sum= it++)) + { + item_sum->remove(); + } + } + + /* Sum functions that this cursor handles. */ + List<Item_sum> sum_functions; +}; + +/* + A class that owns cursor objects associated with a specific window function. 
+*/ +class Cursor_manager +{ +public: + bool add_cursor(Frame_cursor *cursor) + { + return cursors.push_back(cursor); + } + + void initialize_cursors(READ_RECORD *info) + { + List_iterator_fast<Frame_cursor> iter(cursors); + Frame_cursor *fc; + while ((fc= iter++)) + fc->init(info); + } + + void notify_cursors_partition_changed(ha_rows rownum) + { + List_iterator_fast<Frame_cursor> iter(cursors); + Frame_cursor *cursor; + while ((cursor= iter++)) + cursor->pre_next_partition(rownum); + + iter.rewind(); + while ((cursor= iter++)) + cursor->next_partition(rownum); + } + + void notify_cursors_next_row() + { + List_iterator_fast<Frame_cursor> iter(cursors); + Frame_cursor *cursor; + while ((cursor= iter++)) + cursor->pre_next_row(); + + iter.rewind(); + while ((cursor= iter++)) + cursor->next_row(); + } + + ~Cursor_manager() { cursors.delete_elements(); } + +private: + /* List of the cursors that this manager owns. */ + List<Frame_cursor> cursors; }; + + ////////////////////////////////////////////////////////////////////////////// // RANGE-type frames ////////////////////////////////////////////////////////////////////////////// @@ -841,16 +891,12 @@ class Frame_range_n_top : public Frame_cursor */ int order_direction; public: - Frame_range_n_top(bool is_preceding_arg, Item *n_val_arg) : + Frame_range_n_top(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list, + bool is_preceding_arg, Item *n_val_arg) : n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg) - {} - - void init(THD *thd, READ_RECORD *info, - SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) { - cursor.init(info); - DBUG_ASSERT(order_list->elements == 1); Item *src_expr= order_list->first->item[0]; if (order_list->first->direction == ORDER::ORDER_ASC) @@ -872,24 +918,30 @@ public: item_add->fix_fields(thd, &item_add); } - void pre_next_partition(ha_rows rownum, Item_sum* item) + void init(READ_RECORD *info) + { + cursor.init(info); + + } + + void 
pre_next_partition(ha_rows rownum) { // Save the value of FUNC(current_row) range_expr->fetch_value_from(item_add); } - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { cursor.move_to(rownum); - walk_till_non_peer(item); + walk_till_non_peer(); } - void pre_next_row(Item_sum* item) + void pre_next_row() { range_expr->fetch_value_from(item_add); } - void next_row(Item_sum* item) + void next_row() { /* Ok, our cursor is at the first row R where @@ -900,19 +952,19 @@ public: { if (order_direction * range_expr->cmp_read_only() <= 0) return; - item->remove(); + remove_value_from_items(); } - walk_till_non_peer(item); + walk_till_non_peer(); } private: - void walk_till_non_peer(Item_sum* item) + void walk_till_non_peer() { while (!cursor.get_next()) { if (order_direction * range_expr->cmp_read_only() <= 0) break; - item->remove(); + remove_value_from_items(); } } }; @@ -950,16 +1002,13 @@ class Frame_range_n_bottom: public Frame_cursor */ int order_direction; public: - Frame_range_n_bottom(bool is_preceding_arg, Item *n_val_arg) : - n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg) - {} - - void init(THD *thd, READ_RECORD *info, - SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) + Frame_range_n_bottom(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list, + bool is_preceding_arg, Item *n_val_arg) : + cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL), + is_preceding(is_preceding_arg) { - cursor.init(thd, info, partition_list); - DBUG_ASSERT(order_list->elements == 1); Item *src_expr= order_list->first->item[0]; @@ -982,7 +1031,12 @@ public: item_add->fix_fields(thd, &item_add); } - void pre_next_partition(ha_rows rownum, Item_sum* item) + void init(READ_RECORD *info) + { + cursor.init(info); + } + + void pre_next_partition(ha_rows rownum) { // Save the value of FUNC(current_row) range_expr->fetch_value_from(item_add); @@ -991,20 +1045,20 @@ public: 
end_of_partition= false; } - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { cursor.move_to(rownum); - walk_till_non_peer(item); + walk_till_non_peer(); } - void pre_next_row(Item_sum* item) + void pre_next_row() { if (end_of_partition) return; range_expr->fetch_value_from(item_add); } - void next_row(Item_sum* item) + void next_row() { if (end_of_partition) return; @@ -1017,20 +1071,20 @@ public: { if (order_direction * range_expr->cmp_read_only() < 0) return; - item->add(); + add_value_to_items(); } - walk_till_non_peer(item); + walk_till_non_peer(); } private: - void walk_till_non_peer(Item_sum* item) + void walk_till_non_peer() { int res; while (!(res= cursor.get_next())) { if (order_direction * range_expr->cmp_read_only() < 0) break; - item->add(); + add_value_to_items(); } if (res) end_of_partition= true; @@ -1043,11 +1097,11 @@ private: ... | peer1 | peer2 <----- current_row - | peer3 + | peer3 +-peer4 <----- the cursor points here. peer4 itself is included. nonpeer1 nonpeer2 - + This bound moves in front of the current_row. It should be a the first row that is still a peer of the current row. 
*/ @@ -1060,15 +1114,20 @@ class Frame_range_current_row_bottom: public Frame_cursor bool dont_move; public: - void init(THD *thd, READ_RECORD *info, - SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) + Frame_range_current_row_bottom(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list) : + cursor(thd, partition_list), peer_tracker(thd, order_list) + { + } + + void init(READ_RECORD *info) { - cursor.init(thd, info, partition_list); - peer_tracker.init(thd, order_list); + cursor.init(info); + peer_tracker.init(); } - void pre_next_partition(ha_rows rownum, Item_sum* item) + void pre_next_partition(ha_rows rownum) { // Save the value of the current_row peer_tracker.check_if_next_group(); @@ -1076,23 +1135,23 @@ public: if (rownum != 0) { // Add the current row now because our cursor has already seen it - item->add(); + add_value_to_items(); } } - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { - walk_till_non_peer(item); + walk_till_non_peer(); } - void pre_next_row(Item_sum* item) + void pre_next_row() { dont_move= !peer_tracker.check_if_next_group(); if (!dont_move) - item->add(); + add_value_to_items(); } - void next_row(Item_sum* item) + void next_row() { // Check if our cursor is pointing at a peer of the current row. 
// If not, move forward until that becomes true @@ -1104,11 +1163,11 @@ public: */ return; } - walk_till_non_peer(item); + walk_till_non_peer(); } private: - void walk_till_non_peer(Item_sum* item) + void walk_till_non_peer() { /* Walk forward until we've met first row that's not a peer of the current @@ -1118,7 +1177,7 @@ private: { if (peer_tracker.compare_with_cache()) break; - item->add(); + add_value_to_items(); } } }; @@ -1148,33 +1207,38 @@ class Frame_range_current_row_top : public Frame_cursor bool move; public: - void init(THD *thd, READ_RECORD *info, - SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) + Frame_range_current_row_top(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list) : + bound_tracker(thd, partition_list), cursor(), peer_tracker(thd, order_list), + move(false) + {} + + void init(READ_RECORD *info) { - bound_tracker.init(thd, partition_list); + bound_tracker.init(); cursor.init(info); - peer_tracker.init(thd, order_list); + peer_tracker.init(); } - void pre_next_partition(ha_rows rownum, Item_sum* item) + void pre_next_partition(ha_rows rownum) { // Fetch the value from the first row peer_tracker.check_if_next_group(); cursor.move_to(rownum+1); } - void next_partition(ha_rows rownum, Item_sum* item) {} + void next_partition(ha_rows rownum) {} - void pre_next_row(Item_sum* item) + void pre_next_row() { // Check if the new current_row is a peer of the row that our cursor is // pointing to. move= peer_tracker.check_if_next_group(); } - void next_row(Item_sum* item) + void next_row() { if (move) { @@ -1187,7 +1251,7 @@ public: // todo: need the following check ? 
if (!peer_tracker.compare_with_cache()) return; - item->remove(); + remove_value_from_items(); } do @@ -1196,7 +1260,7 @@ public: return; if (!peer_tracker.compare_with_cache()) return; - item->remove(); + remove_value_from_items(); } while (1); } @@ -1214,7 +1278,14 @@ public: class Frame_unbounded_preceding : public Frame_cursor { public: - void next_partition(ha_rows rownum, Item_sum* item) + Frame_unbounded_preceding(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list) + {} + + void init(READ_RECORD *info) {} + + void next_partition(ha_rows rownum) { /* UNBOUNDED PRECEDING frame end just stays on the first row. @@ -1222,7 +1293,7 @@ public: */ } - void next_row(Item_sum* item) + void next_row() { /* Do nothing, UNBOUNDED PRECEDING frame end doesn't move. */ } @@ -1239,18 +1310,22 @@ protected: Partition_read_cursor cursor; public: - void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) + Frame_unbounded_following(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list) : + cursor(thd, partition_list) {} + + void init(READ_RECORD *info) { - cursor.init(thd, info, partition_list); + cursor.init(info); } - void pre_next_partition(ha_rows rownum, Item_sum* item) + void pre_next_partition(ha_rows rownum) { cursor.on_next_partition(rownum); } - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { if (!rownum) { @@ -1258,16 +1333,16 @@ public: if (cursor.get_next()) return; } - item->add(); + add_value_to_items(); /* Walk to the end of the partition, updating the SUM function */ while (!cursor.get_next()) { - item->add(); + add_value_to_items(); } } - void next_row(Item_sum* item) + void next_row() { /* Do nothing, UNBOUNDED FOLLOWING frame end doesn't move */ } @@ -1277,9 +1352,12 @@ public: class Frame_unbounded_following_set_count : public Frame_unbounded_following { public: - // pre_next_partition is inherited + 
Frame_unbounded_following_set_count( + THD *thd, + SQL_I_List<ORDER> *partition_list, SQL_I_List<ORDER> *order_list) : + Frame_unbounded_following(thd, partition_list, order_list) {} - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { ha_rows num_rows_in_partition= 0; if (!rownum) @@ -1292,13 +1370,16 @@ public: /* Walk to the end of the partition, find how many rows there are. */ while (!cursor.get_next()) - { num_rows_in_partition++; - } - Item_sum_window_with_row_count* item_with_row_count = - static_cast<Item_sum_window_with_row_count *>(item); - item_with_row_count->set_row_count(num_rows_in_partition); + List_iterator_fast<Item_sum> it(sum_functions); + Item_sum* item; + while ((item= it++)) + { + Item_sum_window_with_row_count* item_with_row_count = + static_cast<Item_sum_window_with_row_count *>(item); + item_with_row_count->set_row_count(num_rows_in_partition); + } } }; @@ -1324,13 +1405,12 @@ public: is_top_bound(is_top_bound_arg), n_rows(n_rows_arg) {} - void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) + void init(READ_RECORD *info) { cursor.init(info); } - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { /* Position our cursor to point at the first row in the new partition @@ -1357,12 +1437,12 @@ public: if (n_rows_to_skip == ha_rows(-1)) { cursor.get_next(); - item->add(); + add_value_to_items(); n_rows_to_skip= 0; } } - void next_row(Item_sum* item) + void next_row() { if (n_rows_to_skip) { @@ -1374,9 +1454,9 @@ public: return; // this is not expected to happen. 
if (is_top_bound) // this is frame start endpoint - item->remove(); + remove_value_from_items(); else - item->add(); + add_value_to_items(); } }; @@ -1391,17 +1471,18 @@ public: class Frame_rows_current_row_bottom : public Frame_cursor { public: - void pre_next_partition(ha_rows rownum, Item_sum* item) + + void pre_next_partition(ha_rows rownum) { - item->add(); + add_value_to_items(); } - void next_partition(ha_rows rownum, Item_sum* item) {} - void pre_next_row(Item_sum* item) + void next_partition(ha_rows rownum) {} + void pre_next_row() { /* Temp table's current row is current_row. Add it to the window func */ - item->add(); + add_value_to_items(); } - void next_row(Item_sum* item) {}; + void next_row() {}; }; @@ -1443,20 +1524,23 @@ class Frame_n_rows_following : public Frame_cursor Partition_read_cursor cursor; bool at_partition_end; public: - Frame_n_rows_following(bool is_top_bound_arg, ha_rows n_rows_arg) : - is_top_bound(is_top_bound_arg), n_rows(n_rows_arg) + Frame_n_rows_following(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list, + bool is_top_bound_arg, ha_rows n_rows_arg) : + is_top_bound(is_top_bound_arg), n_rows(n_rows_arg), + cursor(thd, partition_list) { DBUG_ASSERT(n_rows > 0); } - void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) + void init(READ_RECORD *info) { - cursor.init(thd, info, partition_list); + cursor.init(info); at_partition_end= false; } - void pre_next_partition(ha_rows rownum, Item_sum* item) + void pre_next_partition(ha_rows rownum) { at_partition_end= false; @@ -1469,39 +1553,39 @@ public: // Current row points at the first row in the partition if (is_top_bound) // this is frame top endpoint - item->remove(); + remove_value_from_items(); else - item->add(); + add_value_to_items(); } } /* Move our cursor to be n_rows ahead. 
*/ - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { ha_rows i_end= n_rows + ((rownum==0)?1:0)- is_top_bound; for (ha_rows i= 0; i < i_end; i++) { - if (next_row_intern(item)) + if (next_row_intern()) break; } } - void next_row(Item_sum* item) + void next_row() { if (at_partition_end) return; - next_row_intern(item); + next_row_intern(); } private: - bool next_row_intern(Item_sum *item) + bool next_row_intern() { if (!cursor.get_next()) { if (is_top_bound) // this is frame start endpoint - item->remove(); + remove_value_from_items(); else - item->add(); + add_value_to_items(); } else at_partition_end= true; @@ -1513,8 +1597,9 @@ private: /* Get a Frame_cursor for a frame bound. This is a "factory function". */ -Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound) +Frame_cursor *get_frame_cursor(THD *thd, Window_spec *spec, bool is_top_bound) { + Window_frame *frame= spec->window_frame; if (!frame) { /* @@ -1536,9 +1621,13 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound) so again the same frame bounds can be used. */ if (is_top_bound) - return new Frame_unbounded_preceding; + return new Frame_unbounded_preceding(thd, + spec->partition_list, + spec->order_list); else - return new Frame_range_current_row_bottom; + return new Frame_range_current_row_bottom(thd, + spec->partition_list, + spec->order_list); } Window_frame_bound *bound= is_top_bound? 
frame->top_bound : @@ -1554,9 +1643,13 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound) { /* The following serve both RANGE and ROWS: */ if (is_preceding) - return new Frame_unbounded_preceding; - else - return new Frame_unbounded_following; + return new Frame_unbounded_preceding(thd, + spec->partition_list, + spec->order_list); + + return new Frame_unbounded_following(thd, + spec->partition_list, + spec->order_list); } if (frame->units == Window_frame::UNITS_ROWS) @@ -1567,15 +1660,21 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound) DBUG_ASSERT((longlong) n_rows >= 0); if (is_preceding) return new Frame_n_rows_preceding(is_top_bound, n_rows); - else - return new Frame_n_rows_following(is_top_bound, n_rows); + + return new Frame_n_rows_following( + thd, spec->partition_list, spec->order_list, + is_top_bound, n_rows); } else { if (is_top_bound) - return new Frame_range_n_top(is_preceding, bound->offset); - else - return new Frame_range_n_bottom(is_preceding, bound->offset); + return new Frame_range_n_top( + thd, spec->partition_list, spec->order_list, + is_preceding, bound->offset); + + return new Frame_range_n_bottom(thd, + spec->partition_list, spec->order_list, + is_preceding, bound->offset); } } @@ -1585,67 +1684,154 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound) { if (is_top_bound) return new Frame_rows_current_row_top; - else - return new Frame_rows_current_row_bottom; + + return new Frame_rows_current_row_bottom; } else { if (is_top_bound) - return new Frame_range_current_row_top; - else - return new Frame_range_current_row_bottom; + return new Frame_range_current_row_top( + thd, spec->partition_list, spec->order_list); + + return new Frame_range_current_row_bottom( + thd, spec->partition_list, spec->order_list); } } return NULL; } -void add_extra_frame_cursors(List<Frame_cursor> *cursors, - const Item_sum *window_func) +void add_extra_frame_cursors(THD *thd, Cursor_manager 
*cursor_manager, + Item_window_func *window_func) { - switch (window_func->sum_func()) + Window_spec *spec= window_func->window_spec; + Item_sum *item_sum= window_func->window_func(); + Frame_cursor *fc; + switch (item_sum->sum_func()) { case Item_sum::CUME_DIST_FUNC: - cursors->push_back(new Frame_unbounded_preceding); - cursors->push_back(new Frame_range_current_row_bottom); + fc= new Frame_unbounded_preceding(thd, + spec->partition_list, + spec->order_list); + fc->add_sum_func(item_sum); + cursor_manager->add_cursor(fc); + fc= new Frame_range_current_row_bottom(thd, + spec->partition_list, + spec->order_list); + fc->add_sum_func(item_sum); + cursor_manager->add_cursor(fc); break; default: - cursors->push_back(new Frame_unbounded_preceding); - cursors->push_back(new Frame_rows_current_row_bottom); + fc= new Frame_unbounded_preceding( + thd, spec->partition_list, spec->order_list); + fc->add_sum_func(item_sum); + cursor_manager->add_cursor(fc); + + fc= new Frame_rows_current_row_bottom; + fc->add_sum_func(item_sum); + cursor_manager->add_cursor(fc); } } -void get_window_func_required_cursors( - List<Frame_cursor> *result, const Item_window_func* item_win) + +/* + Create required frame cursors for the list of window functions. + Register all functions to their appropriate cursors. + If the window functions share the same frame specification, + those window functions will be registered to the same cursor. 
+*/ +void get_window_functions_required_cursors( + THD *thd, + List<Item_window_func>& window_functions, + List<Cursor_manager> *cursor_managers) { - if (item_win->requires_partition_size()) - result->push_back(new Frame_unbounded_following_set_count); + List_iterator_fast<Item_window_func> it(window_functions); + Item_window_func* item_win_func; + Item_sum *sum_func; + while ((item_win_func= it++)) + { + Cursor_manager *cursor_manager = new Cursor_manager(); + sum_func = item_win_func->window_func(); + Frame_cursor *fc; + /* + Some window functions require the partition size for computing values. + Add a cursor that retrieves it as the first one in the list if necessary. + */ + if (item_win_func->requires_partition_size()) + { + fc= new Frame_unbounded_following_set_count(thd, + item_win_func->window_spec->partition_list, + item_win_func->window_spec->order_list); + fc->add_sum_func(sum_func); + cursor_manager->add_cursor(fc); + } - /* - If it is not a regular window function that follows frame specifications, - specific cursors are required. - */ - if (item_win->is_frame_prohibited()) + /* + If it is not a regular window function that follows frame specifications, + specific cursors are required. ROW_NUM, RANK, NTILE and others follow + such rules. Check is_frame_prohibited check for the full list. + */ + if (item_win_func->is_frame_prohibited()) + { + add_extra_frame_cursors(thd, cursor_manager, item_win_func); + cursor_managers->push_back(cursor_manager); + continue; + } + + Frame_cursor *frame_bottom= get_frame_cursor(thd, + item_win_func->window_spec, false); + Frame_cursor *frame_top= get_frame_cursor(thd, + item_win_func->window_spec, true); + + frame_bottom->add_sum_func(sum_func); + frame_top->add_sum_func(sum_func); + + /* + The order of these cursors is important. A sum function + must first add values (via frame_bottom) then remove them via + frame_top. Removing items first doesn't make sense in the case of all + window functions. 
+ */ + cursor_manager->add_cursor(frame_bottom); + cursor_manager->add_cursor(frame_top); + cursor_managers->push_back(cursor_manager); + } +} + +/** + Helper function that takes a list of window functions and writes + their values in the current table record. +*/ +static +bool save_window_function_values(List<Item_window_func>& window_functions, + TABLE *tbl, uchar *rowid_buf) +{ + List_iterator_fast<Item_window_func> iter(window_functions); + tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf); + store_record(tbl, record[1]); + while (Item_window_func *item_win= iter++) { - add_extra_frame_cursors(result, item_win->window_func()); - return; + int err; + item_win->save_in_field(item_win->result_field, true); + // TODO check if this can be placed outside the loop. + err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]); + if (err && err != HA_ERR_RECORD_IS_THE_SAME) + return true; } - /* A regular window function follows the frame specification. */ - result->push_back(get_frame_cursor(item_win->window_spec->window_frame, - false)); - result->push_back(get_frame_cursor(item_win->window_spec->window_frame, - true)); + return false; } /* + TODO(cvicentiu) update this comment to reflect the new execution. + Streamed window function computation with window frames. We make a single pass over the ordered temp.table, but we're using three - cursors: + cursors: - current row - the row that we're computing window func value for) - start_bound - the start of the frame - bottom_bound - the end of the frame - + All three cursors move together. 
@todo @@ -1655,7 +1841,7 @@ void get_window_func_required_cursors( @detail ROWS BETWEEN 3 PRECEDING -- frame start AND 3 FOLLOWING -- frame end - + /------ frame end (aka BOTTOM) Dataset start | --------====*=======[*]========*========-------->> dataset end @@ -1663,7 +1849,7 @@ void get_window_func_required_cursors( | +-------- current row | \-------- frame start ("TOP") - + - frame_end moves forward and adds rows into the aggregate function. - frame_start follows behind and removes rows from the aggregate function. - current_row is the row where the value of aggregate function is stored. @@ -1672,97 +1858,90 @@ void get_window_func_required_cursors( condition (Others can catch up by counting rows?) */ - -bool compute_window_func_with_frames(Item_window_func *item_win, - TABLE *tbl, READ_RECORD *info) +bool compute_window_func(THD *thd, + List<Item_window_func>& window_functions, + List<Cursor_manager>& cursor_managers, + TABLE *tbl, + SORT_INFO *filesort_result) { - THD *thd= tbl->in_use; - int err= 0; + List_iterator_fast<Item_window_func> iter_win_funcs(window_functions); + List_iterator_fast<Cursor_manager> iter_cursor_managers(cursor_managers); + uint err; - Item_sum *sum_func= item_win->window_func(); - /* This algorithm doesn't support DISTINCT aggregator */ - sum_func->set_aggregator(Aggregator::SIMPLE_AGGREGATOR); + READ_RECORD info; - List<Frame_cursor> cursors; - get_window_func_required_cursors(&cursors, item_win); + if (init_read_record(&info, current_thd, tbl, NULL/*select*/, filesort_result, + 0, 1, FALSE)) + return true; - List_iterator_fast<Frame_cursor> it(cursors); - Frame_cursor *c; - while((c= it++)) + Cursor_manager *cursor_manager; + while ((cursor_manager= iter_cursor_managers++)) + cursor_manager->initialize_cursors(&info); + + /* One partition tracker for each window function. 
*/ + List<Group_bound_tracker> partition_trackers; + Item_window_func *win_func; + while ((win_func= iter_win_funcs++)) { - c->init(thd, info, item_win->window_spec->partition_list, - item_win->window_spec->order_list); + Group_bound_tracker *tracker= new Group_bound_tracker(thd, + win_func->window_spec->partition_list); + // TODO(cvicentiu) This should be removed and placed in constructor. + tracker->init(); + partition_trackers.push_back(tracker); } - bool is_error= false; + List_iterator_fast<Group_bound_tracker> iter_part_trackers(partition_trackers); ha_rows rownum= 0; uchar *rowid_buf= (uchar*) my_malloc(tbl->file->ref_length, MYF(0)); while (true) { - /* Move the current_row */ - if ((err=info->read_record(info))) - { - break; /* End of file */ - } - bool partition_changed= item_win->check_if_partition_changed(); + if ((err= info.read_record(&info))) + break; // End of file. + /* Remember current row so that we can restore it before computing + each window function. */ tbl->file->position(tbl->record[0]); memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length); - if (partition_changed || (rownum == 0)) - { - sum_func->clear(); - /* - pre_XXX functions assume that tbl->record[0] contains current_row, and - they may not change it. - */ - it.rewind(); - while ((c= it++)) - c->pre_next_partition(rownum, sum_func); - /* - We move bottom_bound first, because we want rows to be added into the - aggregate before top_bound attempts to remove them. 
- */ - it.rewind(); - while ((c= it++)) - c->next_partition(rownum, sum_func); - } - else - { - /* Again, both pre_XXX function can find current_row in tbl->record[0] */ - it.rewind(); - while ((c= it++)) - c->pre_next_row(sum_func); - - /* These make no assumptions about tbl->record[0] and may change it */ - it.rewind(); - while ((c= it++)) - c->next_row(sum_func); - } - rownum++; + iter_win_funcs.rewind(); + iter_part_trackers.rewind(); + iter_cursor_managers.rewind(); - /* - Frame cursors may have made tbl->record[0] to point to some record other - than current_row. This applies to tbl->file's internal state, too. - Fix this by reading the current row again. - */ - tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf); - store_record(tbl,record[1]); - item_win->save_in_field(item_win->result_field, true); - err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]); - if (err && err != HA_ERR_RECORD_IS_THE_SAME) + Group_bound_tracker *tracker; + while ((win_func= iter_win_funcs++) && + (tracker= iter_part_trackers++) && + (cursor_manager= iter_cursor_managers++)) { - is_error= true; - break; + if (tracker->check_if_next_group() || (rownum == 0)) + { + /* TODO(cvicentiu) + Clearing window functions should happen through cursors. */ + win_func->window_func()->clear(); + cursor_manager->notify_cursors_partition_changed(rownum); + } + else + { + cursor_manager->notify_cursors_next_row(); + } + /* Return to current row after notifying cursors for each window + function. */ + tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf); } + + /* We now have computed values for each window function. They can now + be saved in the current row. */ + save_window_function_values(window_functions, tbl, rowid_buf); + + rownum++; } my_free(rowid_buf); - cursors.delete_elements(); - return is_error? 
true: false; -} + partition_trackers.delete_elements(); + end_read_record(&info); + return false; +} /* Make a list that is a concation of two lists of ORDER elements */ @@ -1799,25 +1978,18 @@ static ORDER* concat_order_lists(MEM_ROOT *mem_root, ORDER *list1, ORDER *list2) return res; } - -bool Window_func_runner::setup(THD *thd) +bool Window_func_runner::add_function_to_run(Item_window_func *win_func) { - win_func->setup_partition_border_check(thd); + + Item_sum *sum_func= win_func->window_func(); + sum_func->setup_window_func(current_thd, win_func->window_spec); Item_sum::Sumfunctype type= win_func->window_func()->sum_func(); - switch (type) + switch (type) { case Item_sum::ROW_NUMBER_FUNC: case Item_sum::RANK_FUNC: case Item_sum::DENSE_RANK_FUNC: - { - /* - One-pass window function computation, walk through the rows and - assign values. - */ - compute_func= compute_window_func_values; - break; - } case Item_sum::COUNT_FUNC: case Item_sum::SUM_BIT_FUNC: case Item_sum::SUM_FUNC: @@ -1825,43 +1997,49 @@ bool Window_func_runner::setup(THD *thd) case Item_sum::PERCENT_RANK_FUNC: case Item_sum::CUME_DIST_FUNC: case Item_sum::NTILE_FUNC: - { - /* - Frame-aware window function computation. It does one pass, but - uses three cursors -frame_start, current_row, and frame_end. - */ - compute_func= compute_window_func_with_frames; break; - } + default: - my_error(ER_NOT_SUPPORTED_YET, MYF(0), "This aggregate as window function"); + my_error(ER_NOT_SUPPORTED_YET, MYF(0), + "This aggregate as window function"); return true; } - return false; + return window_functions.push_back(win_func); } /* Compute the value of window function for all rows. 
*/ -bool Window_func_runner::exec(TABLE *tbl, SORT_INFO *filesort_result) +bool Window_func_runner::exec(THD *thd, TABLE *tbl, SORT_INFO *filesort_result) { - THD *thd= current_thd; - win_func->set_phase_to_computation(); - - /* Go through the sorted array and compute the window function */ - READ_RECORD info; - - if (init_read_record(&info, thd, tbl, NULL/*select*/, filesort_result, - 0, 1, FALSE)) - return true; + List_iterator_fast<Item_window_func> it(window_functions); + Item_window_func *win_func; + while ((win_func= it++)) + { + win_func->set_phase_to_computation(); + // TODO(cvicentiu) Setting the aggregator should probably be done during + // setup of Window_funcs_sort. + win_func->window_func()->set_aggregator(Aggregator::SIMPLE_AGGREGATOR); + } + it.rewind(); - bool is_error= compute_func(win_func, tbl, &info); + List<Cursor_manager> cursor_managers; + get_window_functions_required_cursors(thd, window_functions, + &cursor_managers); - win_func->set_phase_to_retrieval(); + /* Go through the sorted array and compute the window function */ + bool is_error= compute_window_func(thd, + window_functions, + cursor_managers, + tbl, filesort_result); + while ((win_func= it++)) + { + win_func->set_phase_to_retrieval(); + } - end_read_record(&info); + cursor_managers.delete_elements(); return is_error; } @@ -1872,21 +2050,15 @@ bool Window_funcs_sort::exec(JOIN *join) THD *thd= join->thd; JOIN_TAB *join_tab= &join->join_tab[join->top_join_tab_count]; + /* Sort the table based on the most specific sorting criteria of + the window functions. 
*/ if (create_sort_index(thd, join, join_tab, filesort)) return true; TABLE *tbl= join_tab->table; SORT_INFO *filesort_result= join_tab->filesort_result; - bool is_error= false; - List_iterator<Window_func_runner> it(runners); - Window_func_runner *runner; - - while ((runner= it++)) - { - if ((is_error= runner->exec(tbl, filesort_result))) - break; - } + bool is_error= runner.exec(thd, tbl, filesort_result); delete join_tab->filesort_result; join_tab->filesort_result= NULL; @@ -1894,30 +2066,32 @@ bool Window_funcs_sort::exec(JOIN *join) } -bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel, - List_iterator<Item_window_func> &it) +bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel, + List_iterator<Item_window_func> &it) { Item_window_func *win_func= it.peek(); Item_window_func *prev_win_func; + /* The iterator should point to a valid function at the start of execution. */ + DBUG_ASSERT(win_func); do { - Window_func_runner *runner; - if (!(runner= new Window_func_runner(win_func)) || - runner->setup(thd)) - { + if (runner.add_function_to_run(win_func)) return true; - } - runners.push_back(runner); it++; prev_win_func= win_func; - } while ((win_func= it.peek()) && !(win_func->marker & SORTORDER_CHANGE_FLAG)); - + } while ((win_func= it.peek()) && + !(win_func->marker & SORTORDER_CHANGE_FLAG)); + /* The sort criteria must be taken from the last win_func in the group of - adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG. + adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG. This is + because the sort order must be the most specific sorting criteria defined + within the window function group. This ensures that we sort the table + in a way that the result is valid for all window functions belonging to + this Window_funcs_sort. 
*/ - Window_spec *spec = prev_win_func->window_spec; + Window_spec *spec= prev_win_func->window_spec; ORDER* sort_order= concat_order_lists(thd->mem_root, spec->partition_list->first, @@ -1932,8 +2106,8 @@ bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel, bool Window_funcs_computation::setup(THD *thd, - List<Item_window_func> *window_funcs, - JOIN_TAB *tab) + List<Item_window_func> *window_funcs, + JOIN_TAB *tab) { order_window_funcs_by_window_specs(window_funcs); |