diff options
author | konstantin@mysql.com <> | 2004-08-03 03:32:21 -0700 |
---|---|---|
committer | konstantin@mysql.com <> | 2004-08-03 03:32:21 -0700 |
commit | d3e520ce7e840df5f119e8fc2f5b5bb9640e53df (patch) | |
tree | 90caa835d0b6af4148caf934c5d80616cb5c25ae /sql/sql_select.cc | |
parent | 7e6bade23bf5fbaf6cdb76e2fce2f76cfc533a7a (diff) | |
download | mariadb-git-d3e520ce7e840df5f119e8fc2f5b5bb9640e53df.tar.gz |
Port of cursors to be pushed into 5.0 tree:
- client side part is simple and may be considered stable
- server side part now just joggles with THD state to save execution
state and has no additional locking wisdom.
Lot's of it are to be rewritten.
Diffstat (limited to 'sql/sql_select.cc')
-rw-r--r-- | sql/sql_select.cc | 380 |
1 files changed, 371 insertions, 9 deletions
diff --git a/sql/sql_select.cc b/sql/sql_select.cc index 756b5f3c017..fce77969bcf 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -1105,7 +1105,8 @@ JOIN::exec() (zero_result_cause?zero_result_cause:"No tables used")); else { - result->send_fields(fields_list,1); + result->send_fields(fields_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF); if (!having || having->val_int()) { if (do_send_rows && (procedure ? (procedure->send_row(fields_list) || @@ -1512,12 +1513,45 @@ JOIN::exec() DBUG_VOID_RETURN; } } + /* XXX: When can we have here thd->net.report_error not zero? */ + if (thd->net.report_error) + { + error= thd->net.report_error; + DBUG_VOID_RETURN; + } curr_join->having= curr_join->tmp_having; - thd->proc_info="Sending data"; - error= thd->net.report_error || - do_select(curr_join, curr_fields_list, NULL, procedure); - thd->limit_found_rows= curr_join->send_records; - thd->examined_row_count= curr_join->examined_rows; + curr_join->fields= curr_fields_list; + curr_join->procedure= procedure; + + if (unit == &thd->lex->unit && + (unit->fake_select_lex == 0 || select_lex == unit->fake_select_lex) && + thd->cursor && tables != const_tables) + { + /* + We are here if this is JOIN::exec for the last select of the main unit + and the client requested to open a cursor. + We check that not all tables are constant because this case is not + handled by do_select() separately, and this case is not implemented + for cursors yet. + */ + DBUG_ASSERT(error == 0); + /* + curr_join is used only for reusable joins - that is, + to perform SELECT for each outer row (like in subselects). + This join is main, so we know for sure that curr_join == join. + */ + DBUG_ASSERT(curr_join == this); + /* Open cursor for the last join sweep */ + error= thd->cursor->open(this); + } + else + { + thd->proc_info="Sending data"; + error= do_select(curr_join, curr_fields_list, NULL, procedure); + thd->limit_found_rows= curr_join->send_records; + thd->examined_row_count= curr_join->examined_rows; + } + DBUG_VOID_RETURN; } @@ -1566,6 +1600,306 @@ JOIN::cleanup() } +/************************* Cursor ******************************************/ + +void +Cursor::init_from_thd(THD *thd) +{ + /* + We need to save and reset thd->mem_root, otherwise it'll be freed + later in mysql_parse. + */ + mem_root= thd->mem_root; + init_sql_alloc(&thd->mem_root, + thd->variables.query_alloc_block_size, + thd->variables.query_prealloc_size); + + /* + The same is true for open tables and lock: save tables and zero THD + pointers to prevent table close in close_thread_tables (This is a part + of the temporary solution to make cursors work with minimal changes to + the current source base). + */ + derived_tables= thd->derived_tables; + open_tables= thd->open_tables; + lock= thd->lock; + query_id= thd->query_id; + free_list= thd->free_list; + reset_thd(thd); + /* + XXX: thd->locked_tables is not changed. + What problems can we have with it if cursor is open? + */ + /* + TODO: grab thd->free_list here? + */ +} + + +void +Cursor::init_thd(THD *thd) +{ + thd->mem_root= mem_root; + + DBUG_ASSERT(thd->derived_tables == 0); + thd->derived_tables= derived_tables; + + DBUG_ASSERT(thd->open_tables == 0); + thd->open_tables= open_tables; + + DBUG_ASSERT(thd->lock== 0); + thd->lock= lock; + thd->query_id= query_id; + thd->free_list= free_list; +} + + +void +Cursor::reset_thd(THD *thd) +{ + thd->derived_tables= 0; + thd->open_tables= 0; + thd->lock= 0; + thd->free_list= 0; +} + + +int +Cursor::open(JOIN *join_arg) +{ + join= join_arg; + + THD *thd= join->thd; + + /* First non-constant table */ + JOIN_TAB *join_tab= join->join_tab + join->const_tables; + + /* + Send fields description to the client; server_status is sent + in 'EOF' packet, which ends send_fields(). + */ + thd->server_status|= SERVER_STATUS_CURSOR_EXISTS; + join->result->send_fields(*join->fields, Protocol::SEND_NUM_ROWS); + ::send_eof(thd); + thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS; + + /* Prepare JOIN for reading rows. */ + + Next_select_func end_select= join->sort_and_group || join->procedure && + join->procedure->flags & PROC_GROUP ? + end_send_group : end_send; + + join->join_tab[join->tables-1].next_select= end_select; + join->send_records= 0; + join->fetch_limit= join->unit->offset_limit_cnt; + + /* Disable JOIN CACHE as it is not working with cursors yet */ + for (JOIN_TAB *tab= join_tab; tab != join->join_tab + join->tables - 1; ++tab) + { + if (tab->next_select == sub_select_cache) + tab->next_select= sub_select; + } + + DBUG_ASSERT(join_tab->table->reginfo.not_exists_optimize == 0); + DBUG_ASSERT(join_tab->not_used_in_distinct == 0); + /* + null_row is set only if row not found and it's outer join: should never + happen for the first table in join_tab list + */ + DBUG_ASSERT(join_tab->table->null_row == 0); + + return join_tab->read_first_record(join_tab); +} + + +/* + DESCRIPTION + Fetch next num_rows rows from the cursor and sent them to the client + PRECONDITION: + Cursor is open + RETURN VALUES: + -4 there are more rows, send_eof sent to the client + 0 no more rows, send_eof was sent to the client, cursor is closed + other fatal fetch error, cursor is closed (error is not reported) +*/ + +int +Cursor::fetch(ulong num_rows) +{ + THD *thd= join->thd; + JOIN_TAB *join_tab= join->join_tab + join->const_tables;; + COND *on_expr= join_tab->on_expr; + COND *select_cond= join_tab->select_cond; + READ_RECORD *info= &join_tab->read_record; + + int error= 0; + + join->fetch_limit+= num_rows; + + /* + Run while there are new rows in the first table; + For each row, satisfying ON and WHERE clauses (those parts of them which + can be evaluated early), call next_select. + */ + do + { + int no_more_rows; + + join->examined_rows++; + + if (thd->killed) /* Aborted by user */ + { + my_error(ER_SERVER_SHUTDOWN,MYF(0)); + return -1; + } + + if (on_expr == 0 || on_expr->val_int()) + { + if (select_cond == 0 || select_cond->val_int()) + { + /* + TODO: call table->unlock_row() to unlock row failed selection, + when this feature will be used. + */ + error= join_tab->next_select(join, join_tab + 1, 0); + DBUG_ASSERT(error <= 0); + if (error) + { + /* real error or LIMIT/FETCH LIMIT worked */ + if (error == -4) + { + /* + FETCH LIMIT, read ahead one row, and close cursor + if there is no more rows XXX: to be fixed to support + non-equi-joins! + */ + if ((no_more_rows= info->read_record(info))) + error= no_more_rows > 0 ? -1: 0; + } + break; + } + } + } + /* read next row; break loop if there was an error */ + if ((no_more_rows= info->read_record(info))) + { + if (no_more_rows > 0) + error= -1; + else + { + enum { END_OF_RECORDS= 1 }; + error= join_tab->next_select(join, join_tab+1, (int) END_OF_RECORDS); + } + break; + } + } + while (thd->net.report_error == 0); + + if (thd->net.report_error) + error= -1; + + switch (error) { + /* Fetch limit worked, possibly more rows are there */ + case -4: + if (thd->transaction.all.innobase_tid) + ha_release_temporary_latches(thd); + thd->server_status|= SERVER_STATUS_CURSOR_EXISTS; + ::send_eof(thd); + thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS; + /* save references to memory, allocated during fetch */ + mem_root= thd->mem_root; + free_list= thd->free_list; + break; + /* Limit clause worked: this is the same as 'no more rows' */ + case -3: /* LIMIT clause worked */ + error= 0; + /* fallthrough */ + case 0: /* No more rows */ + if (thd->transaction.all.innobase_tid) + ha_release_temporary_latches(thd); + close(); + thd->server_status|= SERVER_STATUS_LAST_ROW_SENT; + ::send_eof(thd); + thd->server_status&= ~SERVER_STATUS_LAST_ROW_SENT; + join= 0; + unit= 0; + free_items(thd->free_list); + thd->free_list= free_list= 0; + /* + Must be last, as some memory might be allocated for free purposes, + like in free_tmp_table() (TODO: fix this issue) + */ + mem_root= thd->mem_root; + free_root(&mem_root, MYF(0)); + break; + default: + close(); + join= 0; + unit= 0; + free_items(thd->free_list); + thd->free_list= free_list= 0; + /* + Must be last, as some memory might be allocated for free purposes, + like in free_tmp_table() (TODO: fix this issue) + */ + mem_root= thd->mem_root; + free_root(&mem_root, MYF(0)); + break; + } + return error; +} + + +void +Cursor::close() +{ + THD *thd= join->thd; + join->join_free(0); + if (unit) + { + /* In case of UNIONs JOIN is freed inside unit->cleanup() */ + unit->cleanup(); + } + else + { + join->cleanup(); + delete join; + } + /* XXX: Another hack: closing tables used in the cursor */ + { + DBUG_ASSERT(lock || open_tables || derived_tables); + + TABLE *tmp_open_tables= thd->open_tables; + TABLE *tmp_derived_tables= thd->derived_tables; + MYSQL_LOCK *tmp_lock= thd->lock; + + thd->open_tables= open_tables; + thd->derived_tables= derived_tables; + thd->lock= lock; + close_thread_tables(thd); + + thd->open_tables= tmp_derived_tables; + thd->derived_tables= tmp_derived_tables; + thd->lock= tmp_lock; + } +} + + +Cursor::~Cursor() +{ + if (is_open()) + close(); + free_items(free_list); + /* + Must be last, as some memory might be allocated for free purposes, + like in free_tmp_table() (TODO: fix this issue) + */ + free_root(&mem_root, MYF(0)); +} + +/*********************************************************************/ + + int mysql_select(THD *thd, Item ***rref_pointer_array, TABLE_LIST *tables, uint wild_num, List<Item> &fields, @@ -1637,6 +1971,16 @@ mysql_select(THD *thd, Item ***rref_pointer_array, join->exec(); + if (thd->cursor && thd->cursor->is_open()) + { + /* + A cursor was opened for the last sweep in exec(). + We are here only if this is mysql_select for top-level SELECT_LEX_UNIT + and there were no error. + */ + free_join= 0; + } + if (thd->lex->describe & DESCRIBE_EXTENDED) { select_lex->where= join->conds_history; @@ -5310,7 +5654,8 @@ return_zero_rows(JOIN *join, select_result *result,TABLE_LIST *tables, if (having && having->val_int() == 0) send_row=0; } - if (!(result->send_fields(fields,1))) + if (!(result->send_fields(fields, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))) { if (send_row) { @@ -7035,7 +7380,7 @@ do_select(JOIN *join,List<Item> *fields,TABLE *table,Procedure *procedure) { int error= 0; JOIN_TAB *join_tab; - int (*end_select)(JOIN *, struct st_join_table *,bool); + Next_select_func end_select; DBUG_ENTER("do_select"); join->procedure=procedure; @@ -7043,7 +7388,8 @@ do_select(JOIN *join,List<Item> *fields,TABLE *table,Procedure *procedure) Tell the client how many fields there are in a row */ if (!table) - join->result->send_fields(*fields,1); + join->result->send_fields(*fields, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF); else { VOID(table->file->extra(HA_EXTRA_WRITE_CACHE)); @@ -8076,6 +8422,14 @@ end_send(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)), } DBUG_RETURN(-3); // Abort nicely } + else if (join->send_records >= join->fetch_limit) + { + /* + There is a server side cursor and all rows for + this fetch request are sent. + */ + DBUG_RETURN(-4); + } } else { @@ -8150,6 +8504,14 @@ end_send_group(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)), join->do_send_rows=0; join->unit->select_limit_cnt = HA_POS_ERROR; } + else if (join->send_records >= join->fetch_limit) + { + /* + There is a server side cursor and all rows + for this fetch request are sent. + */ + DBUG_RETURN(-4); + } } } else |