diff options
-rw-r--r-- | sql/item_windowfunc.cc | 43 | ||||
-rw-r--r-- | sql/item_windowfunc.h | 90 | ||||
-rw-r--r-- | sql/sql_window.cc | 884 | ||||
-rw-r--r-- | sql/sql_window.h | 37 |
4 files changed, 608 insertions, 446 deletions
diff --git a/sql/item_windowfunc.cc b/sql/item_windowfunc.cc index d157d545dad..c8ea979900c 100644 --- a/sql/item_windowfunc.cc +++ b/sql/item_windowfunc.cc @@ -41,7 +41,7 @@ Item_window_func::resolve_window_name(THD *thd) return true; } - return false; + return false; } @@ -154,7 +154,7 @@ void Item_window_func::split_sum_func(THD *thd, Ref_ptr_array ref_pointer_array, /* - This must be called before advance_window() can be called. + This must be called before attempting to compute the window function values. @detail If we attempt to do it in fix_fields(), partition_fields will refer @@ -162,30 +162,25 @@ void Item_window_func::split_sum_func(THD *thd, Ref_ptr_array ref_pointer_array, We need it to refer to temp.table columns. */ -void Item_window_func::setup_partition_border_check(THD *thd) -{ - partition_tracker.init(thd, window_spec->partition_list); - window_func()->setup_window_func(thd, window_spec); -} - - void Item_sum_rank::setup_window_func(THD *thd, Window_spec *window_spec) { /* TODO: move this into Item_window_func? */ - peer_tracker.init(thd, window_spec->order_list); + peer_tracker = new Group_bound_tracker(thd, window_spec->order_list); + peer_tracker->init(); clear(); } void Item_sum_dense_rank::setup_window_func(THD *thd, Window_spec *window_spec) { /* TODO: consider moving this && Item_sum_rank's implementation */ - peer_tracker.init(thd, window_spec->order_list); + peer_tracker = new Group_bound_tracker(thd, window_spec->order_list); + peer_tracker->init(); clear(); } bool Item_sum_dense_rank::add() { - if (peer_tracker.check_if_next_group() || first_add) + if (peer_tracker->check_if_next_group() || first_add) { first_add= false; dense_rank++; @@ -198,7 +193,7 @@ bool Item_sum_dense_rank::add() bool Item_sum_rank::add() { row_number++; - if (peer_tracker.check_if_next_group()) + if (peer_tracker->check_if_next_group()) { /* Row value changed */ cur_rank= row_number; @@ -206,25 +201,10 @@ bool Item_sum_rank::add() return false; } -bool Item_window_func::check_if_partition_changed() -{ - return partition_tracker.check_if_next_group(); -} - -void Item_window_func::advance_window() -{ - if (check_if_partition_changed()) - { - /* Next partition */ - window_func()->clear(); - } - window_func()->add(); -} - bool Item_sum_percent_rank::add() { row_number++; - if (peer_tracker.check_if_next_group()) + if (peer_tracker->check_if_next_group()) { /* Row value changed. */ cur_rank= row_number; @@ -235,8 +215,7 @@ bool Item_sum_percent_rank::add() void Item_sum_percent_rank::setup_window_func(THD *thd, Window_spec *window_spec) { /* TODO: move this into Item_window_func? */ - peer_tracker.init(thd, window_spec->order_list); + peer_tracker = new Group_bound_tracker(thd, window_spec->order_list); + peer_tracker->init(); clear(); } - - diff --git a/sql/item_windowfunc.h b/sql/item_windowfunc.h index 163cc855fe2..bb2256207ec 100644 --- a/sql/item_windowfunc.h +++ b/sql/item_windowfunc.h @@ -12,25 +12,19 @@ int test_if_group_changed(List<Cached_item> &list); /* A wrapper around test_if_group_changed */ class Group_bound_tracker { - List<Cached_item> group_fields; - /* - During the first check_if_next_group, the list of cached_items is not - initialized. The compare function will return that the items match if - the field's value is the same as the Cached_item's default value (0). - This flag makes sure that we always return true during the first check. - - XXX This is better to be implemented within test_if_group_changed, but - since it is used in other parts of the codebase, we keep it here for now. - */ - bool first_check; public: - void init(THD *thd, SQL_I_List<ORDER> *list) + + Group_bound_tracker(THD *thd, SQL_I_List<ORDER> *list) { for (ORDER *curr = list->first; curr; curr=curr->next) { Cached_item *tmp= new_Cached_item(thd, curr->item[0], TRUE); group_fields.push_back(tmp); } + } + + void init() + { first_check= true; } @@ -76,6 +70,19 @@ public: } return 0; } + +private: + List<Cached_item> group_fields; + /* + During the first check_if_next_group, the list of cached_items is not + initialized. The compare function will return that the items match if + the field's value is the same as the Cached_item's default value (0). + This flag makes sure that we always return true during the first check. + + XXX This is better to be implemented within test_if_group_changed, but + since it is used in other parts of the codebase, we keep it here for now. + */ + bool first_check; }; /* @@ -92,19 +99,22 @@ class Item_sum_row_number: public Item_sum_int longlong count; public: + + Item_sum_row_number(THD *thd) + : Item_sum_int(thd), count(0) {} + void clear() { count= 0; } - bool add() + + bool add() { count++; - return false; + return false; } - void update_field() {} - Item_sum_row_number(THD *thd) - : Item_sum_int(thd), count(0) {} + void update_field() {} enum Sumfunctype sum_func() const { @@ -119,6 +129,7 @@ public: { return "row_number("; } + Item *get_copy(THD *thd, MEM_ROOT *mem_root) { return get_item_copy<Item_sum_row_number>(thd, mem_root, this); } }; @@ -146,9 +157,12 @@ class Item_sum_rank: public Item_sum_int protected: longlong row_number; // just ROW_NUMBER() longlong cur_rank; // current value - - Group_bound_tracker peer_tracker; + + Group_bound_tracker *peer_tracker; public: + + Item_sum_rank(THD *thd) : Item_sum_int(thd), peer_tracker(NULL) {} + void clear() { /* This is called on partition start */ @@ -169,10 +183,6 @@ public: TODO: ^^ what does this do ? It is not called ever? */ -public: - Item_sum_rank(THD *thd) - : Item_sum_int(thd) {} - enum Sumfunctype sum_func () const { return RANK_FUNC; @@ -184,9 +194,12 @@ public: } void setup_window_func(THD *thd, Window_spec *window_spec); + void cleanup() { - peer_tracker.cleanup(); + if (peer_tracker) + peer_tracker->cleanup(); + delete peer_tracker; Item_sum_int::cleanup(); } Item *get_copy(THD *thd, MEM_ROOT *mem_root) @@ -217,7 +230,7 @@ class Item_sum_dense_rank: public Item_sum_int { longlong dense_rank; bool first_add; - Group_bound_tracker peer_tracker; + Group_bound_tracker *peer_tracker; public: /* XXX(cvicentiu) This class could potentially be implemented in the rank @@ -236,7 +249,7 @@ class Item_sum_dense_rank: public Item_sum_int } Item_sum_dense_rank(THD *thd) - : Item_sum_int(thd), dense_rank(0), first_add(true) {} + : Item_sum_int(thd), dense_rank(0), first_add(true), peer_tracker(NULL) {} enum Sumfunctype sum_func () const { return DENSE_RANK_FUNC; @@ -251,7 +264,11 @@ class Item_sum_dense_rank: public Item_sum_int void cleanup() { - peer_tracker.cleanup(); + if (peer_tracker) + { + peer_tracker->cleanup(); + delete peer_tracker; + } Item_sum_int::cleanup(); } Item *get_copy(THD *thd, MEM_ROOT *mem_root) @@ -294,7 +311,7 @@ class Item_sum_percent_rank: public Item_sum_window_with_row_count { public: Item_sum_percent_rank(THD *thd) - : Item_sum_window_with_row_count(thd), cur_rank(1) {} + : Item_sum_window_with_row_count(thd), cur_rank(1), peer_tracker(NULL) {} longlong val_int() { @@ -354,11 +371,15 @@ class Item_sum_percent_rank: public Item_sum_window_with_row_count longlong cur_rank; // Current rank of the current row. longlong row_number; // Value if this were ROW_NUMBER() function. - Group_bound_tracker peer_tracker; + Group_bound_tracker *peer_tracker; void cleanup() { - peer_tracker.cleanup(); + if (peer_tracker) + { + peer_tracker->cleanup(); + delete peer_tracker; + } Item_sum_num::cleanup(); } }; @@ -515,12 +536,6 @@ public: public: Window_spec *window_spec; - /* - This stores the data about the partition we're currently in. - advance_window() uses this to tell when we've left one partition and - entered another - */ - Group_bound_tracker partition_tracker; public: Item_window_func(THD *thd, Item_sum *win_func, LEX_STRING *win_name) : Item_func_or_sum(thd, (Item *) win_func), @@ -613,9 +628,6 @@ public: */ void setup_partition_border_check(THD *thd); - void advance_window(); - bool check_if_partition_changed(); - enum_field_types field_type() const { return ((Item_sum *) args[0])->field_type(); diff --git a/sql/sql_window.cc b/sql/sql_window.cc index 4c5ef53d854..a862821bd56 100644 --- a/sql/sql_window.cc +++ b/sql/sql_window.cc @@ -37,12 +37,12 @@ Window_spec::check_window_names(List_iterator_fast<Window_spec> &it) if (win_spec->order_list->elements && order_list->elements) { my_error(ER_ORDER_LIST_IN_REFERENCING_WINDOW_SPEC, MYF(0), ref_name); - return true; + return true; } - if (win_spec->window_frame) + if (win_spec->window_frame) { my_error(ER_WINDOW_FRAME_IN_REFERENCED_WINDOW_SPEC, MYF(0), ref_name); - return true; + return true; } referenced_win_spec= win_spec; if (partition_list->elements == 0) @@ -54,7 +54,7 @@ Window_spec::check_window_names(List_iterator_fast<Window_spec> &it) if (ref_name && !referenced_win_spec) { my_error(ER_WRONG_WINDOW_SPEC_NAME, MYF(0), ref_name); - return true; + return true; } window_names_are_checked= true; return false; @@ -73,7 +73,7 @@ Window_frame::check_frame_bounds() top_bound->precedence_type == Window_frame_bound::FOLLOWING)) { my_error(ER_BAD_COMBINATION_OF_WINDOW_FRAME_BOUND_SPECS, MYF(0)); - return true; + return true; } return false; @@ -86,7 +86,7 @@ Window_frame::check_frame_bounds() int setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables, - List<Item> &fields, List<Item> &all_fields, + List<Item> &fields, List<Item> &all_fields, List<Window_spec> &win_specs, List<Item_window_func> &win_funcs) { Window_spec *win_spec; @@ -116,7 +116,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables, it.rewind(); List_iterator_fast<Window_spec> itp(win_specs); - + while ((win_spec= it++)) { bool hidden_group_fields; @@ -131,7 +131,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables, { DBUG_RETURN(1); } - + if (win_spec->window_frame && win_spec->window_frame->exclusion != Window_frame::EXCL_NONE) { @@ -188,7 +188,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables, } } } - + /* "ROWS PRECEDING|FOLLOWING $n" must have a numeric $n */ if (win_spec->window_frame && win_spec->window_frame->units == Window_frame::UNITS_ROWS) @@ -219,7 +219,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables, { win_func_item->update_used_tables(); } - + DBUG_RETURN(0); } @@ -445,7 +445,7 @@ typedef int (*Item_window_func_cmp)(Item_window_func *f1, @brief Sort window functions so that those that can be computed together are adjacent. - + @detail Sort window functions by their - required sorting order, @@ -498,48 +498,15 @@ void order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list) } else if (win_spec_prev->window_frame != win_spec_curr->window_frame) curr->marker|= FRAME_CHANGE_FLAG; - - prev= curr; - } + + prev= curr; + } } ///////////////////////////////////////////////////////////////////////////// -/* - Do a pass over sorted table and compute window function values. - - This function is for handling window functions that can be computed on the - fly. Examples are RANK() and ROW_NUMBER(). -*/ -bool compute_window_func_values(Item_window_func *item_win, - TABLE *tbl, READ_RECORD *info) -{ - int err; - while (!(err=info->read_record(info))) - { - store_record(tbl,record[1]); - - /* - This will cause window function to compute its value for the - current row : - */ - item_win->advance_window(); - - /* - Put the new value into temptable's field - TODO: Should this use item_win->update_field() call? - Regular aggegate function implementations seem to implement it. - */ - item_win->save_in_field(item_win->result_field, true); - err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]); - if (err && err != HA_ERR_RECORD_IS_THE_SAME) - return true; - } - return false; -} - ///////////////////////////////////////////////////////////////////////////// // Window Frames support ///////////////////////////////////////////////////////////////////////////// @@ -571,11 +538,6 @@ bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst) class Rowid_seq_cursor { - uchar *cache_start; - uchar *cache_pos; - uchar *cache_end; - uint ref_length; - public: virtual ~Rowid_seq_cursor() {} @@ -595,17 +557,17 @@ public: cache_pos+= ref_length; return 0; } - + ha_rows get_rownum() { return (cache_pos - cache_start) / ref_length; } - // will be called by ROWS n FOLLOWING to catch up. void move_to(ha_rows row_number) { cache_pos= cache_start + row_number * ref_length; } + protected: bool at_eof() { return (cache_pos == cache_end); } @@ -618,6 +580,12 @@ protected: } uchar *get_curr_rowid() { return cache_pos; } + +private: + uchar *cache_start; + uchar *cache_pos; + uchar *cache_end; + uint ref_length; }; @@ -627,11 +595,6 @@ protected: class Table_read_cursor : public Rowid_seq_cursor { - /* - Note: we don't own *read_record, somebody else is using it. - We only look at the constant part of it, e.g. table, record buffer, etc. - */ - READ_RECORD *read_record; public: virtual ~Table_read_cursor() {} @@ -668,7 +631,14 @@ public: return false; // didn't restore } - // todo: should move_to() also read row here? +private: + /* + Note: we don't own *read_record, somebody else is using it. + We only look at the constant part of it, e.g. table, record buffer, etc. + */ + READ_RECORD *read_record; + + // TODO(spetrunia): should move_to() also read row here? }; @@ -679,14 +649,14 @@ public: class Partition_read_cursor { - Table_read_cursor tbl_cursor; - Group_bound_tracker bound_tracker; - bool end_of_partition; public: - void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list) + Partition_read_cursor(THD *thd, SQL_I_List<ORDER> *partition_list) : + bound_tracker(thd, partition_list) {} + + void init(READ_RECORD *info) { tbl_cursor.init(info); - bound_tracker.init(thd, partition_list); + bound_tracker.init(); end_of_partition= false; } @@ -704,7 +674,7 @@ public: } /* - Moves to a new row. The row is assumed to be within the current partition + Moves to a new row. The row is assumed to be within the current partition. */ void move_to(ha_rows rownum) { tbl_cursor.move_to(rownum); } @@ -731,14 +701,18 @@ public: { return tbl_cursor.restore_last_row(); } + +private: + Table_read_cursor tbl_cursor; + Group_bound_tracker bound_tracker; + bool end_of_partition; }; ///////////////////////////////////////////////////////////////////////////// - /* Window frame bound cursor. Abstract interface. - + @detail The cursor moves within the partition that the current row is in. It may be ahead or behind the current row. @@ -775,11 +749,12 @@ public: class Frame_cursor : public Sql_alloc { public: - virtual void init(THD *thd, READ_RECORD *info, - SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) - {} + virtual void init(READ_RECORD *info) {}; + bool add_sum_func(Item_sum* item) + { + return sum_functions.push_back(item); + } /* Current row has moved to the next partition and is positioned on the first row there. Position the frame bound accordingly. @@ -796,20 +771,95 @@ public: - The callee may move tbl->file and tbl->record[0] to point to some other row. */ - virtual void pre_next_partition(ha_rows rownum, Item_sum* item){}; - virtual void next_partition(ha_rows rownum, Item_sum* item)=0; - + virtual void pre_next_partition(ha_rows rownum) {}; + virtual void next_partition(ha_rows rownum)=0; + /* The current row has moved one row forward. Move this frame bound accordingly, and update the value of aggregate function as necessary. */ - virtual void pre_next_row(Item_sum* item){}; - virtual void next_row(Item_sum* item)=0; - - virtual ~Frame_cursor(){} + virtual void pre_next_row() {}; + virtual void next_row()=0; + + virtual ~Frame_cursor() {} + +protected: + inline void add_value_to_items() + { + List_iterator_fast<Item_sum> it(sum_functions); + Item_sum *item_sum; + while ((item_sum= it++)) + { + item_sum->add(); + } + } + inline void remove_value_from_items() + { + List_iterator_fast<Item_sum> it(sum_functions); + Item_sum *item_sum; + while ((item_sum= it++)) + { + item_sum->remove(); + } + } + + /* Sum functions that this cursor handles. */ + List<Item_sum> sum_functions; +}; + +/* + A class that owns cursor objects associated with a specific window function. +*/ +class Cursor_manager +{ +public: + bool add_cursor(Frame_cursor *cursor) + { + return cursors.push_back(cursor); + } + + void initialize_cursors(READ_RECORD *info) + { + List_iterator_fast<Frame_cursor> iter(cursors); + Frame_cursor *fc; + while ((fc= iter++)) + fc->init(info); + } + + void notify_cursors_partition_changed(ha_rows rownum) + { + List_iterator_fast<Frame_cursor> iter(cursors); + Frame_cursor *cursor; + while ((cursor= iter++)) + cursor->pre_next_partition(rownum); + + iter.rewind(); + while ((cursor= iter++)) + cursor->next_partition(rownum); + } + + void notify_cursors_next_row() + { + List_iterator_fast<Frame_cursor> iter(cursors); + Frame_cursor *cursor; + while ((cursor= iter++)) + cursor->pre_next_row(); + + iter.rewind(); + while ((cursor= iter++)) + cursor->next_row(); + } + + ~Cursor_manager() { cursors.delete_elements(); } + +private: + /* List of the cursors that this manager owns. */ + List<Frame_cursor> cursors; }; + + ////////////////////////////////////////////////////////////////////////////// // RANGE-type frames ////////////////////////////////////////////////////////////////////////////// @@ -841,16 +891,12 @@ class Frame_range_n_top : public Frame_cursor */ int order_direction; public: - Frame_range_n_top(bool is_preceding_arg, Item *n_val_arg) : + Frame_range_n_top(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list, + bool is_preceding_arg, Item *n_val_arg) : n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg) - {} - - void init(THD *thd, READ_RECORD *info, - SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) { - cursor.init(info); - DBUG_ASSERT(order_list->elements == 1); Item *src_expr= order_list->first->item[0]; if (order_list->first->direction == ORDER::ORDER_ASC) @@ -872,24 +918,30 @@ public: item_add->fix_fields(thd, &item_add); } - void pre_next_partition(ha_rows rownum, Item_sum* item) + void init(READ_RECORD *info) + { + cursor.init(info); + + } + + void pre_next_partition(ha_rows rownum) { // Save the value of FUNC(current_row) range_expr->fetch_value_from(item_add); } - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { cursor.move_to(rownum); - walk_till_non_peer(item); + walk_till_non_peer(); } - void pre_next_row(Item_sum* item) + void pre_next_row() { range_expr->fetch_value_from(item_add); } - void next_row(Item_sum* item) + void next_row() { /* Ok, our cursor is at the first row R where @@ -900,19 +952,19 @@ public: { if (order_direction * range_expr->cmp_read_only() <= 0) return; - item->remove(); + remove_value_from_items(); } - walk_till_non_peer(item); + walk_till_non_peer(); } private: - void walk_till_non_peer(Item_sum* item) + void walk_till_non_peer() { while (!cursor.get_next()) { if (order_direction * range_expr->cmp_read_only() <= 0) break; - item->remove(); + remove_value_from_items(); } } }; @@ -950,16 +1002,13 @@ class Frame_range_n_bottom: public Frame_cursor */ int order_direction; public: - Frame_range_n_bottom(bool is_preceding_arg, Item *n_val_arg) : - n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg) - {} - - void init(THD *thd, READ_RECORD *info, - SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) + Frame_range_n_bottom(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list, + bool is_preceding_arg, Item *n_val_arg) : + cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL), + is_preceding(is_preceding_arg) { - cursor.init(thd, info, partition_list); - DBUG_ASSERT(order_list->elements == 1); Item *src_expr= order_list->first->item[0]; @@ -982,7 +1031,12 @@ public: item_add->fix_fields(thd, &item_add); } - void pre_next_partition(ha_rows rownum, Item_sum* item) + void init(READ_RECORD *info) + { + cursor.init(info); + } + + void pre_next_partition(ha_rows rownum) { // Save the value of FUNC(current_row) range_expr->fetch_value_from(item_add); @@ -991,20 +1045,20 @@ public: end_of_partition= false; } - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { cursor.move_to(rownum); - walk_till_non_peer(item); + walk_till_non_peer(); } - void pre_next_row(Item_sum* item) + void pre_next_row() { if (end_of_partition) return; range_expr->fetch_value_from(item_add); } - void next_row(Item_sum* item) + void next_row() { if (end_of_partition) return; @@ -1017,20 +1071,20 @@ public: { if (order_direction * range_expr->cmp_read_only() < 0) return; - item->add(); + add_value_to_items(); } - walk_till_non_peer(item); + walk_till_non_peer(); } private: - void walk_till_non_peer(Item_sum* item) + void walk_till_non_peer() { int res; while (!(res= cursor.get_next())) { if (order_direction * range_expr->cmp_read_only() < 0) break; - item->add(); + add_value_to_items(); } if (res) end_of_partition= true; @@ -1043,11 +1097,11 @@ private: ... | peer1 | peer2 <----- current_row - | peer3 + | peer3 +-peer4 <----- the cursor points here. peer4 itself is included. nonpeer1 nonpeer2 - + This bound moves in front of the current_row. It should be a the first row that is still a peer of the current row. */ @@ -1060,15 +1114,20 @@ class Frame_range_current_row_bottom: public Frame_cursor bool dont_move; public: - void init(THD *thd, READ_RECORD *info, - SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) + Frame_range_current_row_bottom(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list) : + cursor(thd, partition_list), peer_tracker(thd, order_list) + { + } + + void init(READ_RECORD *info) { - cursor.init(thd, info, partition_list); - peer_tracker.init(thd, order_list); + cursor.init(info); + peer_tracker.init(); } - void pre_next_partition(ha_rows rownum, Item_sum* item) + void pre_next_partition(ha_rows rownum) { // Save the value of the current_row peer_tracker.check_if_next_group(); @@ -1076,23 +1135,23 @@ public: if (rownum != 0) { // Add the current row now because our cursor has already seen it - item->add(); + add_value_to_items(); } } - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { - walk_till_non_peer(item); + walk_till_non_peer(); } - void pre_next_row(Item_sum* item) + void pre_next_row() { dont_move= !peer_tracker.check_if_next_group(); if (!dont_move) - item->add(); + add_value_to_items(); } - void next_row(Item_sum* item) + void next_row() { // Check if our cursor is pointing at a peer of the current row. // If not, move forward until that becomes true @@ -1104,11 +1163,11 @@ public: */ return; } - walk_till_non_peer(item); + walk_till_non_peer(); } private: - void walk_till_non_peer(Item_sum* item) + void walk_till_non_peer() { /* Walk forward until we've met first row that's not a peer of the current @@ -1118,7 +1177,7 @@ private: { if (peer_tracker.compare_with_cache()) break; - item->add(); + add_value_to_items(); } } }; @@ -1148,33 +1207,38 @@ class Frame_range_current_row_top : public Frame_cursor bool move; public: - void init(THD *thd, READ_RECORD *info, - SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) + Frame_range_current_row_top(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list) : + bound_tracker(thd, partition_list), cursor(), peer_tracker(thd, order_list), + move(false) + {} + + void init(READ_RECORD *info) { - bound_tracker.init(thd, partition_list); + bound_tracker.init(); cursor.init(info); - peer_tracker.init(thd, order_list); + peer_tracker.init(); } - void pre_next_partition(ha_rows rownum, Item_sum* item) + void pre_next_partition(ha_rows rownum) { // Fetch the value from the first row peer_tracker.check_if_next_group(); cursor.move_to(rownum+1); } - void next_partition(ha_rows rownum, Item_sum* item) {} + void next_partition(ha_rows rownum) {} - void pre_next_row(Item_sum* item) + void pre_next_row() { // Check if the new current_row is a peer of the row that our cursor is // pointing to. move= peer_tracker.check_if_next_group(); } - void next_row(Item_sum* item) + void next_row() { if (move) { @@ -1187,7 +1251,7 @@ public: // todo: need the following check ? if (!peer_tracker.compare_with_cache()) return; - item->remove(); + remove_value_from_items(); } do @@ -1196,7 +1260,7 @@ public: return; if (!peer_tracker.compare_with_cache()) return; - item->remove(); + remove_value_from_items(); } while (1); } @@ -1214,7 +1278,14 @@ public: class Frame_unbounded_preceding : public Frame_cursor { public: - void next_partition(ha_rows rownum, Item_sum* item) + Frame_unbounded_preceding(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list) + {} + + void init(READ_RECORD *info) {} + + void next_partition(ha_rows rownum) { /* UNBOUNDED PRECEDING frame end just stays on the first row. @@ -1222,7 +1293,7 @@ public: */ } - void next_row(Item_sum* item) + void next_row() { /* Do nothing, UNBOUNDED PRECEDING frame end doesn't move. */ } @@ -1239,18 +1310,22 @@ protected: Partition_read_cursor cursor; public: - void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) + Frame_unbounded_following(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list) : + cursor(thd, partition_list) {} + + void init(READ_RECORD *info) { - cursor.init(thd, info, partition_list); + cursor.init(info); } - void pre_next_partition(ha_rows rownum, Item_sum* item) + void pre_next_partition(ha_rows rownum) { cursor.on_next_partition(rownum); } - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { if (!rownum) { @@ -1258,16 +1333,16 @@ public: if (cursor.get_next()) return; } - item->add(); + add_value_to_items(); /* Walk to the end of the partition, updating the SUM function */ while (!cursor.get_next()) { - item->add(); + add_value_to_items(); } } - void next_row(Item_sum* item) + void next_row() { /* Do nothing, UNBOUNDED FOLLOWING frame end doesn't move */ } @@ -1277,9 +1352,12 @@ public: class Frame_unbounded_following_set_count : public Frame_unbounded_following { public: - // pre_next_partition is inherited + Frame_unbounded_following_set_count( + THD *thd, + SQL_I_List<ORDER> *partition_list, SQL_I_List<ORDER> *order_list) : + Frame_unbounded_following(thd, partition_list, order_list) {} - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { ha_rows num_rows_in_partition= 0; if (!rownum) @@ -1292,13 +1370,16 @@ public: /* Walk to the end of the partition, find how many rows there are. */ while (!cursor.get_next()) - { num_rows_in_partition++; - } - Item_sum_window_with_row_count* item_with_row_count = - static_cast<Item_sum_window_with_row_count *>(item); - item_with_row_count->set_row_count(num_rows_in_partition); + List_iterator_fast<Item_sum> it(sum_functions); + Item_sum* item; + while ((item= it++)) + { + Item_sum_window_with_row_count* item_with_row_count = + static_cast<Item_sum_window_with_row_count *>(item); + item_with_row_count->set_row_count(num_rows_in_partition); + } } }; @@ -1324,13 +1405,12 @@ public: is_top_bound(is_top_bound_arg), n_rows(n_rows_arg) {} - void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) + void init(READ_RECORD *info) { cursor.init(info); } - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { /* Position our cursor to point at the first row in the new partition @@ -1357,12 +1437,12 @@ public: if (n_rows_to_skip == ha_rows(-1)) { cursor.get_next(); - item->add(); + add_value_to_items(); n_rows_to_skip= 0; } } - void next_row(Item_sum* item) + void next_row() { if (n_rows_to_skip) { @@ -1374,9 +1454,9 @@ public: return; // this is not expected to happen. if (is_top_bound) // this is frame start endpoint - item->remove(); + remove_value_from_items(); else - item->add(); + add_value_to_items(); } }; @@ -1391,17 +1471,18 @@ public: class Frame_rows_current_row_bottom : public Frame_cursor { public: - void pre_next_partition(ha_rows rownum, Item_sum* item) + + void pre_next_partition(ha_rows rownum) { - item->add(); + add_value_to_items(); } - void next_partition(ha_rows rownum, Item_sum* item) {} - void pre_next_row(Item_sum* item) + void next_partition(ha_rows rownum) {} + void pre_next_row() { /* Temp table's current row is current_row. Add it to the window func */ - item->add(); + add_value_to_items(); } - void next_row(Item_sum* item) {}; + void next_row() {}; }; @@ -1443,20 +1524,23 @@ class Frame_n_rows_following : public Frame_cursor Partition_read_cursor cursor; bool at_partition_end; public: - Frame_n_rows_following(bool is_top_bound_arg, ha_rows n_rows_arg) : - is_top_bound(is_top_bound_arg), n_rows(n_rows_arg) + Frame_n_rows_following(THD *thd, + SQL_I_List<ORDER> *partition_list, + SQL_I_List<ORDER> *order_list, + bool is_top_bound_arg, ha_rows n_rows_arg) : + is_top_bound(is_top_bound_arg), n_rows(n_rows_arg), + cursor(thd, partition_list) { DBUG_ASSERT(n_rows > 0); } - void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list, - SQL_I_List<ORDER> *order_list) + void init(READ_RECORD *info) { - cursor.init(thd, info, partition_list); + cursor.init(info); at_partition_end= false; } - void pre_next_partition(ha_rows rownum, Item_sum* item) + void pre_next_partition(ha_rows rownum) { at_partition_end= false; @@ -1469,39 +1553,39 @@ public: // Current row points at the first row in the partition if (is_top_bound) // this is frame top endpoint - item->remove(); + remove_value_from_items(); else - item->add(); + add_value_to_items(); } } /* Move our cursor to be n_rows ahead. */ - void next_partition(ha_rows rownum, Item_sum* item) + void next_partition(ha_rows rownum) { ha_rows i_end= n_rows + ((rownum==0)?1:0)- is_top_bound; for (ha_rows i= 0; i < i_end; i++) { - if (next_row_intern(item)) + if (next_row_intern()) break; } } - void next_row(Item_sum* item) + void next_row() { if (at_partition_end) return; - next_row_intern(item); + next_row_intern(); } private: - bool next_row_intern(Item_sum *item) + bool next_row_intern() { if (!cursor.get_next()) { if (is_top_bound) // this is frame start endpoint - item->remove(); + remove_value_from_items(); else - item->add(); + add_value_to_items(); } else at_partition_end= true; @@ -1513,8 +1597,9 @@ private: /* Get a Frame_cursor for a frame bound. This is a "factory function". */ -Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound) +Frame_cursor *get_frame_cursor(THD *thd, Window_spec *spec, bool is_top_bound) { + Window_frame *frame= spec->window_frame; if (!frame) { /* @@ -1536,9 +1621,13 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound) so again the same frame bounds can be used. */ if (is_top_bound) - return new Frame_unbounded_preceding; + return new Frame_unbounded_preceding(thd, + spec->partition_list, + spec->order_list); else - return new Frame_range_current_row_bottom; + return new Frame_range_current_row_bottom(thd, + spec->partition_list, + spec->order_list); } Window_frame_bound *bound= is_top_bound? frame->top_bound : @@ -1554,9 +1643,13 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound) { /* The following serve both RANGE and ROWS: */ if (is_preceding) - return new Frame_unbounded_preceding; - else - return new Frame_unbounded_following; + return new Frame_unbounded_preceding(thd, + spec->partition_list, + spec->order_list); + + return new Frame_unbounded_following(thd, + spec->partition_list, + spec->order_list); } if (frame->units == Window_frame::UNITS_ROWS) @@ -1567,15 +1660,21 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound) DBUG_ASSERT((longlong) n_rows >= 0); if (is_preceding) return new Frame_n_rows_preceding(is_top_bound, n_rows); - else - return new Frame_n_rows_following(is_top_bound, n_rows); + + return new Frame_n_rows_following( + thd, spec->partition_list, spec->order_list, + is_top_bound, n_rows); } else { if (is_top_bound) - return new Frame_range_n_top(is_preceding, bound->offset); - else - return new Frame_range_n_bottom(is_preceding, bound->offset); + return new Frame_range_n_top( + thd, spec->partition_list, spec->order_list, + is_preceding, bound->offset); + + return new Frame_range_n_bottom(thd, + spec->partition_list, spec->order_list, + is_preceding, bound->offset); } } @@ -1585,67 +1684,154 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound) { if (is_top_bound) return new Frame_rows_current_row_top; - else - return new Frame_rows_current_row_bottom; + + return new Frame_rows_current_row_bottom; } else { if (is_top_bound) - return new Frame_range_current_row_top; - else - return new Frame_range_current_row_bottom; + return new Frame_range_current_row_top( + thd, spec->partition_list, spec->order_list); + + return new Frame_range_current_row_bottom( + thd, spec->partition_list, spec->order_list); } } return NULL; } -void add_extra_frame_cursors(List<Frame_cursor> *cursors, - const Item_sum *window_func) +void add_extra_frame_cursors(THD *thd, Cursor_manager *cursor_manager, + Item_window_func *window_func) { - switch (window_func->sum_func()) + Window_spec *spec= window_func->window_spec; + Item_sum *item_sum= window_func->window_func(); + Frame_cursor *fc; + switch (item_sum->sum_func()) { case Item_sum::CUME_DIST_FUNC: - cursors->push_back(new Frame_unbounded_preceding); - cursors->push_back(new Frame_range_current_row_bottom); + fc= new Frame_unbounded_preceding(thd, + spec->partition_list, + spec->order_list); + fc->add_sum_func(item_sum); + cursor_manager->add_cursor(fc); + fc= new Frame_range_current_row_bottom(thd, + spec->partition_list, + spec->order_list); + fc->add_sum_func(item_sum); + cursor_manager->add_cursor(fc); break; default: - cursors->push_back(new Frame_unbounded_preceding); - cursors->push_back(new Frame_rows_current_row_bottom); + fc= new Frame_unbounded_preceding( + thd, spec->partition_list, spec->order_list); + fc->add_sum_func(item_sum); + cursor_manager->add_cursor(fc); + + fc= new Frame_rows_current_row_bottom; + fc->add_sum_func(item_sum); + cursor_manager->add_cursor(fc); } } -void get_window_func_required_cursors( - List<Frame_cursor> *result, const Item_window_func* item_win) + +/* + Create required frame cursors for the list of window functions. + Register all functions to their appropriate cursors. + If the window functions share the same frame specification, + those window functions will be registered to the same cursor. +*/ +void get_window_functions_required_cursors( + THD *thd, + List<Item_window_func>& window_functions, + List<Cursor_manager> *cursor_managers) { - if (item_win->requires_partition_size()) - result->push_back(new Frame_unbounded_following_set_count); + List_iterator_fast<Item_window_func> it(window_functions); + Item_window_func* item_win_func; + Item_sum *sum_func; + while ((item_win_func= it++)) + { + Cursor_manager *cursor_manager = new Cursor_manager(); + sum_func = item_win_func->window_func(); + Frame_cursor *fc; + /* + Some window functions require the partition size for computing values. + Add a cursor that retrieves it as the first one in the list if necessary. + */ + if (item_win_func->requires_partition_size()) + { + fc= new Frame_unbounded_following_set_count(thd, + item_win_func->window_spec->partition_list, + item_win_func->window_spec->order_list); + fc->add_sum_func(sum_func); + cursor_manager->add_cursor(fc); + } - /* - If it is not a regular window function that follows frame specifications, - specific cursors are required. - */ - if (item_win->is_frame_prohibited()) + /* + If it is not a regular window function that follows frame specifications, + specific cursors are required. ROW_NUM, RANK, NTILE and others follow + such rules. Check is_frame_prohibited check for the full list. + */ + if (item_win_func->is_frame_prohibited()) + { + add_extra_frame_cursors(thd, cursor_manager, item_win_func); + cursor_managers->push_back(cursor_manager); + continue; + } + + Frame_cursor *frame_bottom= get_frame_cursor(thd, + item_win_func->window_spec, false); + Frame_cursor *frame_top= get_frame_cursor(thd, + item_win_func->window_spec, true); + + frame_bottom->add_sum_func(sum_func); + frame_top->add_sum_func(sum_func); + + /* + The order of these cursors is important. A sum function + must first add values (via frame_bottom) then remove them via + frame_top. Removing items first doesn't make sense in the case of all + window functions. + */ + cursor_manager->add_cursor(frame_bottom); + cursor_manager->add_cursor(frame_top); + cursor_managers->push_back(cursor_manager); + } +} + +/** + Helper function that takes a list of window functions and writes + their values in the current table record. +*/ +static +bool save_window_function_values(List<Item_window_func>& window_functions, + TABLE *tbl, uchar *rowid_buf) +{ + List_iterator_fast<Item_window_func> iter(window_functions); + tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf); + store_record(tbl, record[1]); + while (Item_window_func *item_win= iter++) { - add_extra_frame_cursors(result, item_win->window_func()); - return; + int err; + item_win->save_in_field(item_win->result_field, true); + // TODO check if this can be placed outside the loop. + err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]); + if (err && err != HA_ERR_RECORD_IS_THE_SAME) + return true; } - /* A regular window function follows the frame specification. */ - result->push_back(get_frame_cursor(item_win->window_spec->window_frame, - false)); - result->push_back(get_frame_cursor(item_win->window_spec->window_frame, - true)); + return false; } /* + TODO(cvicentiu) update this comment to reflect the new execution. + Streamed window function computation with window frames. We make a single pass over the ordered temp.table, but we're using three - cursors: + cursors: - current row - the row that we're computing window func value for) - start_bound - the start of the frame - bottom_bound - the end of the frame - + All three cursors move together. @todo @@ -1655,7 +1841,7 @@ void get_window_func_required_cursors( @detail ROWS BETWEEN 3 PRECEDING -- frame start AND 3 FOLLOWING -- frame end - + /------ frame end (aka BOTTOM) Dataset start | --------====*=======[*]========*========-------->> dataset end @@ -1663,7 +1849,7 @@ void get_window_func_required_cursors( | +-------- current row | \-------- frame start ("TOP") - + - frame_end moves forward and adds rows into the aggregate function. - frame_start follows behind and removes rows from the aggregate function. - current_row is the row where the value of aggregate function is stored. @@ -1672,97 +1858,90 @@ void get_window_func_required_cursors( condition (Others can catch up by counting rows?) */ - -bool compute_window_func_with_frames(Item_window_func *item_win, - TABLE *tbl, READ_RECORD *info) +bool compute_window_func(THD *thd, + List<Item_window_func>& window_functions, + List<Cursor_manager>& cursor_managers, + TABLE *tbl, + SORT_INFO *filesort_result) { - THD *thd= tbl->in_use; - int err= 0; + List_iterator_fast<Item_window_func> iter_win_funcs(window_functions); + List_iterator_fast<Cursor_manager> iter_cursor_managers(cursor_managers); + uint err; - Item_sum *sum_func= item_win->window_func(); - /* This algorithm doesn't support DISTINCT aggregator */ - sum_func->set_aggregator(Aggregator::SIMPLE_AGGREGATOR); + READ_RECORD info; - List<Frame_cursor> cursors; - get_window_func_required_cursors(&cursors, item_win); + if (init_read_record(&info, current_thd, tbl, NULL/*select*/, filesort_result, + 0, 1, FALSE)) + return true; - List_iterator_fast<Frame_cursor> it(cursors); - Frame_cursor *c; - while((c= it++)) + Cursor_manager *cursor_manager; + while ((cursor_manager= iter_cursor_managers++)) + cursor_manager->initialize_cursors(&info); + + /* One partition tracker for each window function. */ + List<Group_bound_tracker> partition_trackers; + Item_window_func *win_func; + while ((win_func= iter_win_funcs++)) { - c->init(thd, info, item_win->window_spec->partition_list, - item_win->window_spec->order_list); + Group_bound_tracker *tracker= new Group_bound_tracker(thd, + win_func->window_spec->partition_list); + // TODO(cvicentiu) This should be removed and placed in constructor. + tracker->init(); + partition_trackers.push_back(tracker); } - bool is_error= false; + List_iterator_fast<Group_bound_tracker> iter_part_trackers(partition_trackers); ha_rows rownum= 0; uchar *rowid_buf= (uchar*) my_malloc(tbl->file->ref_length, MYF(0)); while (true) { - /* Move the current_row */ - if ((err=info->read_record(info))) - { - break; /* End of file */ - } - bool partition_changed= item_win->check_if_partition_changed(); + if ((err= info.read_record(&info))) + break; // End of file. + /* Remember current row so that we can restore it before computing + each window function. */ tbl->file->position(tbl->record[0]); memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length); - if (partition_changed || (rownum == 0)) - { - sum_func->clear(); - /* - pre_XXX functions assume that tbl->record[0] contains current_row, and - they may not change it. - */ - it.rewind(); - while ((c= it++)) - c->pre_next_partition(rownum, sum_func); - /* - We move bottom_bound first, because we want rows to be added into the - aggregate before top_bound attempts to remove them. - */ - it.rewind(); - while ((c= it++)) - c->next_partition(rownum, sum_func); - } - else - { - /* Again, both pre_XXX function can find current_row in tbl->record[0] */ - it.rewind(); - while ((c= it++)) - c->pre_next_row(sum_func); - - /* These make no assumptions about tbl->record[0] and may change it */ - it.rewind(); - while ((c= it++)) - c->next_row(sum_func); - } - rownum++; + iter_win_funcs.rewind(); + iter_part_trackers.rewind(); + iter_cursor_managers.rewind(); - /* - Frame cursors may have made tbl->record[0] to point to some record other - than current_row. This applies to tbl->file's internal state, too. - Fix this by reading the current row again. - */ - tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf); - store_record(tbl,record[1]); - item_win->save_in_field(item_win->result_field, true); - err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]); - if (err && err != HA_ERR_RECORD_IS_THE_SAME) + Group_bound_tracker *tracker; + while ((win_func= iter_win_funcs++) && + (tracker= iter_part_trackers++) && + (cursor_manager= iter_cursor_managers++)) { - is_error= true; - break; + if (tracker->check_if_next_group() || (rownum == 0)) + { + /* TODO(cvicentiu) + Clearing window functions should happen through cursors. */ + win_func->window_func()->clear(); + cursor_manager->notify_cursors_partition_changed(rownum); + } + else + { + cursor_manager->notify_cursors_next_row(); + } + /* Return to current row after notifying cursors for each window + function. */ + tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf); } + + /* We now have computed values for each window function. They can now + be saved in the current row. */ + save_window_function_values(window_functions, tbl, rowid_buf); + + rownum++; } my_free(rowid_buf); - cursors.delete_elements(); - return is_error? true: false; -} + partition_trackers.delete_elements(); + end_read_record(&info); + return false; +} /* Make a list that is a concation of two lists of ORDER elements */ @@ -1799,25 +1978,18 @@ static ORDER* concat_order_lists(MEM_ROOT *mem_root, ORDER *list1, ORDER *list2) return res; } - -bool Window_func_runner::setup(THD *thd) +bool Window_func_runner::add_function_to_run(Item_window_func *win_func) { - win_func->setup_partition_border_check(thd); + + Item_sum *sum_func= win_func->window_func(); + sum_func->setup_window_func(current_thd, win_func->window_spec); Item_sum::Sumfunctype type= win_func->window_func()->sum_func(); - switch (type) + switch (type) { case Item_sum::ROW_NUMBER_FUNC: case Item_sum::RANK_FUNC: case Item_sum::DENSE_RANK_FUNC: - { - /* - One-pass window function computation, walk through the rows and - assign values. - */ - compute_func= compute_window_func_values; - break; - } case Item_sum::COUNT_FUNC: case Item_sum::SUM_BIT_FUNC: case Item_sum::SUM_FUNC: @@ -1825,43 +1997,49 @@ bool Window_func_runner::setup(THD *thd) case Item_sum::PERCENT_RANK_FUNC: case Item_sum::CUME_DIST_FUNC: case Item_sum::NTILE_FUNC: - { - /* - Frame-aware window function computation. It does one pass, but - uses three cursors -frame_start, current_row, and frame_end. - */ - compute_func= compute_window_func_with_frames; break; - } + default: - my_error(ER_NOT_SUPPORTED_YET, MYF(0), "This aggregate as window function"); + my_error(ER_NOT_SUPPORTED_YET, MYF(0), + "This aggregate as window function"); return true; } - return false; + return window_functions.push_back(win_func); } /* Compute the value of window function for all rows. */ -bool Window_func_runner::exec(TABLE *tbl, SORT_INFO *filesort_result) +bool Window_func_runner::exec(THD *thd, TABLE *tbl, SORT_INFO *filesort_result) { - THD *thd= current_thd; - win_func->set_phase_to_computation(); - - /* Go through the sorted array and compute the window function */ - READ_RECORD info; - - if (init_read_record(&info, thd, tbl, NULL/*select*/, filesort_result, - 0, 1, FALSE)) - return true; + List_iterator_fast<Item_window_func> it(window_functions); + Item_window_func *win_func; + while ((win_func= it++)) + { + win_func->set_phase_to_computation(); + // TODO(cvicentiu) Setting the aggregator should probably be done during + // setup of Window_funcs_sort. + win_func->window_func()->set_aggregator(Aggregator::SIMPLE_AGGREGATOR); + } + it.rewind(); - bool is_error= compute_func(win_func, tbl, &info); + List<Cursor_manager> cursor_managers; + get_window_functions_required_cursors(thd, window_functions, + &cursor_managers); - win_func->set_phase_to_retrieval(); + /* Go through the sorted array and compute the window function */ + bool is_error= compute_window_func(thd, + window_functions, + cursor_managers, + tbl, filesort_result); + while ((win_func= it++)) + { + win_func->set_phase_to_retrieval(); + } - end_read_record(&info); + cursor_managers.delete_elements(); return is_error; } @@ -1872,21 +2050,15 @@ bool Window_funcs_sort::exec(JOIN *join) THD *thd= join->thd; JOIN_TAB *join_tab= &join->join_tab[join->top_join_tab_count]; + /* Sort the table based on the most specific sorting criteria of + the window functions. */ if (create_sort_index(thd, join, join_tab, filesort)) return true; TABLE *tbl= join_tab->table; SORT_INFO *filesort_result= join_tab->filesort_result; - bool is_error= false; - List_iterator<Window_func_runner> it(runners); - Window_func_runner *runner; - - while ((runner= it++)) - { - if ((is_error= runner->exec(tbl, filesort_result))) - break; - } + bool is_error= runner.exec(thd, tbl, filesort_result); delete join_tab->filesort_result; join_tab->filesort_result= NULL; @@ -1894,30 +2066,32 @@ bool Window_funcs_sort::exec(JOIN *join) } -bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel, - List_iterator<Item_window_func> &it) +bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel, + List_iterator<Item_window_func> &it) { Item_window_func *win_func= it.peek(); Item_window_func *prev_win_func; + /* The iterator should point to a valid function at the start of execution. */ + DBUG_ASSERT(win_func); do { - Window_func_runner *runner; - if (!(runner= new Window_func_runner(win_func)) || - runner->setup(thd)) - { + if (runner.add_function_to_run(win_func)) return true; - } - runners.push_back(runner); it++; prev_win_func= win_func; - } while ((win_func= it.peek()) && !(win_func->marker & SORTORDER_CHANGE_FLAG)); - + } while ((win_func= it.peek()) && + !(win_func->marker & SORTORDER_CHANGE_FLAG)); + /* The sort criteria must be taken from the last win_func in the group of - adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG. + adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG. This is + because the sort order must be the most specific sorting criteria defined + within the window function group. This ensures that we sort the table + in a way that the result is valid for all window functions belonging to + this Window_funcs_sort. */ - Window_spec *spec = prev_win_func->window_spec; + Window_spec *spec= prev_win_func->window_spec; ORDER* sort_order= concat_order_lists(thd->mem_root, spec->partition_list->first, @@ -1932,8 +2106,8 @@ bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel, bool Window_funcs_computation::setup(THD *thd, - List<Item_window_func> *window_funcs, - JOIN_TAB *tab) + List<Item_window_func> *window_funcs, + JOIN_TAB *tab) { order_window_funcs_by_window_specs(window_funcs); diff --git a/sql/sql_window.h b/sql/sql_window.h index 54e39d827fe..c3847240e9a 100644 --- a/sql/sql_window.h +++ b/sql/sql_window.h @@ -154,9 +154,7 @@ int setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables, // Classes that make window functions computation a part of SELECT's query plan ////////////////////////////////////////////////////////////////////////////// -typedef bool (*window_compute_func_t)(Item_window_func *item_win, - TABLE *tbl, READ_RECORD *info); - +class Frame_cursor; /* This handles computation of one window function. @@ -165,21 +163,17 @@ typedef bool (*window_compute_func_t)(Item_window_func *item_win, class Window_func_runner : public Sql_alloc { - Item_window_func *win_func; - - /* The function to use for computation*/ - window_compute_func_t compute_func; - public: - Window_func_runner(Item_window_func *win_func_arg) : - win_func(win_func_arg) - {} + /* Add the function to be computed during the execution pass */ + bool add_function_to_run(Item_window_func *win_func); - // Set things up. Create filesort structures, etc - bool setup(THD *thd); - - // This sorts and runs the window function. - bool exec(TABLE *tbl, SORT_INFO *filesort_result); + /* Compute and fill the fields in the table. */ + bool exec(THD *thd, TABLE *tbl, SORT_INFO *filesort_result); + +private: + /* A list of window functions for which this Window_func_runner will compute + values during the execution phase. */ + List<Item_window_func> window_functions; }; @@ -191,21 +185,24 @@ public: class Window_funcs_sort : public Sql_alloc { - List<Window_func_runner> runners; - - /* Window functions can be computed over this sorting */ - Filesort *filesort; public: bool setup(THD *thd, SQL_SELECT *sel, List_iterator<Item_window_func> &it); bool exec(JOIN *join); void cleanup() { delete filesort; } friend class Window_funcs_computation; + +private: + Window_func_runner runner; + + /* Window functions can be computed over this sorting */ + Filesort *filesort; }; struct st_join_table; class Explain_aggr_window_funcs; + /* This is a "window function computation phase": a single object of this class takes care of computing all window functions in a SELECT. |