diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/mini_client.cc | 508 | ||||
-rw-r--r-- | sql/mini_client.h | 11 | ||||
-rw-r--r-- | sql/mysql_priv.h | 4 | ||||
-rw-r--r-- | sql/share/english/errmsg.txt | 9 | ||||
-rw-r--r-- | sql/slave.cc | 38 | ||||
-rw-r--r-- | sql/slave.h | 11 | ||||
-rw-r--r-- | sql/sql_base.cc | 19 | ||||
-rw-r--r-- | sql/sql_class.cc | 1 | ||||
-rw-r--r-- | sql/sql_class.h | 2 | ||||
-rw-r--r-- | sql/sql_db.cc | 89 | ||||
-rw-r--r-- | sql/sql_lex.h | 2 | ||||
-rw-r--r-- | sql/sql_parse.cc | 11 | ||||
-rw-r--r-- | sql/sql_repl.cc | 216 | ||||
-rw-r--r-- | sql/sql_repl.h | 2 | ||||
-rw-r--r-- | sql/sql_yacc.yy | 5 |
15 files changed, 846 insertions, 82 deletions
diff --git a/sql/mini_client.cc b/sql/mini_client.cc index 38180c0c6c8..b99e63a59d7 100644 --- a/sql/mini_client.cc +++ b/sql/mini_client.cc @@ -69,9 +69,22 @@ extern "C" { // Because of SCO 3.2V4.2 } +static void mc_free_rows(MYSQL_DATA *cur); +static MYSQL_FIELD *unpack_fields(MYSQL_DATA *data,MEM_ROOT *alloc,uint fields, + my_bool default_value, + my_bool long_flag_protocol); + static void mc_end_server(MYSQL *mysql); static int mc_sock_connect(File s, const struct sockaddr *name, uint namelen, uint to); static void mc_free_old_query(MYSQL *mysql); +static int mc_send_file_to_server(MYSQL *mysql, const char *filename); +static my_ulonglong mc_net_field_length_ll(uchar **packet); +static ulong mc_net_field_length(uchar **packet); +static int mc_read_one_row(MYSQL *mysql,uint fields,MYSQL_ROW row, + ulong *lengths); +static MYSQL_DATA *mc_read_rows(MYSQL *mysql,MYSQL_FIELD *mysql_fields, + uint fields); + #define CLIENT_CAPABILITIES (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_LOCAL_FILES) @@ -824,3 +837,498 @@ mc_mysql_close(MYSQL *mysql) } DBUG_VOID_RETURN; } + +void STDCALL mc_mysql_free_result(MYSQL_RES *result) +{ + DBUG_ENTER("mc_mysql_free_result"); + DBUG_PRINT("enter",("mysql_res: %lx",result)); + if (result) + { + if (result->handle && result->handle->status == MYSQL_STATUS_USE_RESULT) + { + DBUG_PRINT("warning",("Not all rows in set were read; Ignoring rows")); + for (;;) + { + uint pkt_len; + if ((pkt_len=(uint) mc_net_safe_read(result->handle)) == packet_error) + break; + if (pkt_len == 1 && result->handle->net.read_pos[0] == 254) + break; /* End of data */ + } + result->handle->status=MYSQL_STATUS_READY; + } + mc_free_rows(result->data); + if (result->fields) + free_root(&result->field_alloc,MYF(0)); + if (result->row) + my_free((gptr) result->row,MYF(0)); + my_free((gptr) result,MYF(0)); + } + DBUG_VOID_RETURN; +} + +static void mc_free_rows(MYSQL_DATA *cur) +{ + if (cur) + { + free_root(&cur->alloc,MYF(0)); + my_free((gptr) cur,MYF(0)); + } +} + +static MYSQL_FIELD * +mc_unpack_fields(MYSQL_DATA *data,MEM_ROOT *alloc,uint fields, + my_bool default_value, my_bool long_flag_protocol) +{ + MYSQL_ROWS *row; + MYSQL_FIELD *field,*result; + DBUG_ENTER("unpack_fields"); + + field=result=(MYSQL_FIELD*) alloc_root(alloc,sizeof(MYSQL_FIELD)*fields); + if (!result) + DBUG_RETURN(0); + + for (row=data->data; row ; row = row->next,field++) + { + field->table= strdup_root(alloc,(char*) row->data[0]); + field->name= strdup_root(alloc,(char*) row->data[1]); + field->length= (uint) uint3korr(row->data[2]); + field->type= (enum enum_field_types) (uchar) row->data[3][0]; + if (long_flag_protocol) + { + field->flags= uint2korr(row->data[4]); + field->decimals=(uint) (uchar) row->data[4][2]; + } + else + { + field->flags= (uint) (uchar) row->data[4][0]; + field->decimals=(uint) (uchar) row->data[4][1]; + } + if (INTERNAL_NUM_FIELD(field)) + field->flags|= NUM_FLAG; + if (default_value && row->data[5]) + field->def=strdup_root(alloc,(char*) row->data[5]); + else + field->def=0; + field->max_length= 0; + } + mc_free_rows(data); /* Free old data */ + DBUG_RETURN(result); +} + +int STDCALL +mc_mysql_send_query(MYSQL* mysql, const char* query, uint length) +{ + return mc_simple_command(mysql, COM_QUERY, query, length, 1); +} + +int STDCALL mc_mysql_read_query_result(MYSQL *mysql) +{ + uchar *pos; + ulong field_count; + MYSQL_DATA *fields; + uint length; + DBUG_ENTER("mc_mysql_read_query_result"); + + if ((length = mc_net_safe_read(mysql)) == packet_error) + DBUG_RETURN(-1); + mc_free_old_query(mysql); /* Free old result */ +get_info: + pos=(uchar*) mysql->net.read_pos; + if ((field_count= mc_net_field_length(&pos)) == 0) + { + mysql->affected_rows= mc_net_field_length_ll(&pos); + mysql->insert_id= mc_net_field_length_ll(&pos); + if (mysql->server_capabilities & CLIENT_TRANSACTIONS) + { + mysql->server_status=uint2korr(pos); pos+=2; + } + if (pos < mysql->net.read_pos+length && mc_net_field_length(&pos)) + mysql->info=(char*) pos; + DBUG_RETURN(0); + } + if (field_count == NULL_LENGTH) /* LOAD DATA LOCAL INFILE */ + { + int error=mc_send_file_to_server(mysql,(char*) pos); + if ((length=mc_net_safe_read(mysql)) == packet_error || error) + DBUG_RETURN(-1); + goto get_info; /* Get info packet */ + } + if (!(mysql->server_status & SERVER_STATUS_AUTOCOMMIT)) + mysql->server_status|= SERVER_STATUS_IN_TRANS; + + mysql->extra_info= mc_net_field_length_ll(&pos); /* Maybe number of rec */ + if (!(fields=mc_read_rows(mysql,(MYSQL_FIELD*) 0,5))) + DBUG_RETURN(-1); + if (!(mysql->fields=mc_unpack_fields(fields,&mysql->field_alloc, + (uint) field_count,0, + (my_bool) test(mysql->server_capabilities & + CLIENT_LONG_FLAG)))) + DBUG_RETURN(-1); + mysql->status=MYSQL_STATUS_GET_RESULT; + mysql->field_count=field_count; + DBUG_RETURN(0); +} + +int STDCALL mc_mysql_query(MYSQL *mysql, const char *query, uint length) +{ + DBUG_ENTER("mysql_real_query"); + DBUG_PRINT("enter",("handle: %lx",mysql)); + DBUG_PRINT("query",("Query = \"%s\"",query)); + if(!length) + length = strlen(query); + if (mc_simple_command(mysql,COM_QUERY,query,length,1)) + DBUG_RETURN(-1); + DBUG_RETURN(mc_mysql_read_query_result(mysql)); +} + +static int mc_send_file_to_server(MYSQL *mysql, const char *filename) +{ + int fd, readcount; + char buf[IO_SIZE*15],*tmp_name; + DBUG_ENTER("send_file_to_server"); + + fn_format(buf,filename,"","",4); /* Convert to client format */ + if (!(tmp_name=my_strdup(buf,MYF(0)))) + { + strmov(mysql->net.last_error, ER(mysql->net.last_errno=CR_OUT_OF_MEMORY)); + DBUG_RETURN(-1); + } + if ((fd = my_open(tmp_name,O_RDONLY, MYF(0))) < 0) + { + mysql->net.last_errno=EE_FILENOTFOUND; + sprintf(buf,EE(mysql->net.last_errno),tmp_name,errno); + strmake(mysql->net.last_error,buf,sizeof(mysql->net.last_error)-1); + my_net_write(&mysql->net,"",0); net_flush(&mysql->net); + my_free(tmp_name,MYF(0)); + DBUG_RETURN(-1); + } + + while ((readcount = (int) my_read(fd,buf,sizeof(buf),MYF(0))) > 0) + { + if (my_net_write(&mysql->net,buf,readcount)) + { + mysql->net.last_errno=CR_SERVER_LOST; + strmov(mysql->net.last_error,ER(mysql->net.last_errno)); + DBUG_PRINT("error",("Lost connection to MySQL server during LOAD DATA of local file")); + (void) my_close(fd,MYF(0)); + my_free(tmp_name,MYF(0)); + DBUG_RETURN(-1); + } + } + (void) my_close(fd,MYF(0)); + /* Send empty packet to mark end of file */ + if (my_net_write(&mysql->net,"",0) || net_flush(&mysql->net)) + { + mysql->net.last_errno=CR_SERVER_LOST; + sprintf(mysql->net.last_error,ER(mysql->net.last_errno),errno); + my_free(tmp_name,MYF(0)); + DBUG_RETURN(-1); + } + if (readcount < 0) + { + mysql->net.last_errno=EE_READ; /* the errmsg for not entire file read */ + sprintf(buf,EE(mysql->net.last_errno),tmp_name,errno); + strmake(mysql->net.last_error,buf,sizeof(mysql->net.last_error)-1); + my_free(tmp_name,MYF(0)); + DBUG_RETURN(-1); + } + DBUG_RETURN(0); +} + +/* Get the length of next field. Change parameter to point at fieldstart */ +static ulong mc_net_field_length(uchar **packet) +{ + reg1 uchar *pos= *packet; + if (*pos < 251) + { + (*packet)++; + return (ulong) *pos; + } + if (*pos == 251) + { + (*packet)++; + return NULL_LENGTH; + } + if (*pos == 252) + { + (*packet)+=3; + return (ulong) uint2korr(pos+1); + } + if (*pos == 253) + { + (*packet)+=4; + return (ulong) uint3korr(pos+1); + } + (*packet)+=9; /* Must be 254 when here */ + return (ulong) uint4korr(pos+1); +} + +/* Same as above, but returns ulonglong values */ + +static my_ulonglong mc_net_field_length_ll(uchar **packet) +{ + reg1 uchar *pos= *packet; + if (*pos < 251) + { + (*packet)++; + return (my_ulonglong) *pos; + } + if (*pos == 251) + { + (*packet)++; + return (my_ulonglong) NULL_LENGTH; + } + if (*pos == 252) + { + (*packet)+=3; + return (my_ulonglong) uint2korr(pos+1); + } + if (*pos == 253) + { + (*packet)+=4; + return (my_ulonglong) uint3korr(pos+1); + } + (*packet)+=9; /* Must be 254 when here */ +#ifdef NO_CLIENT_LONGLONG + return (my_ulonglong) uint4korr(pos+1); +#else + return (my_ulonglong) uint8korr(pos+1); +#endif +} + +/* Read all rows (fields or data) from server */ + +static MYSQL_DATA *mc_read_rows(MYSQL *mysql,MYSQL_FIELD *mysql_fields, + uint fields) +{ + uint field,pkt_len; + ulong len; + uchar *cp; + char *to; + MYSQL_DATA *result; + MYSQL_ROWS **prev_ptr,*cur; + NET *net = &mysql->net; + DBUG_ENTER("mc_read_rows"); + + if ((pkt_len=(uint) mc_net_safe_read(mysql)) == packet_error) + DBUG_RETURN(0); + if (!(result=(MYSQL_DATA*) my_malloc(sizeof(MYSQL_DATA), + MYF(MY_WME | MY_ZEROFILL)))) + { + net->last_errno=CR_OUT_OF_MEMORY; + strmov(net->last_error,ER(net->last_errno)); + DBUG_RETURN(0); + } + init_alloc_root(&result->alloc,8192,0); /* Assume rowlength < 8192 */ + result->alloc.min_malloc=sizeof(MYSQL_ROWS); + prev_ptr= &result->data; + result->rows=0; + result->fields=fields; + + while (*(cp=net->read_pos) != 254 || pkt_len != 1) + { + result->rows++; + if (!(cur= (MYSQL_ROWS*) alloc_root(&result->alloc, + sizeof(MYSQL_ROWS))) || + !(cur->data= ((MYSQL_ROW) + alloc_root(&result->alloc, + (fields+1)*sizeof(char *)+pkt_len)))) + { + mc_free_rows(result); + net->last_errno=CR_OUT_OF_MEMORY; + strmov(net->last_error,ER(net->last_errno)); + DBUG_RETURN(0); + } + *prev_ptr=cur; + prev_ptr= &cur->next; + to= (char*) (cur->data+fields+1); + for (field=0 ; field < fields ; field++) + { + if ((len=(ulong) mc_net_field_length(&cp)) == NULL_LENGTH) + { /* null field */ + cur->data[field] = 0; + } + else + { + cur->data[field] = to; + memcpy(to,(char*) cp,len); to[len]=0; + to+=len+1; + cp+=len; + if (mysql_fields) + { + if (mysql_fields[field].max_length < len) + mysql_fields[field].max_length=len; + } + } + } + cur->data[field]=to; /* End of last field */ + if ((pkt_len=mc_net_safe_read(mysql)) == packet_error) + { + mc_free_rows(result); + DBUG_RETURN(0); + } + } + *prev_ptr=0; /* last pointer is null */ + DBUG_PRINT("exit",("Got %d rows",result->rows)); + DBUG_RETURN(result); +} + + +/* +** Read one row. Uses packet buffer as storage for fields. +** When next packet is read, the previous field values are destroyed +*/ + + +static int mc_read_one_row(MYSQL *mysql,uint fields,MYSQL_ROW row, + ulong *lengths) +{ + uint field; + ulong pkt_len,len; + uchar *pos,*prev_pos; + + if ((pkt_len=(uint) mc_net_safe_read(mysql)) == packet_error) + return -1; + if (pkt_len == 1 && mysql->net.read_pos[0] == 254) + return 1; /* End of data */ + prev_pos= 0; /* allowed to write at packet[-1] */ + pos=mysql->net.read_pos; + for (field=0 ; field < fields ; field++) + { + if ((len=(ulong) mc_net_field_length(&pos)) == NULL_LENGTH) + { /* null field */ + row[field] = 0; + *lengths++=0; + } + else + { + row[field] = (char*) pos; + pos+=len; + *lengths++=len; + } + if (prev_pos) + *prev_pos=0; /* Terminate prev field */ + prev_pos=pos; + } + row[field]=(char*) prev_pos+1; /* End of last field */ + *prev_pos=0; /* Terminate last field */ + return 0; +} + +my_ulonglong STDCALL mc_mysql_num_rows(MYSQL_RES *res) +{ + return res->row_count; +} + +unsigned int STDCALL mc_mysql_num_fields(MYSQL_RES *res) +{ + return res->field_count; +} + +void STDCALL mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row) +{ + MYSQL_ROWS *tmp=0; + DBUG_PRINT("info",("mysql_data_seek(%ld)",(long) row)); + if (result->data) + for (tmp=result->data->data; row-- && tmp ; tmp = tmp->next) ; + result->current_row=0; + result->data_cursor = tmp; +} + +MYSQL_ROW STDCALL mc_mysql_fetch_row(MYSQL_RES *res) +{ + DBUG_ENTER("mc_mysql_fetch_row"); + if (!res->data) + { /* Unbufferred fetch */ + if (!res->eof) + { + if (!(mc_read_one_row(res->handle,res->field_count,res->row, + res->lengths))) + { + res->row_count++; + DBUG_RETURN(res->current_row=res->row); + } + else + { + DBUG_PRINT("info",("end of data")); + res->eof=1; + res->handle->status=MYSQL_STATUS_READY; + } + } + DBUG_RETURN((MYSQL_ROW) NULL); + } + { + MYSQL_ROW tmp; + if (!res->data_cursor) + { + DBUG_PRINT("info",("end of data")); + DBUG_RETURN(res->current_row=(MYSQL_ROW) NULL); + } + tmp = res->data_cursor->data; + res->data_cursor = res->data_cursor->next; + DBUG_RETURN(res->current_row=tmp); + } +} + +int STDCALL mc_mysql_select_db(MYSQL *mysql, const char *db) +{ + int error; + DBUG_ENTER("mysql_select_db"); + DBUG_PRINT("enter",("db: '%s'",db)); + + if ((error=mc_simple_command(mysql,COM_INIT_DB,db,(uint) strlen(db),0))) + DBUG_RETURN(error); + my_free(mysql->db,MYF(MY_ALLOW_ZERO_PTR)); + mysql->db=my_strdup(db,MYF(MY_WME)); + DBUG_RETURN(0); +} + + +MYSQL_RES * STDCALL mc_mysql_store_result(MYSQL *mysql) +{ + MYSQL_RES *result; + DBUG_ENTER("mysql_store_result"); + + if (!mysql->fields) + DBUG_RETURN(0); + if (mysql->status != MYSQL_STATUS_GET_RESULT) + { + strmov(mysql->net.last_error, + ER(mysql->net.last_errno=CR_COMMANDS_OUT_OF_SYNC)); + DBUG_RETURN(0); + } + mysql->status=MYSQL_STATUS_READY; /* server is ready */ + if (!(result=(MYSQL_RES*) my_malloc(sizeof(MYSQL_RES)+ + sizeof(ulong)*mysql->field_count, + MYF(MY_WME | MY_ZEROFILL)))) + { + mysql->net.last_errno=CR_OUT_OF_MEMORY; + strmov(mysql->net.last_error, ER(mysql->net.last_errno)); + DBUG_RETURN(0); + } + result->eof=1; /* Marker for buffered */ + result->lengths=(ulong*) (result+1); + if (!(result->data=mc_read_rows(mysql,mysql->fields,mysql->field_count))) + { + my_free((gptr) result,MYF(0)); + DBUG_RETURN(0); + } + mysql->affected_rows= result->row_count= result->data->rows; + result->data_cursor= result->data->data; + result->fields= mysql->fields; + result->field_alloc= mysql->field_alloc; + result->field_count= mysql->field_count; + result->current_field=0; + result->current_row=0; /* Must do a fetch first */ + mysql->fields=0; /* fields is now in result */ + DBUG_RETURN(result); /* Data fetched */ +} + + + + + + + + diff --git a/sql/mini_client.h b/sql/mini_client.h index f7d95a1b66e..22cdb31f846 100644 --- a/sql/mini_client.h +++ b/sql/mini_client.h @@ -42,6 +42,17 @@ char * STDCALL mc_mysql_error(MYSQL *mysql); int STDCALL mc_mysql_errno(MYSQL *mysql); my_bool STDCALL mc_mysql_reconnect(MYSQL* mysql); +int STDCALL mc_mysql_send_query(MYSQL* mysql, const char* query, uint length); +int STDCALL mc_mysql_read_query_result(MYSQL *mysql); +int STDCALL mc_mysql_query(MYSQL *mysql, const char *query, uint length); +MYSQL_RES * STDCALL mc_mysql_store_result(MYSQL *mysql); +void STDCALL mc_mysql_free_result(MYSQL_RES *result); +void STDCALL mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row); +my_ulonglong STDCALL mc_mysql_num_rows(MYSQL_RES *res); +unsigned int STDCALL mc_mysql_num_fields(MYSQL_RES *res); +MYSQL_ROW STDCALL mc_mysql_fetch_row(MYSQL_RES *res); +int STDCALL mc_mysql_select_db(MYSQL *mysql, const char *db); + #endif diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index d00eb09a363..c4967372bb0 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -223,7 +223,7 @@ inline THD *_current_thd(void) #include "opt_range.h" -void mysql_create_db(THD *thd, char *db, uint create_info); +int mysql_create_db(THD *thd, char *db, uint create_info); void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags); int mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists); int quick_rm_table(enum db_type base,const char *db, @@ -245,7 +245,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, char* packet, uint packet_length); bool check_stack_overrun(THD *thd,char *dummy); bool reload_acl_and_cache(THD *thd, uint options, TABLE_LIST *tables); -void mysql_rm_db(THD *thd,char *db,bool if_exists); +int mysql_rm_db(THD *thd,char *db,bool if_exists); void table_cache_init(void); void table_cache_free(void); uint cached_tables(void); diff --git a/sql/share/english/errmsg.txt b/sql/share/english/errmsg.txt index ff29fffe958..30e8894db08 100644 --- a/sql/share/english/errmsg.txt +++ b/sql/share/english/errmsg.txt @@ -153,7 +153,7 @@ "You have an error in your SQL syntax", "Delayed insert thread couldn't get requested lock for table %-.64s", "Too many delayed threads in use", -"Aborted connection %ld to db: '%-.64s' user: '%-.32s' (%-.64s)", +"Aborted connection %ld to db: '%-.64s' user: '%-.32s' (%-.64s) - see http://www.mysql.com/doc/C/o/Communication_errors.html", "Got a packet bigger than 'max_allowed_packet'", "Got a read error from the connection pipe", "Got an error from fcntl()", @@ -185,7 +185,7 @@ "Got error %d during ROLLBACK", "Got error %d during FLUSH_LOGS", "Got error %d during CHECKPOINT", -"Aborted connection %ld to db: '%-.64s' user: '%-.32s' host: `%-.64s' (%-.64s)", +"Aborted connection %ld to db: '%-.64s' user: '%-.32s' host: `%-.64s' (%-.64s) - see http://www.mysql.com/doc/C/o/Communication_errors.html", "The handler for the table does not support binary table dump", "Binlog closed, cannot RESET MASTER", "Failed rebuilding the index of dumped table '%-.64s'", @@ -206,3 +206,8 @@ "Could not create slave thread, check system resources", "User %-.64s has already more than 'max_user_connections' active connections", "You may only use constant expressions with SET", +"Error connecting to master: %-.128s", +"Error running query on master: %-.128s", + + + diff --git a/sql/slave.cc b/sql/slave.cc index 6b9c376a625..1841f3a9e4a 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -20,6 +20,7 @@ #include <myisam.h> #include "mini_client.h" #include "slave.h" +#include "sql_repl.h" #include <thr_alarm.h> #include <my_dir.h> @@ -55,7 +56,7 @@ static int init_slave_thread(THD* thd); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_sleep(THD* thd, int sec); -static int request_table_dump(MYSQL* mysql, char* db, char* table); +static int request_table_dump(MYSQL* mysql, const char* db, const char* table); static int create_table_from_dump(THD* thd, NET* net, const char* db, const char* table_name); inline char* rewrite_db(char* db); @@ -344,7 +345,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, thd->proc_info = "Creating table from master dump"; // save old db in case we are creating in a different database char* save_db = thd->db; - thd->db = thd->last_nx_db; + thd->db = (char*)db; mysql_parse(thd, thd->query, packet_len); // run create table thd->db = save_db; // leave things the way the were before @@ -400,31 +401,39 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, return error; } -int fetch_nx_table(THD* thd, MASTER_INFO* mi) +int fetch_nx_table(THD* thd, const char* db_name, const char* table_name, + MASTER_INFO* mi, MYSQL* mysql) { - MYSQL* mysql = mc_mysql_init(NULL); int error = 1; int nx_errno = 0; - if(!mysql) + bool called_connected = (mysql != NULL); + if(!called_connected && !(mysql = mc_mysql_init(NULL))) { sql_print_error("fetch_nx_table: Error in mysql_init()"); nx_errno = ER_GET_ERRNO; goto err; } - safe_connect(thd, mysql, mi); - if(slave_killed(thd)) - goto err; - - if(request_table_dump(mysql, thd->last_nx_db, thd->last_nx_table)) + if(!called_connected) + { + if(connect_to_master(thd, mysql, mi)) + { + sql_print_error("Could not connect to master while fetching table\ + '%-64s.%-64s'", db_name, table_name); + nx_errno = ER_CONNECT_TO_MASTER; + goto err; + } + } + + if(request_table_dump(mysql, db_name, table_name)) { nx_errno = ER_GET_ERRNO; sql_print_error("fetch_nx_table: failed on table dump request "); goto err; } - if(create_table_from_dump(thd, &mysql->net, thd->last_nx_db, - thd->last_nx_table)) + if(create_table_from_dump(thd, &mysql->net, db_name, + table_name)) { // create_table_from_dump will have sent the error alread sql_print_error("fetch_nx_table: failed on create table "); @@ -434,7 +443,7 @@ int fetch_nx_table(THD* thd, MASTER_INFO* mi) error = 0; err: - if (mysql) + if (mysql && !called_connected) mc_mysql_close(mysql); if (nx_errno && thd->net.vio) send_error(&thd->net, nx_errno, "Error in fetch_nx_table"); @@ -764,7 +773,7 @@ static int request_dump(MYSQL* mysql, MASTER_INFO* mi) return 0; } -static int request_table_dump(MYSQL* mysql, char* db, char* table) +static int request_table_dump(MYSQL* mysql, const char* db, const char* table) { char buf[1024]; char * p = buf; @@ -901,7 +910,6 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) VOID(pthread_mutex_lock(&LOCK_thread_count)); thd->query_id = query_id++; VOID(pthread_mutex_unlock(&LOCK_thread_count)); - thd->last_nx_table = thd->last_nx_db = 0; thd->query_error = 0; // clear error thd->net.last_errno = 0; thd->net.last_error[0] = 0; diff --git a/sql/slave.h b/sql/slave.h index 311368a4b82..77197246bad 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -1,6 +1,8 @@ #ifndef SLAVE_H #define SLAVE_H +#include "mysql.h" + typedef struct st_master_info { char log_file_name[FN_REFLEN]; @@ -65,11 +67,14 @@ typedef struct st_table_rule_ent int flush_master_info(MASTER_INFO* mi); -int mysql_table_dump(THD* thd, char* db, char* tbl_name, int fd = -1); +int mysql_table_dump(THD* thd, const char* db, + const char* tbl_name, int fd = -1); // if fd is -1, dump to NET -int fetch_nx_table(THD* thd, MASTER_INFO* mi); + +int fetch_nx_table(THD* thd, const char* db_name, const char* table_name, + MASTER_INFO* mi, MYSQL* mysql); // retrieve non-exitent table from master -// the caller must set thd->last_nx_table and thd->last_nx_db first + int show_master_info(THD* thd); int show_binlog_info(THD* thd); diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 37a14f02bcc..077de271e04 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -837,25 +837,6 @@ TABLE *open_table(THD *thd,const char *db,const char *table_name, !(table->table_cache_key=memdup_root(&table->mem_root,(char*) key, key_length))) { - MEM_ROOT* glob_alloc; - LINT_INIT(glob_alloc); - - if (errno == ENOENT && - (glob_alloc = my_pthread_getspecific_ptr(MEM_ROOT*,THR_MALLOC))) - // Sasha: needed for replication - // remember the name of the non-existent table - // so we can try to download it from the master - { - int table_name_len = (uint) strlen(table_name); - int db_len = (uint) strlen(db); - thd->last_nx_db = alloc_root(glob_alloc,db_len + table_name_len + 2); - if(thd->last_nx_db) - { - thd->last_nx_table = thd->last_nx_db + db_len + 1; - memcpy(thd->last_nx_table, table_name, table_name_len + 1); - memcpy(thd->last_nx_db, db, db_len + 1); - } - } table->next=table->prev=table; free_cache_entry(table); VOID(pthread_mutex_unlock(&LOCK_open)); diff --git a/sql/sql_class.cc b/sql/sql_class.cc index ffcb15b4c9b..eedfd21e4c3 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -96,7 +96,6 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0), current_linfo = 0; slave_thread = 0; slave_proxy_id = 0; - last_nx_table = last_nx_db = 0; cond_count=0; convert_set=0; mysys_var=0; diff --git a/sql/sql_class.h b/sql/sql_class.h index 603e4bdeeb9..d32fa0ed743 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -242,8 +242,6 @@ public: enum enum_server_command command; uint32 server_id; const char *where; - char* last_nx_table; // last non-existent table, we need this for replication - char* last_nx_db; // database of the last nx table time_t start_time,time_after_lock,user_time; time_t connect_time,thr_create_time; // track down slow pthread_create thr_lock_type update_lock_default; diff --git a/sql/sql_db.cc b/sql/sql_db.cc index 5243498f7fc..20eb4688de7 100644 --- a/sql/sql_db.cc +++ b/sql/sql_db.cc @@ -30,11 +30,12 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *path, /* db-name is already validated when we come here */ -void mysql_create_db(THD *thd, char *db, uint create_options) +int mysql_create_db(THD *thd, char *db, uint create_options) { char path[FN_REFLEN+16]; MY_DIR *dirp; long result=1; + int error = 0; DBUG_ENTER("mysql_create_db"); VOID(pthread_mutex_lock(&LOCK_mysql_create_db)); @@ -47,7 +48,9 @@ void mysql_create_db(THD *thd, char *db, uint create_options) my_dirend(dirp); if (!(create_options & HA_LEX_CREATE_IF_NOT_EXISTS)) { - net_printf(&thd->net,ER_DB_CREATE_EXISTS,db); + if(thd) + net_printf(&thd->net,ER_DB_CREATE_EXISTS,db); + error = 1; goto exit; } result = 0; @@ -57,34 +60,39 @@ void mysql_create_db(THD *thd, char *db, uint create_options) strend(path)[-1]=0; // Remove last '/' from path if (my_mkdir(path,0777,MYF(0)) < 0) { - net_printf(&thd->net,ER_CANT_CREATE_DB,db,my_errno); + if(thd) + net_printf(&thd->net,ER_CANT_CREATE_DB,db,my_errno); + error = 1; goto exit; } } - if (!thd->query) - { - thd->query = path; - thd->query_length = (uint) (strxmov(path,"create database ", db, NullS)- - path); - } + + if(thd) { - mysql_update_log.write(thd,thd->query, thd->query_length); - if (mysql_bin_log.is_open()) + if (!thd->query) { - Query_log_event qinfo(thd, thd->query); - mysql_bin_log.write(&qinfo); + thd->query = path; + thd->query_length = (uint) (strxmov(path,"create database ", db, NullS)- + path); } + { + mysql_update_log.write(thd,thd->query, thd->query_length); + if (mysql_bin_log.is_open()) + { + Query_log_event qinfo(thd, thd->query); + mysql_bin_log.write(&qinfo); + } + } + if (thd->query == path) + { + thd->query = 0; // just in case + thd->query_length = 0; + } + send_ok(&thd->net, result); } - if (thd->query == path) - { - thd->query = 0; // just in case - thd->query_length = 0; - } - send_ok(&thd->net, result); - exit: VOID(pthread_mutex_unlock(&LOCK_mysql_create_db)); - DBUG_VOID_RETURN; + DBUG_RETURN(error); } const char *del_exts[]= @@ -94,10 +102,14 @@ static TYPELIB deletable_extentions= /* db-name is already validated when we come here */ - -void mysql_rm_db(THD *thd,char *db,bool if_exists) +/* If thd == 0, do not write any messages + This is useful in replication when we want to remove + a stale database before replacing it with the new one +*/ +int mysql_rm_db(THD *thd,char *db,bool if_exists) { long deleted=0; + int error = 0; char path[FN_REFLEN+16]; MY_DIR *dirp; DBUG_ENTER("mysql_rm_db"); @@ -110,15 +122,19 @@ void mysql_rm_db(THD *thd,char *db,bool if_exists) /* See if the directory exists */ if (!(dirp = my_dir(path,MYF(MY_WME | MY_DONT_SORT)))) { - if (!if_exists) - net_printf(&thd->net,ER_DB_DROP_EXISTS,db); - else - send_ok(&thd->net,0); + if(thd) + { + if (!if_exists) + net_printf(&thd->net,ER_DB_DROP_EXISTS,db); + else + send_ok(&thd->net,0); + } + error = !if_exists; goto exit; } remove_db_from_cache(db); - if ((deleted=mysql_rm_known_files(thd, dirp, path,0)) >= 0) + if ((deleted=mysql_rm_known_files(thd, dirp, path,0)) >= 0 && thd) { if (!thd->query) { @@ -137,13 +153,14 @@ void mysql_rm_db(THD *thd,char *db,bool if_exists) thd->query = 0; // just in case thd->query_length = 0; } + send_ok(&thd->net,(ulong) deleted); } exit: VOID(pthread_mutex_unlock(&LOCK_open)); VOID(pthread_mutex_unlock(&LOCK_mysql_create_db)); - DBUG_VOID_RETURN; + DBUG_RETURN(error); } /* @@ -151,6 +168,7 @@ exit: are 2 digits (raid directories). */ +/* This one also needs to work with thd == 0 for replication */ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path, uint level) { @@ -162,7 +180,7 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path, /* remove all files with known extensions */ for (uint idx=2 ; - idx < (uint) dirp->number_off_files && !thd->killed ; + idx < (uint) dirp->number_off_files && (!thd || !thd->killed) ; idx++) { FILEINFO *file=dirp->dir_entry+idx; @@ -196,7 +214,8 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path, unpack_filename(filePath,filePath); if (my_delete(filePath,MYF(MY_WME))) { - net_printf(&thd->net,ER_DB_DROP_DELETE,filePath,my_error); + if(thd) + net_printf(&thd->net,ER_DB_DROP_DELETE,filePath,my_error); my_dirend(dirp); DBUG_RETURN(-1); } @@ -205,7 +224,7 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path, my_dirend(dirp); - if (thd->killed) + if (thd && thd->killed) { send_error(&thd->net,ER_SERVER_SHUTDOWN); DBUG_RETURN(-1); @@ -229,7 +248,8 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path, /* Don't give errors if we can't delete 'RAID' directory */ if (level) DBUG_RETURN(deleted); - send_error(&thd->net); + if(thd) + send_error(&thd->net); DBUG_RETURN(-1); } path=filePath; @@ -242,7 +262,8 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path, /* Don't give errors if we can't delete 'RAID' directory */ if (rmdir(path) < 0 && !level) { - net_printf(&thd->net,ER_DB_DROP_RMDIR, path,errno); + if(thd) + net_printf(&thd->net,ER_DB_DROP_RMDIR, path,errno); DBUG_RETURN(-1); } } diff --git a/sql/sql_lex.h b/sql/sql_lex.h index 0df5bbebc37..8c83e6e587a 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -53,7 +53,7 @@ enum enum_sql_command { SQLCOM_BEGIN, SQLCOM_LOAD_MASTER_TABLE, SQLCOM_CHANGE_MASTER, SQLCOM_RENAME_TABLE, SQLCOM_BACKUP_TABLE, SQLCOM_RESTORE_TABLE, SQLCOM_RESET, SQLCOM_PURGE, SQLCOM_SHOW_BINLOGS, - SQLCOM_SHOW_OPEN_TABLES, + SQLCOM_SHOW_OPEN_TABLES, SQLCOM_LOAD_MASTER_DATA, SQLCOM_HA_OPEN, SQLCOM_HA_CLOSE, SQLCOM_HA_READ }; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 3dda8d1cff7..7742df4a2bf 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1203,6 +1203,13 @@ mysql_execute_command(void) res = show_binlog_info(thd); break; } + + case SQLCOM_LOAD_MASTER_DATA: // sync with master + if(check_process_priv(thd)) + goto error; + res = load_master_data(thd); + break; + case SQLCOM_LOAD_MASTER_TABLE: if (!tables->db) @@ -1226,9 +1233,7 @@ mysql_execute_command(void) break; } - thd->last_nx_table = tables->real_name; - thd->last_nx_db = tables->db; - if(fetch_nx_table(thd, &glob_mi)) + if(fetch_nx_table(thd, tables->db, tables->real_name, &glob_mi, 0)) // fetch_nx_table is responsible for sending // the error { diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index e5039d118be..d36fa1a3534 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -21,6 +21,7 @@ #include "sql_repl.h" #include "sql_acl.h" #include "log_event.h" +#include "mini_client.h" #include <thr_alarm.h> #include <my_dir.h> @@ -845,5 +846,220 @@ err: return 1; } +int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi) +{ + if(!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0, + mi->port, 0, 0)) + { + sql_print_error("Connection to master failed: %s", + mc_mysql_error(mysql)); + return 1; + } + return 0; +} + +static inline void cleanup_mysql_results(MYSQL_RES* db_res, + MYSQL_RES** cur, MYSQL_RES** start) +{ + for( ; cur >= start; --cur) + if(*cur) + mc_mysql_free_result(*cur); + mc_mysql_free_result(db_res); +} + +static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db, + MYSQL_RES* table_res) +{ + MYSQL_ROW row; + + for( row = mc_mysql_fetch_row(table_res); row; + row = mc_mysql_fetch_row(table_res)) + { + TABLE_LIST table; + const char* table_name = row[0]; + int error; + if(table_rules_on) + { + table.next = 0; + table.db = (char*)db; + table.real_name = (char*)table_name; + if(!tables_ok(thd, &table)) + continue; + } + + if((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql))) + return error; + } + + return 0; +} + +int load_master_data(THD* thd) +{ + MYSQL mysql; + MYSQL_RES* master_status_res = 0; + bool slave_was_running = 0; + int error = 0; + + mc_mysql_init(&mysql); + + pthread_mutex_lock(&LOCK_slave); + // we do not want anyone messing with the slave at all for the entire + // duration of the data load; + + // first, kill the slave + if((slave_was_running = slave_running)) + { + abort_slave = 1; + thr_alarm_kill(slave_real_id); + thd->proc_info = "waiting for slave to die"; + while(slave_running) + pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done + } + + + if(connect_to_master(thd, &mysql, &glob_mi)) + { + net_printf(&thd->net, error = ER_CONNECT_TO_MASTER, + mc_mysql_error(&mysql)); + goto err; + } + + // now that we are connected, get all database and tables in each + { + MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res; + uint num_dbs; + MYSQL_ROW row; + + if(mc_mysql_query(&mysql, "show databases", 0) || + !(db_res = mc_mysql_store_result(&mysql))) + { + net_printf(&thd->net, error = ER_QUERY_ON_MASTER, + mc_mysql_error(&mysql)); + goto err; + } + + if(!(num_dbs = mc_mysql_num_rows(db_res))) + goto err; + // in theory, the master could have no databases at all + // and run with skip-grant + + if(!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*)))) + { + net_printf(&thd->net, error = ER_OUTOFMEMORY); + goto err; + } + + // this is a temporary solution until we have online backup + // capabilities - to be replaced once online backup is working + // we wait to issue FLUSH TABLES WITH READ LOCK for as long as we + // can to minimize the lock time + if(mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) + || mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) || + !(master_status_res = mc_mysql_store_result(&mysql))) + { + net_printf(&thd->net, error = ER_QUERY_ON_MASTER, + mc_mysql_error(&mysql)); + goto err; + } + + // go through every table in every database, and if the replication + // rules allow replicating it, get it + + table_res_end = table_res + num_dbs; + + for(cur_table_res = table_res; cur_table_res < table_res_end; + ++cur_table_res) + { + MYSQL_ROW row = mc_mysql_fetch_row(db_res); + // since we know how many rows we have, this can never be NULL + + char* db = row[0]; + int drop_error = 0; + + // do not replicate databases excluded by rules + // also skip mysql database - in most cases the user will + // mess up and not exclude mysql database with the rules when + // he actually means to - in this case, he is up for a surprise if + // his priv tables get dropped and downloaded from master + // TO DO - add special option, not enabled + // by default, to allow inclusion of mysql database into load + // data from master + if(!db_ok(db, replicate_do_db, replicate_ignore_db) || + !strcmp(db,"mysql")) + { + *cur_table_res = 0; + continue; + } + + if((drop_error = mysql_rm_db(0, db, 1)) || + mysql_create_db(0, db, 0)) + { + error = (drop_error) ? ER_DB_DROP_DELETE : ER_CANT_CREATE_DB; + net_printf(&thd->net, error, db, my_error); + cleanup_mysql_results(db_res, cur_table_res - 1, table_res); + goto err; + } + + if(mc_mysql_select_db(&mysql, db) || + mc_mysql_query(&mysql, "show tables", 0) || + !(*cur_table_res = mc_mysql_store_result(&mysql))) + { + net_printf(&thd->net, error = ER_QUERY_ON_MASTER, + mc_mysql_error(&mysql)); + cleanup_mysql_results(db_res, cur_table_res - 1, table_res); + goto err; + } + + if((error = fetch_db_tables(thd, &mysql, db, *cur_table_res))) + { + // we do not report the error - fetch_db_tables handles it + cleanup_mysql_results(db_res, cur_table_res, table_res); + goto err; + } + } + + cleanup_mysql_results(db_res, cur_table_res - 1, table_res); + + // adjust position in the master + if(master_status_res) + { + MYSQL_ROW row = mc_mysql_fetch_row(master_status_res); + + // we need this check because the master may not be running with + // log-bin, but it will still allow us to do all the steps + // of LOAD DATA FROM MASTER - no reason to forbid it, really, + // although it does not make much sense for the user to do it + if(row[0] && row[1]) + { + strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name)); + glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB + if(glob_mi.pos < 4) + glob_mi.pos = 4; // don't hit the magic number + glob_mi.pending = 0; + flush_master_info(&glob_mi); + } + + mc_mysql_free_result(master_status_res); + } + + if(mc_mysql_query(&mysql, "UNLOCK TABLES", 0)) + { + net_printf(&thd->net, error = ER_QUERY_ON_MASTER, + mc_mysql_error(&mysql)); + goto err; + } + } +err: + pthread_mutex_unlock(&LOCK_slave); + if(slave_was_running) + start_slave(0, 0); + mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init() + if(!error) + send_ok(&thd->net); + + return error; +} + diff --git a/sql/sql_repl.h b/sql/sql_repl.h index 68f2b4ba6c4..2428928f044 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -14,6 +14,8 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, int start_slave(THD* thd = 0, bool net_report = 1); int stop_slave(THD* thd = 0, bool net_report = 1); +int load_master_data(THD* thd); +int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi); int change_master(THD* thd); void reset_slave(); void reset_master(); diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 172fb0830fe..ed17a2dedf4 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -2401,6 +2401,11 @@ load: LOAD DATA_SYM opt_low_priority opt_local INFILE TEXT_STRING YYABORT; } + | + LOAD DATA_SYM FROM MASTER_SYM + { + Lex->sql_command = SQLCOM_LOAD_MASTER_DATA; + } opt_local: /* empty */ { $$=0;} |