summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/item_windowfunc.cc43
-rw-r--r--sql/item_windowfunc.h90
-rw-r--r--sql/sql_window.cc884
-rw-r--r--sql/sql_window.h37
4 files changed, 608 insertions, 446 deletions
diff --git a/sql/item_windowfunc.cc b/sql/item_windowfunc.cc
index d157d545dad..c8ea979900c 100644
--- a/sql/item_windowfunc.cc
+++ b/sql/item_windowfunc.cc
@@ -41,7 +41,7 @@ Item_window_func::resolve_window_name(THD *thd)
return true;
}
- return false;
+ return false;
}
@@ -154,7 +154,7 @@ void Item_window_func::split_sum_func(THD *thd, Ref_ptr_array ref_pointer_array,
/*
- This must be called before advance_window() can be called.
+ This must be called before attempting to compute the window function values.
@detail
If we attempt to do it in fix_fields(), partition_fields will refer
@@ -162,30 +162,25 @@ void Item_window_func::split_sum_func(THD *thd, Ref_ptr_array ref_pointer_array,
We need it to refer to temp.table columns.
*/
-void Item_window_func::setup_partition_border_check(THD *thd)
-{
- partition_tracker.init(thd, window_spec->partition_list);
- window_func()->setup_window_func(thd, window_spec);
-}
-
-
void Item_sum_rank::setup_window_func(THD *thd, Window_spec *window_spec)
{
/* TODO: move this into Item_window_func? */
- peer_tracker.init(thd, window_spec->order_list);
+ peer_tracker = new Group_bound_tracker(thd, window_spec->order_list);
+ peer_tracker->init();
clear();
}
void Item_sum_dense_rank::setup_window_func(THD *thd, Window_spec *window_spec)
{
/* TODO: consider moving this && Item_sum_rank's implementation */
- peer_tracker.init(thd, window_spec->order_list);
+ peer_tracker = new Group_bound_tracker(thd, window_spec->order_list);
+ peer_tracker->init();
clear();
}
bool Item_sum_dense_rank::add()
{
- if (peer_tracker.check_if_next_group() || first_add)
+ if (peer_tracker->check_if_next_group() || first_add)
{
first_add= false;
dense_rank++;
@@ -198,7 +193,7 @@ bool Item_sum_dense_rank::add()
bool Item_sum_rank::add()
{
row_number++;
- if (peer_tracker.check_if_next_group())
+ if (peer_tracker->check_if_next_group())
{
/* Row value changed */
cur_rank= row_number;
@@ -206,25 +201,10 @@ bool Item_sum_rank::add()
return false;
}
-bool Item_window_func::check_if_partition_changed()
-{
- return partition_tracker.check_if_next_group();
-}
-
-void Item_window_func::advance_window()
-{
- if (check_if_partition_changed())
- {
- /* Next partition */
- window_func()->clear();
- }
- window_func()->add();
-}
-
bool Item_sum_percent_rank::add()
{
row_number++;
- if (peer_tracker.check_if_next_group())
+ if (peer_tracker->check_if_next_group())
{
/* Row value changed. */
cur_rank= row_number;
@@ -235,8 +215,7 @@ bool Item_sum_percent_rank::add()
void Item_sum_percent_rank::setup_window_func(THD *thd, Window_spec *window_spec)
{
/* TODO: move this into Item_window_func? */
- peer_tracker.init(thd, window_spec->order_list);
+ peer_tracker = new Group_bound_tracker(thd, window_spec->order_list);
+ peer_tracker->init();
clear();
}
-
-
diff --git a/sql/item_windowfunc.h b/sql/item_windowfunc.h
index 163cc855fe2..bb2256207ec 100644
--- a/sql/item_windowfunc.h
+++ b/sql/item_windowfunc.h
@@ -12,25 +12,19 @@ int test_if_group_changed(List<Cached_item> &list);
/* A wrapper around test_if_group_changed */
class Group_bound_tracker
{
- List<Cached_item> group_fields;
- /*
- During the first check_if_next_group, the list of cached_items is not
- initialized. The compare function will return that the items match if
- the field's value is the same as the Cached_item's default value (0).
- This flag makes sure that we always return true during the first check.
-
- XXX This is better to be implemented within test_if_group_changed, but
- since it is used in other parts of the codebase, we keep it here for now.
- */
- bool first_check;
public:
- void init(THD *thd, SQL_I_List<ORDER> *list)
+
+ Group_bound_tracker(THD *thd, SQL_I_List<ORDER> *list)
{
for (ORDER *curr = list->first; curr; curr=curr->next)
{
Cached_item *tmp= new_Cached_item(thd, curr->item[0], TRUE);
group_fields.push_back(tmp);
}
+ }
+
+ void init()
+ {
first_check= true;
}
@@ -76,6 +70,19 @@ public:
}
return 0;
}
+
+private:
+ List<Cached_item> group_fields;
+ /*
+ During the first check_if_next_group, the list of cached_items is not
+ initialized. The compare function will return that the items match if
+ the field's value is the same as the Cached_item's default value (0).
+ This flag makes sure that we always return true during the first check.
+
+ XXX This is better to be implemented within test_if_group_changed, but
+ since it is used in other parts of the codebase, we keep it here for now.
+ */
+ bool first_check;
};
/*
@@ -92,19 +99,22 @@ class Item_sum_row_number: public Item_sum_int
longlong count;
public:
+
+ Item_sum_row_number(THD *thd)
+ : Item_sum_int(thd), count(0) {}
+
void clear()
{
count= 0;
}
- bool add()
+
+ bool add()
{
count++;
- return false;
+ return false;
}
- void update_field() {}
- Item_sum_row_number(THD *thd)
- : Item_sum_int(thd), count(0) {}
+ void update_field() {}
enum Sumfunctype sum_func() const
{
@@ -119,6 +129,7 @@ public:
{
return "row_number(";
}
+
Item *get_copy(THD *thd, MEM_ROOT *mem_root)
{ return get_item_copy<Item_sum_row_number>(thd, mem_root, this); }
};
@@ -146,9 +157,12 @@ class Item_sum_rank: public Item_sum_int
protected:
longlong row_number; // just ROW_NUMBER()
longlong cur_rank; // current value
-
- Group_bound_tracker peer_tracker;
+
+ Group_bound_tracker *peer_tracker;
public:
+
+ Item_sum_rank(THD *thd) : Item_sum_int(thd), peer_tracker(NULL) {}
+
void clear()
{
/* This is called on partition start */
@@ -169,10 +183,6 @@ public:
TODO: ^^ what does this do ? It is not called ever?
*/
-public:
- Item_sum_rank(THD *thd)
- : Item_sum_int(thd) {}
-
enum Sumfunctype sum_func () const
{
return RANK_FUNC;
@@ -184,9 +194,12 @@ public:
}
void setup_window_func(THD *thd, Window_spec *window_spec);
+
void cleanup()
{
- peer_tracker.cleanup();
+ if (peer_tracker)
+ peer_tracker->cleanup();
+ delete peer_tracker;
Item_sum_int::cleanup();
}
Item *get_copy(THD *thd, MEM_ROOT *mem_root)
@@ -217,7 +230,7 @@ class Item_sum_dense_rank: public Item_sum_int
{
longlong dense_rank;
bool first_add;
- Group_bound_tracker peer_tracker;
+ Group_bound_tracker *peer_tracker;
public:
/*
XXX(cvicentiu) This class could potentially be implemented in the rank
@@ -236,7 +249,7 @@ class Item_sum_dense_rank: public Item_sum_int
}
Item_sum_dense_rank(THD *thd)
- : Item_sum_int(thd), dense_rank(0), first_add(true) {}
+ : Item_sum_int(thd), dense_rank(0), first_add(true), peer_tracker(NULL) {}
enum Sumfunctype sum_func () const
{
return DENSE_RANK_FUNC;
@@ -251,7 +264,11 @@ class Item_sum_dense_rank: public Item_sum_int
void cleanup()
{
- peer_tracker.cleanup();
+ if (peer_tracker)
+ {
+ peer_tracker->cleanup();
+ delete peer_tracker;
+ }
Item_sum_int::cleanup();
}
Item *get_copy(THD *thd, MEM_ROOT *mem_root)
@@ -294,7 +311,7 @@ class Item_sum_percent_rank: public Item_sum_window_with_row_count
{
public:
Item_sum_percent_rank(THD *thd)
- : Item_sum_window_with_row_count(thd), cur_rank(1) {}
+ : Item_sum_window_with_row_count(thd), cur_rank(1), peer_tracker(NULL) {}
longlong val_int()
{
@@ -354,11 +371,15 @@ class Item_sum_percent_rank: public Item_sum_window_with_row_count
longlong cur_rank; // Current rank of the current row.
longlong row_number; // Value if this were ROW_NUMBER() function.
- Group_bound_tracker peer_tracker;
+ Group_bound_tracker *peer_tracker;
void cleanup()
{
- peer_tracker.cleanup();
+ if (peer_tracker)
+ {
+ peer_tracker->cleanup();
+ delete peer_tracker;
+ }
Item_sum_num::cleanup();
}
};
@@ -515,12 +536,6 @@ public:
public:
Window_spec *window_spec;
- /*
- This stores the data about the partition we're currently in.
- advance_window() uses this to tell when we've left one partition and
- entered another
- */
- Group_bound_tracker partition_tracker;
public:
Item_window_func(THD *thd, Item_sum *win_func, LEX_STRING *win_name)
: Item_func_or_sum(thd, (Item *) win_func),
@@ -613,9 +628,6 @@ public:
*/
void setup_partition_border_check(THD *thd);
- void advance_window();
- bool check_if_partition_changed();
-
enum_field_types field_type() const
{
return ((Item_sum *) args[0])->field_type();
diff --git a/sql/sql_window.cc b/sql/sql_window.cc
index 4c5ef53d854..a862821bd56 100644
--- a/sql/sql_window.cc
+++ b/sql/sql_window.cc
@@ -37,12 +37,12 @@ Window_spec::check_window_names(List_iterator_fast<Window_spec> &it)
if (win_spec->order_list->elements && order_list->elements)
{
my_error(ER_ORDER_LIST_IN_REFERENCING_WINDOW_SPEC, MYF(0), ref_name);
- return true;
+ return true;
}
- if (win_spec->window_frame)
+ if (win_spec->window_frame)
{
my_error(ER_WINDOW_FRAME_IN_REFERENCED_WINDOW_SPEC, MYF(0), ref_name);
- return true;
+ return true;
}
referenced_win_spec= win_spec;
if (partition_list->elements == 0)
@@ -54,7 +54,7 @@ Window_spec::check_window_names(List_iterator_fast<Window_spec> &it)
if (ref_name && !referenced_win_spec)
{
my_error(ER_WRONG_WINDOW_SPEC_NAME, MYF(0), ref_name);
- return true;
+ return true;
}
window_names_are_checked= true;
return false;
@@ -73,7 +73,7 @@ Window_frame::check_frame_bounds()
top_bound->precedence_type == Window_frame_bound::FOLLOWING))
{
my_error(ER_BAD_COMBINATION_OF_WINDOW_FRAME_BOUND_SPECS, MYF(0));
- return true;
+ return true;
}
return false;
@@ -86,7 +86,7 @@ Window_frame::check_frame_bounds()
int
setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
- List<Item> &fields, List<Item> &all_fields,
+ List<Item> &fields, List<Item> &all_fields,
List<Window_spec> &win_specs, List<Item_window_func> &win_funcs)
{
Window_spec *win_spec;
@@ -116,7 +116,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
it.rewind();
List_iterator_fast<Window_spec> itp(win_specs);
-
+
while ((win_spec= it++))
{
bool hidden_group_fields;
@@ -131,7 +131,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
{
DBUG_RETURN(1);
}
-
+
if (win_spec->window_frame &&
win_spec->window_frame->exclusion != Window_frame::EXCL_NONE)
{
@@ -188,7 +188,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
}
}
}
-
+
/* "ROWS PRECEDING|FOLLOWING $n" must have a numeric $n */
if (win_spec->window_frame &&
win_spec->window_frame->units == Window_frame::UNITS_ROWS)
@@ -219,7 +219,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
{
win_func_item->update_used_tables();
}
-
+
DBUG_RETURN(0);
}
@@ -445,7 +445,7 @@ typedef int (*Item_window_func_cmp)(Item_window_func *f1,
@brief
Sort window functions so that those that can be computed together are
adjacent.
-
+
@detail
Sort window functions by their
- required sorting order,
@@ -498,48 +498,15 @@ void order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list)
}
else if (win_spec_prev->window_frame != win_spec_curr->window_frame)
curr->marker|= FRAME_CHANGE_FLAG;
-
- prev= curr;
- }
+
+ prev= curr;
+ }
}
/////////////////////////////////////////////////////////////////////////////
-/*
- Do a pass over sorted table and compute window function values.
-
- This function is for handling window functions that can be computed on the
- fly. Examples are RANK() and ROW_NUMBER().
-*/
-bool compute_window_func_values(Item_window_func *item_win,
- TABLE *tbl, READ_RECORD *info)
-{
- int err;
- while (!(err=info->read_record(info)))
- {
- store_record(tbl,record[1]);
-
- /*
- This will cause window function to compute its value for the
- current row :
- */
- item_win->advance_window();
-
- /*
- Put the new value into temptable's field
- TODO: Should this use item_win->update_field() call?
- Regular aggegate function implementations seem to implement it.
- */
- item_win->save_in_field(item_win->result_field, true);
- err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
- if (err && err != HA_ERR_RECORD_IS_THE_SAME)
- return true;
- }
- return false;
-}
-
/////////////////////////////////////////////////////////////////////////////
// Window Frames support
/////////////////////////////////////////////////////////////////////////////
@@ -571,11 +538,6 @@ bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst)
class Rowid_seq_cursor
{
- uchar *cache_start;
- uchar *cache_pos;
- uchar *cache_end;
- uint ref_length;
-
public:
virtual ~Rowid_seq_cursor() {}
@@ -595,17 +557,17 @@ public:
cache_pos+= ref_length;
return 0;
}
-
+
ha_rows get_rownum()
{
return (cache_pos - cache_start) / ref_length;
}
- // will be called by ROWS n FOLLOWING to catch up.
void move_to(ha_rows row_number)
{
cache_pos= cache_start + row_number * ref_length;
}
+
protected:
bool at_eof() { return (cache_pos == cache_end); }
@@ -618,6 +580,12 @@ protected:
}
uchar *get_curr_rowid() { return cache_pos; }
+
+private:
+ uchar *cache_start;
+ uchar *cache_pos;
+ uchar *cache_end;
+ uint ref_length;
};
@@ -627,11 +595,6 @@ protected:
class Table_read_cursor : public Rowid_seq_cursor
{
- /*
- Note: we don't own *read_record, somebody else is using it.
- We only look at the constant part of it, e.g. table, record buffer, etc.
- */
- READ_RECORD *read_record;
public:
virtual ~Table_read_cursor() {}
@@ -668,7 +631,14 @@ public:
return false; // didn't restore
}
- // todo: should move_to() also read row here?
+private:
+ /*
+ Note: we don't own *read_record, somebody else is using it.
+ We only look at the constant part of it, e.g. table, record buffer, etc.
+ */
+ READ_RECORD *read_record;
+
+ // TODO(spetrunia): should move_to() also read row here?
};
@@ -679,14 +649,14 @@ public:
class Partition_read_cursor
{
- Table_read_cursor tbl_cursor;
- Group_bound_tracker bound_tracker;
- bool end_of_partition;
public:
- void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list)
+ Partition_read_cursor(THD *thd, SQL_I_List<ORDER> *partition_list) :
+ bound_tracker(thd, partition_list) {}
+
+ void init(READ_RECORD *info)
{
tbl_cursor.init(info);
- bound_tracker.init(thd, partition_list);
+ bound_tracker.init();
end_of_partition= false;
}
@@ -704,7 +674,7 @@ public:
}
/*
- Moves to a new row. The row is assumed to be within the current partition
+ Moves to a new row. The row is assumed to be within the current partition.
*/
void move_to(ha_rows rownum) { tbl_cursor.move_to(rownum); }
@@ -731,14 +701,18 @@ public:
{
return tbl_cursor.restore_last_row();
}
+
+private:
+ Table_read_cursor tbl_cursor;
+ Group_bound_tracker bound_tracker;
+ bool end_of_partition;
};
/////////////////////////////////////////////////////////////////////////////
-
/*
Window frame bound cursor. Abstract interface.
-
+
@detail
The cursor moves within the partition that the current row is in.
It may be ahead or behind the current row.
@@ -775,11 +749,12 @@ public:
class Frame_cursor : public Sql_alloc
{
public:
- virtual void init(THD *thd, READ_RECORD *info,
- SQL_I_List<ORDER> *partition_list,
- SQL_I_List<ORDER> *order_list)
- {}
+ virtual void init(READ_RECORD *info) {};
+ bool add_sum_func(Item_sum* item)
+ {
+ return sum_functions.push_back(item);
+ }
/*
Current row has moved to the next partition and is positioned on the first
row there. Position the frame bound accordingly.
@@ -796,20 +771,95 @@ public:
- The callee may move tbl->file and tbl->record[0] to point to some other
row.
*/
- virtual void pre_next_partition(ha_rows rownum, Item_sum* item){};
- virtual void next_partition(ha_rows rownum, Item_sum* item)=0;
-
+ virtual void pre_next_partition(ha_rows rownum) {};
+ virtual void next_partition(ha_rows rownum)=0;
+
/*
The current row has moved one row forward.
Move this frame bound accordingly, and update the value of aggregate
function as necessary.
*/
- virtual void pre_next_row(Item_sum* item){};
- virtual void next_row(Item_sum* item)=0;
-
- virtual ~Frame_cursor(){}
+ virtual void pre_next_row() {};
+ virtual void next_row()=0;
+
+ virtual ~Frame_cursor() {}
+
+protected:
+ inline void add_value_to_items()
+ {
+ List_iterator_fast<Item_sum> it(sum_functions);
+ Item_sum *item_sum;
+ while ((item_sum= it++))
+ {
+ item_sum->add();
+ }
+ }
+ inline void remove_value_from_items()
+ {
+ List_iterator_fast<Item_sum> it(sum_functions);
+ Item_sum *item_sum;
+ while ((item_sum= it++))
+ {
+ item_sum->remove();
+ }
+ }
+
+ /* Sum functions that this cursor handles. */
+ List<Item_sum> sum_functions;
+};
+
+/*
+ A class that owns cursor objects associated with a specific window function.
+*/
+class Cursor_manager
+{
+public:
+ bool add_cursor(Frame_cursor *cursor)
+ {
+ return cursors.push_back(cursor);
+ }
+
+ void initialize_cursors(READ_RECORD *info)
+ {
+ List_iterator_fast<Frame_cursor> iter(cursors);
+ Frame_cursor *fc;
+ while ((fc= iter++))
+ fc->init(info);
+ }
+
+ void notify_cursors_partition_changed(ha_rows rownum)
+ {
+ List_iterator_fast<Frame_cursor> iter(cursors);
+ Frame_cursor *cursor;
+ while ((cursor= iter++))
+ cursor->pre_next_partition(rownum);
+
+ iter.rewind();
+ while ((cursor= iter++))
+ cursor->next_partition(rownum);
+ }
+
+ void notify_cursors_next_row()
+ {
+ List_iterator_fast<Frame_cursor> iter(cursors);
+ Frame_cursor *cursor;
+ while ((cursor= iter++))
+ cursor->pre_next_row();
+
+ iter.rewind();
+ while ((cursor= iter++))
+ cursor->next_row();
+ }
+
+ ~Cursor_manager() { cursors.delete_elements(); }
+
+private:
+ /* List of the cursors that this manager owns. */
+ List<Frame_cursor> cursors;
};
+
+
//////////////////////////////////////////////////////////////////////////////
// RANGE-type frames
//////////////////////////////////////////////////////////////////////////////
@@ -841,16 +891,12 @@ class Frame_range_n_top : public Frame_cursor
*/
int order_direction;
public:
- Frame_range_n_top(bool is_preceding_arg, Item *n_val_arg) :
+ Frame_range_n_top(THD *thd,
+ SQL_I_List<ORDER> *partition_list,
+ SQL_I_List<ORDER> *order_list,
+ bool is_preceding_arg, Item *n_val_arg) :
n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg)
- {}
-
- void init(THD *thd, READ_RECORD *info,
- SQL_I_List<ORDER> *partition_list,
- SQL_I_List<ORDER> *order_list)
{
- cursor.init(info);
-
DBUG_ASSERT(order_list->elements == 1);
Item *src_expr= order_list->first->item[0];
if (order_list->first->direction == ORDER::ORDER_ASC)
@@ -872,24 +918,30 @@ public:
item_add->fix_fields(thd, &item_add);
}
- void pre_next_partition(ha_rows rownum, Item_sum* item)
+ void init(READ_RECORD *info)
+ {
+ cursor.init(info);
+
+ }
+
+ void pre_next_partition(ha_rows rownum)
{
// Save the value of FUNC(current_row)
range_expr->fetch_value_from(item_add);
}
- void next_partition(ha_rows rownum, Item_sum* item)
+ void next_partition(ha_rows rownum)
{
cursor.move_to(rownum);
- walk_till_non_peer(item);
+ walk_till_non_peer();
}
- void pre_next_row(Item_sum* item)
+ void pre_next_row()
{
range_expr->fetch_value_from(item_add);
}
- void next_row(Item_sum* item)
+ void next_row()
{
/*
Ok, our cursor is at the first row R where
@@ -900,19 +952,19 @@ public:
{
if (order_direction * range_expr->cmp_read_only() <= 0)
return;
- item->remove();
+ remove_value_from_items();
}
- walk_till_non_peer(item);
+ walk_till_non_peer();
}
private:
- void walk_till_non_peer(Item_sum* item)
+ void walk_till_non_peer()
{
while (!cursor.get_next())
{
if (order_direction * range_expr->cmp_read_only() <= 0)
break;
- item->remove();
+ remove_value_from_items();
}
}
};
@@ -950,16 +1002,13 @@ class Frame_range_n_bottom: public Frame_cursor
*/
int order_direction;
public:
- Frame_range_n_bottom(bool is_preceding_arg, Item *n_val_arg) :
- n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg)
- {}
-
- void init(THD *thd, READ_RECORD *info,
- SQL_I_List<ORDER> *partition_list,
- SQL_I_List<ORDER> *order_list)
+ Frame_range_n_bottom(THD *thd,
+ SQL_I_List<ORDER> *partition_list,
+ SQL_I_List<ORDER> *order_list,
+ bool is_preceding_arg, Item *n_val_arg) :
+ cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL),
+ is_preceding(is_preceding_arg)
{
- cursor.init(thd, info, partition_list);
-
DBUG_ASSERT(order_list->elements == 1);
Item *src_expr= order_list->first->item[0];
@@ -982,7 +1031,12 @@ public:
item_add->fix_fields(thd, &item_add);
}
- void pre_next_partition(ha_rows rownum, Item_sum* item)
+ void init(READ_RECORD *info)
+ {
+ cursor.init(info);
+ }
+
+ void pre_next_partition(ha_rows rownum)
{
// Save the value of FUNC(current_row)
range_expr->fetch_value_from(item_add);
@@ -991,20 +1045,20 @@ public:
end_of_partition= false;
}
- void next_partition(ha_rows rownum, Item_sum* item)
+ void next_partition(ha_rows rownum)
{
cursor.move_to(rownum);
- walk_till_non_peer(item);
+ walk_till_non_peer();
}
- void pre_next_row(Item_sum* item)
+ void pre_next_row()
{
if (end_of_partition)
return;
range_expr->fetch_value_from(item_add);
}
- void next_row(Item_sum* item)
+ void next_row()
{
if (end_of_partition)
return;
@@ -1017,20 +1071,20 @@ public:
{
if (order_direction * range_expr->cmp_read_only() < 0)
return;
- item->add();
+ add_value_to_items();
}
- walk_till_non_peer(item);
+ walk_till_non_peer();
}
private:
- void walk_till_non_peer(Item_sum* item)
+ void walk_till_non_peer()
{
int res;
while (!(res= cursor.get_next()))
{
if (order_direction * range_expr->cmp_read_only() < 0)
break;
- item->add();
+ add_value_to_items();
}
if (res)
end_of_partition= true;
@@ -1043,11 +1097,11 @@ private:
...
| peer1
| peer2 <----- current_row
- | peer3
+ | peer3
+-peer4 <----- the cursor points here. peer4 itself is included.
nonpeer1
nonpeer2
-
+
This bound moves in front of the current_row. It should be a the first row
that is still a peer of the current row.
*/
@@ -1060,15 +1114,20 @@ class Frame_range_current_row_bottom: public Frame_cursor
bool dont_move;
public:
- void init(THD *thd, READ_RECORD *info,
- SQL_I_List<ORDER> *partition_list,
- SQL_I_List<ORDER> *order_list)
+ Frame_range_current_row_bottom(THD *thd,
+ SQL_I_List<ORDER> *partition_list,
+ SQL_I_List<ORDER> *order_list) :
+ cursor(thd, partition_list), peer_tracker(thd, order_list)
+ {
+ }
+
+ void init(READ_RECORD *info)
{
- cursor.init(thd, info, partition_list);
- peer_tracker.init(thd, order_list);
+ cursor.init(info);
+ peer_tracker.init();
}
- void pre_next_partition(ha_rows rownum, Item_sum* item)
+ void pre_next_partition(ha_rows rownum)
{
// Save the value of the current_row
peer_tracker.check_if_next_group();
@@ -1076,23 +1135,23 @@ public:
if (rownum != 0)
{
// Add the current row now because our cursor has already seen it
- item->add();
+ add_value_to_items();
}
}
- void next_partition(ha_rows rownum, Item_sum* item)
+ void next_partition(ha_rows rownum)
{
- walk_till_non_peer(item);
+ walk_till_non_peer();
}
- void pre_next_row(Item_sum* item)
+ void pre_next_row()
{
dont_move= !peer_tracker.check_if_next_group();
if (!dont_move)
- item->add();
+ add_value_to_items();
}
- void next_row(Item_sum* item)
+ void next_row()
{
// Check if our cursor is pointing at a peer of the current row.
// If not, move forward until that becomes true
@@ -1104,11 +1163,11 @@ public:
*/
return;
}
- walk_till_non_peer(item);
+ walk_till_non_peer();
}
private:
- void walk_till_non_peer(Item_sum* item)
+ void walk_till_non_peer()
{
/*
Walk forward until we've met first row that's not a peer of the current
@@ -1118,7 +1177,7 @@ private:
{
if (peer_tracker.compare_with_cache())
break;
- item->add();
+ add_value_to_items();
}
}
};
@@ -1148,33 +1207,38 @@ class Frame_range_current_row_top : public Frame_cursor
bool move;
public:
- void init(THD *thd, READ_RECORD *info,
- SQL_I_List<ORDER> *partition_list,
- SQL_I_List<ORDER> *order_list)
+ Frame_range_current_row_top(THD *thd,
+ SQL_I_List<ORDER> *partition_list,
+ SQL_I_List<ORDER> *order_list) :
+ bound_tracker(thd, partition_list), cursor(), peer_tracker(thd, order_list),
+ move(false)
+ {}
+
+ void init(READ_RECORD *info)
{
- bound_tracker.init(thd, partition_list);
+ bound_tracker.init();
cursor.init(info);
- peer_tracker.init(thd, order_list);
+ peer_tracker.init();
}
- void pre_next_partition(ha_rows rownum, Item_sum* item)
+ void pre_next_partition(ha_rows rownum)
{
// Fetch the value from the first row
peer_tracker.check_if_next_group();
cursor.move_to(rownum+1);
}
- void next_partition(ha_rows rownum, Item_sum* item) {}
+ void next_partition(ha_rows rownum) {}
- void pre_next_row(Item_sum* item)
+ void pre_next_row()
{
// Check if the new current_row is a peer of the row that our cursor is
// pointing to.
move= peer_tracker.check_if_next_group();
}
- void next_row(Item_sum* item)
+ void next_row()
{
if (move)
{
@@ -1187,7 +1251,7 @@ public:
// todo: need the following check ?
if (!peer_tracker.compare_with_cache())
return;
- item->remove();
+ remove_value_from_items();
}
do
@@ -1196,7 +1260,7 @@ public:
return;
if (!peer_tracker.compare_with_cache())
return;
- item->remove();
+ remove_value_from_items();
}
while (1);
}
@@ -1214,7 +1278,14 @@ public:
class Frame_unbounded_preceding : public Frame_cursor
{
public:
- void next_partition(ha_rows rownum, Item_sum* item)
+ Frame_unbounded_preceding(THD *thd,
+ SQL_I_List<ORDER> *partition_list,
+ SQL_I_List<ORDER> *order_list)
+ {}
+
+ void init(READ_RECORD *info) {}
+
+ void next_partition(ha_rows rownum)
{
/*
UNBOUNDED PRECEDING frame end just stays on the first row.
@@ -1222,7 +1293,7 @@ public:
*/
}
- void next_row(Item_sum* item)
+ void next_row()
{
/* Do nothing, UNBOUNDED PRECEDING frame end doesn't move. */
}
@@ -1239,18 +1310,22 @@ protected:
Partition_read_cursor cursor;
public:
- void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list,
- SQL_I_List<ORDER> *order_list)
+ Frame_unbounded_following(THD *thd,
+ SQL_I_List<ORDER> *partition_list,
+ SQL_I_List<ORDER> *order_list) :
+ cursor(thd, partition_list) {}
+
+ void init(READ_RECORD *info)
{
- cursor.init(thd, info, partition_list);
+ cursor.init(info);
}
- void pre_next_partition(ha_rows rownum, Item_sum* item)
+ void pre_next_partition(ha_rows rownum)
{
cursor.on_next_partition(rownum);
}
- void next_partition(ha_rows rownum, Item_sum* item)
+ void next_partition(ha_rows rownum)
{
if (!rownum)
{
@@ -1258,16 +1333,16 @@ public:
if (cursor.get_next())
return;
}
- item->add();
+ add_value_to_items();
/* Walk to the end of the partition, updating the SUM function */
while (!cursor.get_next())
{
- item->add();
+ add_value_to_items();
}
}
- void next_row(Item_sum* item)
+ void next_row()
{
/* Do nothing, UNBOUNDED FOLLOWING frame end doesn't move */
}
@@ -1277,9 +1352,12 @@ public:
class Frame_unbounded_following_set_count : public Frame_unbounded_following
{
public:
- // pre_next_partition is inherited
+ Frame_unbounded_following_set_count(
+ THD *thd,
+ SQL_I_List<ORDER> *partition_list, SQL_I_List<ORDER> *order_list) :
+ Frame_unbounded_following(thd, partition_list, order_list) {}
- void next_partition(ha_rows rownum, Item_sum* item)
+ void next_partition(ha_rows rownum)
{
ha_rows num_rows_in_partition= 0;
if (!rownum)
@@ -1292,13 +1370,16 @@ public:
/* Walk to the end of the partition, find how many rows there are. */
while (!cursor.get_next())
- {
num_rows_in_partition++;
- }
- Item_sum_window_with_row_count* item_with_row_count =
- static_cast<Item_sum_window_with_row_count *>(item);
- item_with_row_count->set_row_count(num_rows_in_partition);
+ List_iterator_fast<Item_sum> it(sum_functions);
+ Item_sum* item;
+ while ((item= it++))
+ {
+ Item_sum_window_with_row_count* item_with_row_count =
+ static_cast<Item_sum_window_with_row_count *>(item);
+ item_with_row_count->set_row_count(num_rows_in_partition);
+ }
}
};
@@ -1324,13 +1405,12 @@ public:
is_top_bound(is_top_bound_arg), n_rows(n_rows_arg)
{}
- void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list,
- SQL_I_List<ORDER> *order_list)
+ void init(READ_RECORD *info)
{
cursor.init(info);
}
- void next_partition(ha_rows rownum, Item_sum* item)
+ void next_partition(ha_rows rownum)
{
/*
Position our cursor to point at the first row in the new partition
@@ -1357,12 +1437,12 @@ public:
if (n_rows_to_skip == ha_rows(-1))
{
cursor.get_next();
- item->add();
+ add_value_to_items();
n_rows_to_skip= 0;
}
}
- void next_row(Item_sum* item)
+ void next_row()
{
if (n_rows_to_skip)
{
@@ -1374,9 +1454,9 @@ public:
return; // this is not expected to happen.
if (is_top_bound) // this is frame start endpoint
- item->remove();
+ remove_value_from_items();
else
- item->add();
+ add_value_to_items();
}
};
@@ -1391,17 +1471,18 @@ public:
class Frame_rows_current_row_bottom : public Frame_cursor
{
public:
- void pre_next_partition(ha_rows rownum, Item_sum* item)
+
+ void pre_next_partition(ha_rows rownum)
{
- item->add();
+ add_value_to_items();
}
- void next_partition(ha_rows rownum, Item_sum* item) {}
- void pre_next_row(Item_sum* item)
+ void next_partition(ha_rows rownum) {}
+ void pre_next_row()
{
/* Temp table's current row is current_row. Add it to the window func */
- item->add();
+ add_value_to_items();
}
- void next_row(Item_sum* item) {};
+ void next_row() {};
};
@@ -1443,20 +1524,23 @@ class Frame_n_rows_following : public Frame_cursor
Partition_read_cursor cursor;
bool at_partition_end;
public:
- Frame_n_rows_following(bool is_top_bound_arg, ha_rows n_rows_arg) :
- is_top_bound(is_top_bound_arg), n_rows(n_rows_arg)
+ Frame_n_rows_following(THD *thd,
+ SQL_I_List<ORDER> *partition_list,
+ SQL_I_List<ORDER> *order_list,
+ bool is_top_bound_arg, ha_rows n_rows_arg) :
+ is_top_bound(is_top_bound_arg), n_rows(n_rows_arg),
+ cursor(thd, partition_list)
{
DBUG_ASSERT(n_rows > 0);
}
- void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list,
- SQL_I_List<ORDER> *order_list)
+ void init(READ_RECORD *info)
{
- cursor.init(thd, info, partition_list);
+ cursor.init(info);
at_partition_end= false;
}
- void pre_next_partition(ha_rows rownum, Item_sum* item)
+ void pre_next_partition(ha_rows rownum)
{
at_partition_end= false;
@@ -1469,39 +1553,39 @@ public:
// Current row points at the first row in the partition
if (is_top_bound) // this is frame top endpoint
- item->remove();
+ remove_value_from_items();
else
- item->add();
+ add_value_to_items();
}
}
/* Move our cursor to be n_rows ahead. */
- void next_partition(ha_rows rownum, Item_sum* item)
+ void next_partition(ha_rows rownum)
{
ha_rows i_end= n_rows + ((rownum==0)?1:0)- is_top_bound;
for (ha_rows i= 0; i < i_end; i++)
{
- if (next_row_intern(item))
+ if (next_row_intern())
break;
}
}
- void next_row(Item_sum* item)
+ void next_row()
{
if (at_partition_end)
return;
- next_row_intern(item);
+ next_row_intern();
}
private:
- bool next_row_intern(Item_sum *item)
+ bool next_row_intern()
{
if (!cursor.get_next())
{
if (is_top_bound) // this is frame start endpoint
- item->remove();
+ remove_value_from_items();
else
- item->add();
+ add_value_to_items();
}
else
at_partition_end= true;
@@ -1513,8 +1597,9 @@ private:
/*
Get a Frame_cursor for a frame bound. This is a "factory function".
*/
-Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
+Frame_cursor *get_frame_cursor(THD *thd, Window_spec *spec, bool is_top_bound)
{
+ Window_frame *frame= spec->window_frame;
if (!frame)
{
/*
@@ -1536,9 +1621,13 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
so again the same frame bounds can be used.
*/
if (is_top_bound)
- return new Frame_unbounded_preceding;
+ return new Frame_unbounded_preceding(thd,
+ spec->partition_list,
+ spec->order_list);
else
- return new Frame_range_current_row_bottom;
+ return new Frame_range_current_row_bottom(thd,
+ spec->partition_list,
+ spec->order_list);
}
Window_frame_bound *bound= is_top_bound? frame->top_bound :
@@ -1554,9 +1643,13 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
{
/* The following serve both RANGE and ROWS: */
if (is_preceding)
- return new Frame_unbounded_preceding;
- else
- return new Frame_unbounded_following;
+ return new Frame_unbounded_preceding(thd,
+ spec->partition_list,
+ spec->order_list);
+
+ return new Frame_unbounded_following(thd,
+ spec->partition_list,
+ spec->order_list);
}
if (frame->units == Window_frame::UNITS_ROWS)
@@ -1567,15 +1660,21 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
DBUG_ASSERT((longlong) n_rows >= 0);
if (is_preceding)
return new Frame_n_rows_preceding(is_top_bound, n_rows);
- else
- return new Frame_n_rows_following(is_top_bound, n_rows);
+
+ return new Frame_n_rows_following(
+ thd, spec->partition_list, spec->order_list,
+ is_top_bound, n_rows);
}
else
{
if (is_top_bound)
- return new Frame_range_n_top(is_preceding, bound->offset);
- else
- return new Frame_range_n_bottom(is_preceding, bound->offset);
+ return new Frame_range_n_top(
+ thd, spec->partition_list, spec->order_list,
+ is_preceding, bound->offset);
+
+ return new Frame_range_n_bottom(thd,
+ spec->partition_list, spec->order_list,
+ is_preceding, bound->offset);
}
}
@@ -1585,67 +1684,154 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
{
if (is_top_bound)
return new Frame_rows_current_row_top;
- else
- return new Frame_rows_current_row_bottom;
+
+ return new Frame_rows_current_row_bottom;
}
else
{
if (is_top_bound)
- return new Frame_range_current_row_top;
- else
- return new Frame_range_current_row_bottom;
+ return new Frame_range_current_row_top(
+ thd, spec->partition_list, spec->order_list);
+
+ return new Frame_range_current_row_bottom(
+ thd, spec->partition_list, spec->order_list);
}
}
return NULL;
}
-void add_extra_frame_cursors(List<Frame_cursor> *cursors,
- const Item_sum *window_func)
+void add_extra_frame_cursors(THD *thd, Cursor_manager *cursor_manager,
+ Item_window_func *window_func)
{
- switch (window_func->sum_func())
+ Window_spec *spec= window_func->window_spec;
+ Item_sum *item_sum= window_func->window_func();
+ Frame_cursor *fc;
+ switch (item_sum->sum_func())
{
case Item_sum::CUME_DIST_FUNC:
- cursors->push_back(new Frame_unbounded_preceding);
- cursors->push_back(new Frame_range_current_row_bottom);
+ fc= new Frame_unbounded_preceding(thd,
+ spec->partition_list,
+ spec->order_list);
+ fc->add_sum_func(item_sum);
+ cursor_manager->add_cursor(fc);
+ fc= new Frame_range_current_row_bottom(thd,
+ spec->partition_list,
+ spec->order_list);
+ fc->add_sum_func(item_sum);
+ cursor_manager->add_cursor(fc);
break;
default:
- cursors->push_back(new Frame_unbounded_preceding);
- cursors->push_back(new Frame_rows_current_row_bottom);
+ fc= new Frame_unbounded_preceding(
+ thd, spec->partition_list, spec->order_list);
+ fc->add_sum_func(item_sum);
+ cursor_manager->add_cursor(fc);
+
+ fc= new Frame_rows_current_row_bottom;
+ fc->add_sum_func(item_sum);
+ cursor_manager->add_cursor(fc);
}
}
-void get_window_func_required_cursors(
- List<Frame_cursor> *result, const Item_window_func* item_win)
+
+/*
+ Create required frame cursors for the list of window functions.
+ Register all functions to their appropriate cursors.
+ If the window functions share the same frame specification,
+ those window functions will be registered to the same cursor.
+*/
+void get_window_functions_required_cursors(
+ THD *thd,
+ List<Item_window_func>& window_functions,
+ List<Cursor_manager> *cursor_managers)
{
- if (item_win->requires_partition_size())
- result->push_back(new Frame_unbounded_following_set_count);
+ List_iterator_fast<Item_window_func> it(window_functions);
+ Item_window_func* item_win_func;
+ Item_sum *sum_func;
+ while ((item_win_func= it++))
+ {
+ Cursor_manager *cursor_manager = new Cursor_manager();
+ sum_func = item_win_func->window_func();
+ Frame_cursor *fc;
+ /*
+ Some window functions require the partition size for computing values.
+ Add a cursor that retrieves it as the first one in the list if necessary.
+ */
+ if (item_win_func->requires_partition_size())
+ {
+ fc= new Frame_unbounded_following_set_count(thd,
+ item_win_func->window_spec->partition_list,
+ item_win_func->window_spec->order_list);
+ fc->add_sum_func(sum_func);
+ cursor_manager->add_cursor(fc);
+ }
- /*
- If it is not a regular window function that follows frame specifications,
- specific cursors are required.
- */
- if (item_win->is_frame_prohibited())
+ /*
+ If it is not a regular window function that follows frame specifications,
+ specific cursors are required. ROW_NUM, RANK, NTILE and others follow
+ such rules. Check is_frame_prohibited check for the full list.
+ */
+ if (item_win_func->is_frame_prohibited())
+ {
+ add_extra_frame_cursors(thd, cursor_manager, item_win_func);
+ cursor_managers->push_back(cursor_manager);
+ continue;
+ }
+
+ Frame_cursor *frame_bottom= get_frame_cursor(thd,
+ item_win_func->window_spec, false);
+ Frame_cursor *frame_top= get_frame_cursor(thd,
+ item_win_func->window_spec, true);
+
+ frame_bottom->add_sum_func(sum_func);
+ frame_top->add_sum_func(sum_func);
+
+ /*
+ The order of these cursors is important. A sum function
+ must first add values (via frame_bottom) then remove them via
+ frame_top. Removing items first doesn't make sense in the case of all
+ window functions.
+ */
+ cursor_manager->add_cursor(frame_bottom);
+ cursor_manager->add_cursor(frame_top);
+ cursor_managers->push_back(cursor_manager);
+ }
+}
+
+/**
+ Helper function that takes a list of window functions and writes
+ their values in the current table record.
+*/
+static
+bool save_window_function_values(List<Item_window_func>& window_functions,
+ TABLE *tbl, uchar *rowid_buf)
+{
+ List_iterator_fast<Item_window_func> iter(window_functions);
+ tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
+ store_record(tbl, record[1]);
+ while (Item_window_func *item_win= iter++)
{
- add_extra_frame_cursors(result, item_win->window_func());
- return;
+ int err;
+ item_win->save_in_field(item_win->result_field, true);
+ // TODO check if this can be placed outside the loop.
+ err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
+ if (err && err != HA_ERR_RECORD_IS_THE_SAME)
+ return true;
}
- /* A regular window function follows the frame specification. */
- result->push_back(get_frame_cursor(item_win->window_spec->window_frame,
- false));
- result->push_back(get_frame_cursor(item_win->window_spec->window_frame,
- true));
+ return false;
}
/*
+ TODO(cvicentiu) update this comment to reflect the new execution.
+
Streamed window function computation with window frames.
We make a single pass over the ordered temp.table, but we're using three
- cursors:
+ cursors:
- current row - the row that we're computing window func value for)
- start_bound - the start of the frame
- bottom_bound - the end of the frame
-
+
All three cursors move together.
@todo
@@ -1655,7 +1841,7 @@ void get_window_func_required_cursors(
@detail
ROWS BETWEEN 3 PRECEDING -- frame start
AND 3 FOLLOWING -- frame end
-
+
/------ frame end (aka BOTTOM)
Dataset start |
--------====*=======[*]========*========-------->> dataset end
@@ -1663,7 +1849,7 @@ void get_window_func_required_cursors(
| +-------- current row
|
\-------- frame start ("TOP")
-
+
- frame_end moves forward and adds rows into the aggregate function.
- frame_start follows behind and removes rows from the aggregate function.
- current_row is the row where the value of aggregate function is stored.
@@ -1672,97 +1858,90 @@ void get_window_func_required_cursors(
condition (Others can catch up by counting rows?)
*/
-
-bool compute_window_func_with_frames(Item_window_func *item_win,
- TABLE *tbl, READ_RECORD *info)
+bool compute_window_func(THD *thd,
+ List<Item_window_func>& window_functions,
+ List<Cursor_manager>& cursor_managers,
+ TABLE *tbl,
+ SORT_INFO *filesort_result)
{
- THD *thd= tbl->in_use;
- int err= 0;
+ List_iterator_fast<Item_window_func> iter_win_funcs(window_functions);
+ List_iterator_fast<Cursor_manager> iter_cursor_managers(cursor_managers);
+ uint err;
- Item_sum *sum_func= item_win->window_func();
- /* This algorithm doesn't support DISTINCT aggregator */
- sum_func->set_aggregator(Aggregator::SIMPLE_AGGREGATOR);
+ READ_RECORD info;
- List<Frame_cursor> cursors;
- get_window_func_required_cursors(&cursors, item_win);
+ if (init_read_record(&info, current_thd, tbl, NULL/*select*/, filesort_result,
+ 0, 1, FALSE))
+ return true;
- List_iterator_fast<Frame_cursor> it(cursors);
- Frame_cursor *c;
- while((c= it++))
+ Cursor_manager *cursor_manager;
+ while ((cursor_manager= iter_cursor_managers++))
+ cursor_manager->initialize_cursors(&info);
+
+ /* One partition tracker for each window function. */
+ List<Group_bound_tracker> partition_trackers;
+ Item_window_func *win_func;
+ while ((win_func= iter_win_funcs++))
{
- c->init(thd, info, item_win->window_spec->partition_list,
- item_win->window_spec->order_list);
+ Group_bound_tracker *tracker= new Group_bound_tracker(thd,
+ win_func->window_spec->partition_list);
+ // TODO(cvicentiu) This should be removed and placed in constructor.
+ tracker->init();
+ partition_trackers.push_back(tracker);
}
- bool is_error= false;
+ List_iterator_fast<Group_bound_tracker> iter_part_trackers(partition_trackers);
ha_rows rownum= 0;
uchar *rowid_buf= (uchar*) my_malloc(tbl->file->ref_length, MYF(0));
while (true)
{
- /* Move the current_row */
- if ((err=info->read_record(info)))
- {
- break; /* End of file */
- }
- bool partition_changed= item_win->check_if_partition_changed();
+ if ((err= info.read_record(&info)))
+ break; // End of file.
+ /* Remember current row so that we can restore it before computing
+ each window function. */
tbl->file->position(tbl->record[0]);
memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length);
- if (partition_changed || (rownum == 0))
- {
- sum_func->clear();
- /*
- pre_XXX functions assume that tbl->record[0] contains current_row, and
- they may not change it.
- */
- it.rewind();
- while ((c= it++))
- c->pre_next_partition(rownum, sum_func);
- /*
- We move bottom_bound first, because we want rows to be added into the
- aggregate before top_bound attempts to remove them.
- */
- it.rewind();
- while ((c= it++))
- c->next_partition(rownum, sum_func);
- }
- else
- {
- /* Again, both pre_XXX function can find current_row in tbl->record[0] */
- it.rewind();
- while ((c= it++))
- c->pre_next_row(sum_func);
-
- /* These make no assumptions about tbl->record[0] and may change it */
- it.rewind();
- while ((c= it++))
- c->next_row(sum_func);
- }
- rownum++;
+ iter_win_funcs.rewind();
+ iter_part_trackers.rewind();
+ iter_cursor_managers.rewind();
- /*
- Frame cursors may have made tbl->record[0] to point to some record other
- than current_row. This applies to tbl->file's internal state, too.
- Fix this by reading the current row again.
- */
- tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
- store_record(tbl,record[1]);
- item_win->save_in_field(item_win->result_field, true);
- err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
- if (err && err != HA_ERR_RECORD_IS_THE_SAME)
+ Group_bound_tracker *tracker;
+ while ((win_func= iter_win_funcs++) &&
+ (tracker= iter_part_trackers++) &&
+ (cursor_manager= iter_cursor_managers++))
{
- is_error= true;
- break;
+ if (tracker->check_if_next_group() || (rownum == 0))
+ {
+ /* TODO(cvicentiu)
+ Clearing window functions should happen through cursors. */
+ win_func->window_func()->clear();
+ cursor_manager->notify_cursors_partition_changed(rownum);
+ }
+ else
+ {
+ cursor_manager->notify_cursors_next_row();
+ }
+ /* Return to current row after notifying cursors for each window
+ function. */
+ tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
}
+
+ /* We now have computed values for each window function. They can now
+ be saved in the current row. */
+ save_window_function_values(window_functions, tbl, rowid_buf);
+
+ rownum++;
}
my_free(rowid_buf);
- cursors.delete_elements();
- return is_error? true: false;
-}
+ partition_trackers.delete_elements();
+ end_read_record(&info);
+ return false;
+}
/* Make a list that is a concation of two lists of ORDER elements */
@@ -1799,25 +1978,18 @@ static ORDER* concat_order_lists(MEM_ROOT *mem_root, ORDER *list1, ORDER *list2)
return res;
}
-
-bool Window_func_runner::setup(THD *thd)
+bool Window_func_runner::add_function_to_run(Item_window_func *win_func)
{
- win_func->setup_partition_border_check(thd);
+
+ Item_sum *sum_func= win_func->window_func();
+ sum_func->setup_window_func(current_thd, win_func->window_spec);
Item_sum::Sumfunctype type= win_func->window_func()->sum_func();
- switch (type)
+ switch (type)
{
case Item_sum::ROW_NUMBER_FUNC:
case Item_sum::RANK_FUNC:
case Item_sum::DENSE_RANK_FUNC:
- {
- /*
- One-pass window function computation, walk through the rows and
- assign values.
- */
- compute_func= compute_window_func_values;
- break;
- }
case Item_sum::COUNT_FUNC:
case Item_sum::SUM_BIT_FUNC:
case Item_sum::SUM_FUNC:
@@ -1825,43 +1997,49 @@ bool Window_func_runner::setup(THD *thd)
case Item_sum::PERCENT_RANK_FUNC:
case Item_sum::CUME_DIST_FUNC:
case Item_sum::NTILE_FUNC:
- {
- /*
- Frame-aware window function computation. It does one pass, but
- uses three cursors -frame_start, current_row, and frame_end.
- */
- compute_func= compute_window_func_with_frames;
break;
- }
+
default:
- my_error(ER_NOT_SUPPORTED_YET, MYF(0), "This aggregate as window function");
+ my_error(ER_NOT_SUPPORTED_YET, MYF(0),
+ "This aggregate as window function");
return true;
}
- return false;
+ return window_functions.push_back(win_func);
}
/*
Compute the value of window function for all rows.
*/
-bool Window_func_runner::exec(TABLE *tbl, SORT_INFO *filesort_result)
+bool Window_func_runner::exec(THD *thd, TABLE *tbl, SORT_INFO *filesort_result)
{
- THD *thd= current_thd;
- win_func->set_phase_to_computation();
-
- /* Go through the sorted array and compute the window function */
- READ_RECORD info;
-
- if (init_read_record(&info, thd, tbl, NULL/*select*/, filesort_result,
- 0, 1, FALSE))
- return true;
+ List_iterator_fast<Item_window_func> it(window_functions);
+ Item_window_func *win_func;
+ while ((win_func= it++))
+ {
+ win_func->set_phase_to_computation();
+ // TODO(cvicentiu) Setting the aggregator should probably be done during
+ // setup of Window_funcs_sort.
+ win_func->window_func()->set_aggregator(Aggregator::SIMPLE_AGGREGATOR);
+ }
+ it.rewind();
- bool is_error= compute_func(win_func, tbl, &info);
+ List<Cursor_manager> cursor_managers;
+ get_window_functions_required_cursors(thd, window_functions,
+ &cursor_managers);
- win_func->set_phase_to_retrieval();
+ /* Go through the sorted array and compute the window function */
+ bool is_error= compute_window_func(thd,
+ window_functions,
+ cursor_managers,
+ tbl, filesort_result);
+ while ((win_func= it++))
+ {
+ win_func->set_phase_to_retrieval();
+ }
- end_read_record(&info);
+ cursor_managers.delete_elements();
return is_error;
}
@@ -1872,21 +2050,15 @@ bool Window_funcs_sort::exec(JOIN *join)
THD *thd= join->thd;
JOIN_TAB *join_tab= &join->join_tab[join->top_join_tab_count];
+ /* Sort the table based on the most specific sorting criteria of
+ the window functions. */
if (create_sort_index(thd, join, join_tab, filesort))
return true;
TABLE *tbl= join_tab->table;
SORT_INFO *filesort_result= join_tab->filesort_result;
- bool is_error= false;
- List_iterator<Window_func_runner> it(runners);
- Window_func_runner *runner;
-
- while ((runner= it++))
- {
- if ((is_error= runner->exec(tbl, filesort_result)))
- break;
- }
+ bool is_error= runner.exec(thd, tbl, filesort_result);
delete join_tab->filesort_result;
join_tab->filesort_result= NULL;
@@ -1894,30 +2066,32 @@ bool Window_funcs_sort::exec(JOIN *join)
}
-bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel,
- List_iterator<Item_window_func> &it)
+bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel,
+ List_iterator<Item_window_func> &it)
{
Item_window_func *win_func= it.peek();
Item_window_func *prev_win_func;
+ /* The iterator should point to a valid function at the start of execution. */
+ DBUG_ASSERT(win_func);
do
{
- Window_func_runner *runner;
- if (!(runner= new Window_func_runner(win_func)) ||
- runner->setup(thd))
- {
+ if (runner.add_function_to_run(win_func))
return true;
- }
- runners.push_back(runner);
it++;
prev_win_func= win_func;
- } while ((win_func= it.peek()) && !(win_func->marker & SORTORDER_CHANGE_FLAG));
-
+ } while ((win_func= it.peek()) &&
+ !(win_func->marker & SORTORDER_CHANGE_FLAG));
+
/*
The sort criteria must be taken from the last win_func in the group of
- adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG.
+ adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG. This is
+ because the sort order must be the most specific sorting criteria defined
+ within the window function group. This ensures that we sort the table
+ in a way that the result is valid for all window functions belonging to
+ this Window_funcs_sort.
*/
- Window_spec *spec = prev_win_func->window_spec;
+ Window_spec *spec= prev_win_func->window_spec;
ORDER* sort_order= concat_order_lists(thd->mem_root,
spec->partition_list->first,
@@ -1932,8 +2106,8 @@ bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel,
bool Window_funcs_computation::setup(THD *thd,
- List<Item_window_func> *window_funcs,
- JOIN_TAB *tab)
+ List<Item_window_func> *window_funcs,
+ JOIN_TAB *tab)
{
order_window_funcs_by_window_specs(window_funcs);
diff --git a/sql/sql_window.h b/sql/sql_window.h
index 54e39d827fe..c3847240e9a 100644
--- a/sql/sql_window.h
+++ b/sql/sql_window.h
@@ -154,9 +154,7 @@ int setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
// Classes that make window functions computation a part of SELECT's query plan
//////////////////////////////////////////////////////////////////////////////
-typedef bool (*window_compute_func_t)(Item_window_func *item_win,
- TABLE *tbl, READ_RECORD *info);
-
+class Frame_cursor;
/*
This handles computation of one window function.
@@ -165,21 +163,17 @@ typedef bool (*window_compute_func_t)(Item_window_func *item_win,
class Window_func_runner : public Sql_alloc
{
- Item_window_func *win_func;
-
- /* The function to use for computation*/
- window_compute_func_t compute_func;
-
public:
- Window_func_runner(Item_window_func *win_func_arg) :
- win_func(win_func_arg)
- {}
+ /* Add the function to be computed during the execution pass */
+ bool add_function_to_run(Item_window_func *win_func);
- // Set things up. Create filesort structures, etc
- bool setup(THD *thd);
-
- // This sorts and runs the window function.
- bool exec(TABLE *tbl, SORT_INFO *filesort_result);
+ /* Compute and fill the fields in the table. */
+ bool exec(THD *thd, TABLE *tbl, SORT_INFO *filesort_result);
+
+private:
+ /* A list of window functions for which this Window_func_runner will compute
+ values during the execution phase. */
+ List<Item_window_func> window_functions;
};
@@ -191,21 +185,24 @@ public:
class Window_funcs_sort : public Sql_alloc
{
- List<Window_func_runner> runners;
-
- /* Window functions can be computed over this sorting */
- Filesort *filesort;
public:
bool setup(THD *thd, SQL_SELECT *sel, List_iterator<Item_window_func> &it);
bool exec(JOIN *join);
void cleanup() { delete filesort; }
friend class Window_funcs_computation;
+
+private:
+ Window_func_runner runner;
+
+ /* Window functions can be computed over this sorting */
+ Filesort *filesort;
};
struct st_join_table;
class Explain_aggr_window_funcs;
+
/*
This is a "window function computation phase": a single object of this class
takes care of computing all window functions in a SELECT.