diff options
author | Oleksandr Byelkin <sanja@mariadb.com> | 2016-11-29 21:49:25 +0100 |
---|---|---|
committer | Oleksandr Byelkin <sanja@mariadb.com> | 2017-03-16 09:10:28 +0100 |
commit | 714ca49a8b29cb23cb6520574b0d09dd683e5ef6 (patch) | |
tree | b582d673c1588bdd0b1bbb09e6f8a4155e5d8592 | |
parent | aad15eae89e9700d4c1ed4c83a68f8c7b6775a27 (diff) | |
download | mariadb-git-714ca49a8b29cb23cb6520574b0d09dd683e5ef6.tar.gz |
MDEV-11419: Report all INSERT ID for bulk operation INSERTbb-10.2-MDEV-11419
Send all Insert IDs of the buld operation to client (JDBC need it)
-rw-r--r-- | include/mysql.h.pp | 9 | ||||
-rw-r--r-- | include/mysql_com.h | 10 | ||||
-rw-r--r-- | sql/protocol.cc | 22 | ||||
-rw-r--r-- | sql/protocol.h | 6 | ||||
-rw-r--r-- | sql/sql_class.cc | 116 | ||||
-rw-r--r-- | sql/sql_class.h | 8 | ||||
-rw-r--r-- | sql/sql_insert.cc | 11 | ||||
-rw-r--r-- | sql/sql_prepare.cc | 22 |
8 files changed, 191 insertions, 13 deletions
diff --git a/include/mysql.h.pp b/include/mysql.h.pp index 517516aeb30..9c1fbd85d90 100644 --- a/include/mysql.h.pp +++ b/include/mysql.h.pp @@ -89,7 +89,14 @@ enum enum_cursor_type CURSOR_TYPE_NO_CURSOR= 0, CURSOR_TYPE_READ_ONLY= 1, CURSOR_TYPE_FOR_UPDATE= 2, - CURSOR_TYPE_SCROLLABLE= 4 + CURSOR_TYPE_SCROLLABLE= 4, +}; +enum stmt_flags_type +{ + STMTFLG_CURSOR_TYPE_READ_ONLY= CURSOR_TYPE_READ_ONLY, + STMTFLG_CURSOR_TYPE_FOR_UPDATE= CURSOR_TYPE_FOR_UPDATE, + STMTFLG_CURSOR_TYPE_SCROLLABLE= CURSOR_TYPE_FOR_UPDATE, + STMTFLG_INSERT_ID_REQUEST= 128 }; enum enum_mysql_set_option { diff --git a/include/mysql_com.h b/include/mysql_com.h index 30f2b5d2aa4..eb75d74b06a 100644 --- a/include/mysql_com.h +++ b/include/mysql_com.h @@ -553,7 +553,15 @@ enum enum_cursor_type CURSOR_TYPE_NO_CURSOR= 0, CURSOR_TYPE_READ_ONLY= 1, CURSOR_TYPE_FOR_UPDATE= 2, - CURSOR_TYPE_SCROLLABLE= 4 + CURSOR_TYPE_SCROLLABLE= 4, +}; +/* first values should be the same as enum_cursor_type */ +enum stmt_flags_type +{ + STMTFLG_CURSOR_TYPE_READ_ONLY= CURSOR_TYPE_READ_ONLY, + STMTFLG_CURSOR_TYPE_FOR_UPDATE= CURSOR_TYPE_FOR_UPDATE, + STMTFLG_CURSOR_TYPE_SCROLLABLE= CURSOR_TYPE_FOR_UPDATE, + STMTFLG_INSERT_ID_REQUEST= 128 }; diff --git a/sql/protocol.cc b/sql/protocol.cc index b9d9f28831e..2d6b83a3fd8 100644 --- a/sql/protocol.cc +++ b/sql/protocol.cc @@ -562,6 +562,7 @@ void Protocol::end_statement() switch (thd->get_stmt_da()->status()) { case Diagnostics_area::DA_ERROR: + thd->stop_collecting_insert_id(); /* The query failed, send error to log and abort bootstrap. */ error= send_error(thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message(), @@ -573,12 +574,21 @@ void Protocol::end_statement() break; case Diagnostics_area::DA_OK: case Diagnostics_area::DA_OK_BULK: - error= send_ok(thd->server_status, - thd->get_stmt_da()->statement_warn_count(), - thd->get_stmt_da()->affected_rows(), - thd->get_stmt_da()->last_insert_id(), - thd->get_stmt_da()->message(), - thd->get_stmt_da()->skip_flush()); + if (thd->report_collected_insert_id()) + if (thd->is_error()) + error= send_error(thd->get_stmt_da()->sql_errno(), + thd->get_stmt_da()->message(), + thd->get_stmt_da()->get_sqlstate()); + else + error= send_eof(thd->server_status, + thd->get_stmt_da()->statement_warn_count()); + else + error= send_ok(thd->server_status, + thd->get_stmt_da()->statement_warn_count(), + thd->get_stmt_da()->affected_rows(), + thd->get_stmt_da()->last_insert_id(), + thd->get_stmt_da()->message(), + thd->get_stmt_da()->skip_flush()); break; case Diagnostics_area::DA_DISABLED: break; diff --git a/sql/protocol.h b/sql/protocol.h index 6397e3dd5e6..bf74f52fa98 100644 --- a/sql/protocol.h +++ b/sql/protocol.h @@ -30,6 +30,12 @@ class Item_param; typedef struct st_mysql_field MYSQL_FIELD; typedef struct st_mysql_rows MYSQL_ROWS; +struct insert_id_desc +{ + ulonglong first_id; + ulonglong sequence; +}; + class Protocol { protected: diff --git a/sql/sql_class.cc b/sql/sql_class.cc index f042e6600e0..553aa05cc16 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -1319,6 +1319,7 @@ void THD::init(void) #endif //EMBEDDED_LIBRARY apc_target.init(&LOCK_thd_data); + insert_ids= NULL; DBUG_VOID_RETURN; } @@ -7299,4 +7300,119 @@ bool Discrete_intervals_list::append(Discrete_interval *new_interval) DBUG_RETURN(0); } +bool THD::init_collecting_insert_id() +{ + if (!insert_ids) + { + void *buff; + if (!(my_multi_malloc(MYF(MY_WME), &insert_ids, sizeof(DYNAMIC_ARRAY), + &buff, sizeof(insert_id_desc) * 10, + NullS)) || + my_init_dynamic_array2(insert_ids, sizeof(insert_id_desc), + buff, 10, 100, MYF(MY_WME))) + { + if (insert_ids) + my_free(insert_ids); + insert_ids= NULL; + return TRUE; + } + collect_auto_increment_increment= variables.auto_increment_increment; + } + return FALSE; +} + +void THD::stop_collecting_insert_id() +{ + if (insert_ids) + { + delete_dynamic(insert_ids); + my_free(insert_ids); + insert_ids= NULL; + } +} + +bool THD::collect_insert_id(ulonglong id) +{ + if (insert_ids) + { + if (insert_ids->elements) + { + insert_id_desc *last= + (insert_id_desc *)dynamic_array_ptr(insert_ids, + insert_ids->elements - 1); + if (id == last->first_id) + { + return FALSE; // no new insert id + } + if (id == last->first_id + (last->sequence * + collect_auto_increment_increment)) + { + last->sequence++; + return FALSE; + } + } + insert_id_desc el; + el.first_id= id; + el.sequence= 1; + if (insert_dynamic(insert_ids, &el)) + { + return TRUE; + } + } + return FALSE; +} + + +bool THD::report_collected_insert_id() +{ + if (insert_ids) + { + List<Item> field_list; + MEM_ROOT tmp_mem_root; + Query_arena arena(&tmp_mem_root, Query_arena::STMT_INITIALIZED), backup; + + init_alloc_root(arena.mem_root, 2048, 4096, MYF(0)); + set_n_backup_active_arena(&arena, &backup); + DBUG_ASSERT(mem_root == &tmp_mem_root); + + field_list.push_back(new (mem_root) + Item_int(this, "Id", 0, MY_INT64_NUM_DECIMAL_DIGITS), + mem_root); + field_list.push_back(new (mem_root) + Item_int(this, "Len", 0, MY_INT64_NUM_DECIMAL_DIGITS), + mem_root); + field_list.push_back(new (mem_root) + Item_return_int(this, "Inc", 0, MYSQL_TYPE_LONG), + mem_root); + + if (protocol_binary.send_result_set_metadata(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) + goto error; + + for (ulonglong i= 0; i < insert_ids->elements; i++) + { + insert_id_desc *last= + (insert_id_desc *)dynamic_array_ptr(insert_ids, i); + if (insert_ids->elements == 1 && last->first_id == 0 && + get_stmt_da()->affected_rows() != 1) + continue; // No insert IDs + protocol_binary.prepare_for_resend(); + protocol_binary.store_longlong(last->first_id, TRUE); + protocol_binary.store_longlong(last->sequence, TRUE); + protocol_binary.store_long(collect_auto_increment_increment); + if (protocol_binary.write()) + goto error; + } +error: + restore_active_arena(&arena, &backup); + DBUG_ASSERT(arena.mem_root == &tmp_mem_root); + // no need free Items because they was only constants + free_root(arena.mem_root, MYF(0)); + stop_collecting_insert_id(); + return TRUE; + } + return FALSE; + +} + #endif /* !defined(MYSQL_CLIENT) */ diff --git a/sql/sql_class.h b/sql/sql_class.h index 22895d7a2d8..1ed2f8e5da9 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -4286,6 +4286,14 @@ public: current_linfo= 0; mysql_mutex_unlock(&LOCK_thread_count); } + + /* Data and methods for buld INSERT IDs reporting */ + DYNAMIC_ARRAY *insert_ids; + ulong collect_auto_increment_increment; + bool init_collecting_insert_id(); + bool collect_insert_id(ulonglong id); + bool report_collected_insert_id(); + void stop_collecting_insert_id(); }; inline void add_to_active_threads(THD *thd) diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 27820c16543..4773a3e32f5 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -1054,6 +1054,12 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, } its.rewind(); iteration++; + + if (!error && thd->bulk_param) + { + thd->collect_insert_id(table->file->insert_id_for_cur_row); + } + } while (iteration < bulk_iterations); values_loop_end: @@ -1201,8 +1207,9 @@ values_loop_end: retval= thd->lex->explain->send_explain(thd); goto abort; } - if ((bulk_iterations * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) || - !thd->cuted_fields)) + if ((bulk_iterations * values_list.elements) == 1 && + (!(thd->variables.option_bits & OPTION_WARNINGS) || + !thd->cuted_fields)) { my_ok(thd, info.copied + info.deleted + ((thd->client_capabilities & CLIENT_FOUND_ROWS) ? diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc index c8765f45273..923952db340 100644 --- a/sql/sql_prepare.cc +++ b/sql/sql_prepare.cc @@ -214,7 +214,7 @@ public: bool execute_bulk_loop(String *expanded_query, bool open_cursor, uchar *packet_arg, uchar *packet_end_arg, - ulong iterations); + ulong iterations, bool insert_id_request); bool execute_server_runnable(Server_runnable *server_runnable); my_bool set_bulk_parameters(bool reset); ulong bulk_iterations(); @@ -3089,11 +3089,20 @@ void mysqld_stmt_execute(THD *thd, char *packet_arg, uint packet_length) open_cursor= MY_TEST(flags & (ulong) CURSOR_TYPE_READ_ONLY); thd->protocol= &thd->protocol_binary; + + if (!(thd->client_capabilities & MARIADB_CLIENT_STMT_BULK_OPERATIONS)) + { + DBUG_PRINT("info", + ("There is no bulk capability so reset iteration counter")); + iterations= 0; + } if (iterations <= 1) stmt->execute_loop(&expanded_query, open_cursor, packet, packet_end); else stmt->execute_bulk_loop(&expanded_query, open_cursor, packet, packet_end, - iterations); + iterations, + MY_TEST((flags & + (ulong) STMTFLG_INSERT_ID_REQUEST))); thd->protocol= save_protocol; sp_cache_enforce_limit(thd->sp_proc_cache, stored_program_cache_size); @@ -4170,7 +4179,8 @@ Prepared_statement::execute_bulk_loop(String *expanded_query, bool open_cursor, uchar *packet_arg, uchar *packet_end_arg, - ulong iterations_arg) + ulong iterations_arg, + bool insert_id_request) { Reprepare_observer reprepare_observer; bool error= 0; @@ -4197,6 +4207,12 @@ Prepared_statement::execute_bulk_loop(String *expanded_query, thd->set_bulk_execution(0); return TRUE; } + if (lex->sql_command == SQLCOM_INSERT && insert_id_request && + thd->init_collecting_insert_id()) + { + thd->set_bulk_execution(0); + return TRUE; + } #ifndef EMBEDDED_LIBRARY if (setup_conversion_functions(this, &packet, packet_end, TRUE)) |