summaryrefslogtreecommitdiff
path: root/sql/sql_select.cc
diff options
context:
space:
mode:
authorkonstantin@mysql.com <>2004-08-03 03:32:21 -0700
committerkonstantin@mysql.com <>2004-08-03 03:32:21 -0700
commitd3e520ce7e840df5f119e8fc2f5b5bb9640e53df (patch)
tree90caa835d0b6af4148caf934c5d80616cb5c25ae /sql/sql_select.cc
parent7e6bade23bf5fbaf6cdb76e2fce2f76cfc533a7a (diff)
downloadmariadb-git-d3e520ce7e840df5f119e8fc2f5b5bb9640e53df.tar.gz
Port of cursors to be pushed into 5.0 tree:
- client side part is simple and may be considered stable - server side part now just joggles with THD state to save execution state and has no additional locking wisdom. Lot's of it are to be rewritten.
Diffstat (limited to 'sql/sql_select.cc')
-rw-r--r--sql/sql_select.cc380
1 files changed, 371 insertions, 9 deletions
diff --git a/sql/sql_select.cc b/sql/sql_select.cc
index 756b5f3c017..fce77969bcf 100644
--- a/sql/sql_select.cc
+++ b/sql/sql_select.cc
@@ -1105,7 +1105,8 @@ JOIN::exec()
(zero_result_cause?zero_result_cause:"No tables used"));
else
{
- result->send_fields(fields_list,1);
+ result->send_fields(fields_list,
+ Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF);
if (!having || having->val_int())
{
if (do_send_rows && (procedure ? (procedure->send_row(fields_list) ||
@@ -1512,12 +1513,45 @@ JOIN::exec()
DBUG_VOID_RETURN;
}
}
+ /* XXX: When can we have here thd->net.report_error not zero? */
+ if (thd->net.report_error)
+ {
+ error= thd->net.report_error;
+ DBUG_VOID_RETURN;
+ }
curr_join->having= curr_join->tmp_having;
- thd->proc_info="Sending data";
- error= thd->net.report_error ||
- do_select(curr_join, curr_fields_list, NULL, procedure);
- thd->limit_found_rows= curr_join->send_records;
- thd->examined_row_count= curr_join->examined_rows;
+ curr_join->fields= curr_fields_list;
+ curr_join->procedure= procedure;
+
+ if (unit == &thd->lex->unit &&
+ (unit->fake_select_lex == 0 || select_lex == unit->fake_select_lex) &&
+ thd->cursor && tables != const_tables)
+ {
+ /*
+ We are here if this is JOIN::exec for the last select of the main unit
+ and the client requested to open a cursor.
+ We check that not all tables are constant because this case is not
+ handled by do_select() separately, and this case is not implemented
+ for cursors yet.
+ */
+ DBUG_ASSERT(error == 0);
+ /*
+ curr_join is used only for reusable joins - that is,
+ to perform SELECT for each outer row (like in subselects).
+ This join is main, so we know for sure that curr_join == join.
+ */
+ DBUG_ASSERT(curr_join == this);
+ /* Open cursor for the last join sweep */
+ error= thd->cursor->open(this);
+ }
+ else
+ {
+ thd->proc_info="Sending data";
+ error= do_select(curr_join, curr_fields_list, NULL, procedure);
+ thd->limit_found_rows= curr_join->send_records;
+ thd->examined_row_count= curr_join->examined_rows;
+ }
+
DBUG_VOID_RETURN;
}
@@ -1566,6 +1600,306 @@ JOIN::cleanup()
}
+/************************* Cursor ******************************************/
+
+void
+Cursor::init_from_thd(THD *thd)
+{
+ /*
+ We need to save and reset thd->mem_root, otherwise it'll be freed
+ later in mysql_parse.
+ */
+ mem_root= thd->mem_root;
+ init_sql_alloc(&thd->mem_root,
+ thd->variables.query_alloc_block_size,
+ thd->variables.query_prealloc_size);
+
+ /*
+ The same is true for open tables and lock: save tables and zero THD
+ pointers to prevent table close in close_thread_tables (This is a part
+ of the temporary solution to make cursors work with minimal changes to
+ the current source base).
+ */
+ derived_tables= thd->derived_tables;
+ open_tables= thd->open_tables;
+ lock= thd->lock;
+ query_id= thd->query_id;
+ free_list= thd->free_list;
+ reset_thd(thd);
+ /*
+ XXX: thd->locked_tables is not changed.
+ What problems can we have with it if cursor is open?
+ */
+ /*
+ TODO: grab thd->free_list here?
+ */
+}
+
+
+void
+Cursor::init_thd(THD *thd)
+{
+ thd->mem_root= mem_root;
+
+ DBUG_ASSERT(thd->derived_tables == 0);
+ thd->derived_tables= derived_tables;
+
+ DBUG_ASSERT(thd->open_tables == 0);
+ thd->open_tables= open_tables;
+
+ DBUG_ASSERT(thd->lock== 0);
+ thd->lock= lock;
+ thd->query_id= query_id;
+ thd->free_list= free_list;
+}
+
+
+void
+Cursor::reset_thd(THD *thd)
+{
+ thd->derived_tables= 0;
+ thd->open_tables= 0;
+ thd->lock= 0;
+ thd->free_list= 0;
+}
+
+
+int
+Cursor::open(JOIN *join_arg)
+{
+ join= join_arg;
+
+ THD *thd= join->thd;
+
+ /* First non-constant table */
+ JOIN_TAB *join_tab= join->join_tab + join->const_tables;
+
+ /*
+ Send fields description to the client; server_status is sent
+ in 'EOF' packet, which ends send_fields().
+ */
+ thd->server_status|= SERVER_STATUS_CURSOR_EXISTS;
+ join->result->send_fields(*join->fields, Protocol::SEND_NUM_ROWS);
+ ::send_eof(thd);
+ thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS;
+
+ /* Prepare JOIN for reading rows. */
+
+ Next_select_func end_select= join->sort_and_group || join->procedure &&
+ join->procedure->flags & PROC_GROUP ?
+ end_send_group : end_send;
+
+ join->join_tab[join->tables-1].next_select= end_select;
+ join->send_records= 0;
+ join->fetch_limit= join->unit->offset_limit_cnt;
+
+ /* Disable JOIN CACHE as it is not working with cursors yet */
+ for (JOIN_TAB *tab= join_tab; tab != join->join_tab + join->tables - 1; ++tab)
+ {
+ if (tab->next_select == sub_select_cache)
+ tab->next_select= sub_select;
+ }
+
+ DBUG_ASSERT(join_tab->table->reginfo.not_exists_optimize == 0);
+ DBUG_ASSERT(join_tab->not_used_in_distinct == 0);
+ /*
+ null_row is set only if row not found and it's outer join: should never
+ happen for the first table in join_tab list
+ */
+ DBUG_ASSERT(join_tab->table->null_row == 0);
+
+ return join_tab->read_first_record(join_tab);
+}
+
+
+/*
+ DESCRIPTION
+ Fetch next num_rows rows from the cursor and sent them to the client
+ PRECONDITION:
+ Cursor is open
+ RETURN VALUES:
+ -4 there are more rows, send_eof sent to the client
+ 0 no more rows, send_eof was sent to the client, cursor is closed
+ other fatal fetch error, cursor is closed (error is not reported)
+*/
+
+int
+Cursor::fetch(ulong num_rows)
+{
+ THD *thd= join->thd;
+ JOIN_TAB *join_tab= join->join_tab + join->const_tables;;
+ COND *on_expr= join_tab->on_expr;
+ COND *select_cond= join_tab->select_cond;
+ READ_RECORD *info= &join_tab->read_record;
+
+ int error= 0;
+
+ join->fetch_limit+= num_rows;
+
+ /*
+ Run while there are new rows in the first table;
+ For each row, satisfying ON and WHERE clauses (those parts of them which
+ can be evaluated early), call next_select.
+ */
+ do
+ {
+ int no_more_rows;
+
+ join->examined_rows++;
+
+ if (thd->killed) /* Aborted by user */
+ {
+ my_error(ER_SERVER_SHUTDOWN,MYF(0));
+ return -1;
+ }
+
+ if (on_expr == 0 || on_expr->val_int())
+ {
+ if (select_cond == 0 || select_cond->val_int())
+ {
+ /*
+ TODO: call table->unlock_row() to unlock row failed selection,
+ when this feature will be used.
+ */
+ error= join_tab->next_select(join, join_tab + 1, 0);
+ DBUG_ASSERT(error <= 0);
+ if (error)
+ {
+ /* real error or LIMIT/FETCH LIMIT worked */
+ if (error == -4)
+ {
+ /*
+ FETCH LIMIT, read ahead one row, and close cursor
+ if there is no more rows XXX: to be fixed to support
+ non-equi-joins!
+ */
+ if ((no_more_rows= info->read_record(info)))
+ error= no_more_rows > 0 ? -1: 0;
+ }
+ break;
+ }
+ }
+ }
+ /* read next row; break loop if there was an error */
+ if ((no_more_rows= info->read_record(info)))
+ {
+ if (no_more_rows > 0)
+ error= -1;
+ else
+ {
+ enum { END_OF_RECORDS= 1 };
+ error= join_tab->next_select(join, join_tab+1, (int) END_OF_RECORDS);
+ }
+ break;
+ }
+ }
+ while (thd->net.report_error == 0);
+
+ if (thd->net.report_error)
+ error= -1;
+
+ switch (error) {
+ /* Fetch limit worked, possibly more rows are there */
+ case -4:
+ if (thd->transaction.all.innobase_tid)
+ ha_release_temporary_latches(thd);
+ thd->server_status|= SERVER_STATUS_CURSOR_EXISTS;
+ ::send_eof(thd);
+ thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS;
+ /* save references to memory, allocated during fetch */
+ mem_root= thd->mem_root;
+ free_list= thd->free_list;
+ break;
+ /* Limit clause worked: this is the same as 'no more rows' */
+ case -3: /* LIMIT clause worked */
+ error= 0;
+ /* fallthrough */
+ case 0: /* No more rows */
+ if (thd->transaction.all.innobase_tid)
+ ha_release_temporary_latches(thd);
+ close();
+ thd->server_status|= SERVER_STATUS_LAST_ROW_SENT;
+ ::send_eof(thd);
+ thd->server_status&= ~SERVER_STATUS_LAST_ROW_SENT;
+ join= 0;
+ unit= 0;
+ free_items(thd->free_list);
+ thd->free_list= free_list= 0;
+ /*
+ Must be last, as some memory might be allocated for free purposes,
+ like in free_tmp_table() (TODO: fix this issue)
+ */
+ mem_root= thd->mem_root;
+ free_root(&mem_root, MYF(0));
+ break;
+ default:
+ close();
+ join= 0;
+ unit= 0;
+ free_items(thd->free_list);
+ thd->free_list= free_list= 0;
+ /*
+ Must be last, as some memory might be allocated for free purposes,
+ like in free_tmp_table() (TODO: fix this issue)
+ */
+ mem_root= thd->mem_root;
+ free_root(&mem_root, MYF(0));
+ break;
+ }
+ return error;
+}
+
+
+void
+Cursor::close()
+{
+ THD *thd= join->thd;
+ join->join_free(0);
+ if (unit)
+ {
+ /* In case of UNIONs JOIN is freed inside unit->cleanup() */
+ unit->cleanup();
+ }
+ else
+ {
+ join->cleanup();
+ delete join;
+ }
+ /* XXX: Another hack: closing tables used in the cursor */
+ {
+ DBUG_ASSERT(lock || open_tables || derived_tables);
+
+ TABLE *tmp_open_tables= thd->open_tables;
+ TABLE *tmp_derived_tables= thd->derived_tables;
+ MYSQL_LOCK *tmp_lock= thd->lock;
+
+ thd->open_tables= open_tables;
+ thd->derived_tables= derived_tables;
+ thd->lock= lock;
+ close_thread_tables(thd);
+
+ thd->open_tables= tmp_derived_tables;
+ thd->derived_tables= tmp_derived_tables;
+ thd->lock= tmp_lock;
+ }
+}
+
+
+Cursor::~Cursor()
+{
+ if (is_open())
+ close();
+ free_items(free_list);
+ /*
+ Must be last, as some memory might be allocated for free purposes,
+ like in free_tmp_table() (TODO: fix this issue)
+ */
+ free_root(&mem_root, MYF(0));
+}
+
+/*********************************************************************/
+
+
int
mysql_select(THD *thd, Item ***rref_pointer_array,
TABLE_LIST *tables, uint wild_num, List<Item> &fields,
@@ -1637,6 +1971,16 @@ mysql_select(THD *thd, Item ***rref_pointer_array,
join->exec();
+ if (thd->cursor && thd->cursor->is_open())
+ {
+ /*
+ A cursor was opened for the last sweep in exec().
+ We are here only if this is mysql_select for top-level SELECT_LEX_UNIT
+ and there were no error.
+ */
+ free_join= 0;
+ }
+
if (thd->lex->describe & DESCRIBE_EXTENDED)
{
select_lex->where= join->conds_history;
@@ -5310,7 +5654,8 @@ return_zero_rows(JOIN *join, select_result *result,TABLE_LIST *tables,
if (having && having->val_int() == 0)
send_row=0;
}
- if (!(result->send_fields(fields,1)))
+ if (!(result->send_fields(fields,
+ Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)))
{
if (send_row)
{
@@ -7035,7 +7380,7 @@ do_select(JOIN *join,List<Item> *fields,TABLE *table,Procedure *procedure)
{
int error= 0;
JOIN_TAB *join_tab;
- int (*end_select)(JOIN *, struct st_join_table *,bool);
+ Next_select_func end_select;
DBUG_ENTER("do_select");
join->procedure=procedure;
@@ -7043,7 +7388,8 @@ do_select(JOIN *join,List<Item> *fields,TABLE *table,Procedure *procedure)
Tell the client how many fields there are in a row
*/
if (!table)
- join->result->send_fields(*fields,1);
+ join->result->send_fields(*fields,
+ Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF);
else
{
VOID(table->file->extra(HA_EXTRA_WRITE_CACHE));
@@ -8076,6 +8422,14 @@ end_send(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)),
}
DBUG_RETURN(-3); // Abort nicely
}
+ else if (join->send_records >= join->fetch_limit)
+ {
+ /*
+ There is a server side cursor and all rows for
+ this fetch request are sent.
+ */
+ DBUG_RETURN(-4);
+ }
}
else
{
@@ -8150,6 +8504,14 @@ end_send_group(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)),
join->do_send_rows=0;
join->unit->select_limit_cnt = HA_POS_ERROR;
}
+ else if (join->send_records >= join->fetch_limit)
+ {
+ /*
+ There is a server side cursor and all rows
+ for this fetch request are sent.
+ */
+ DBUG_RETURN(-4);
+ }
}
}
else