diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 428 |
1 files changed, 213 insertions, 215 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 7fcfb803ac1..f8746c78e49 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -516,7 +516,6 @@ command"); static uint read_event(MYSQL* mysql, MASTER_INFO *mi) { uint len = packet_error; - NET* net = &mysql->net; int read_errno = EINTR; // for convinience lets think we start by // being in the interrupted state :-) // my_real_read() will time us out @@ -543,251 +542,250 @@ static uint read_event(MYSQL* mysql, MASTER_INFO *mi) } DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %d\n", - len, net->read_pos[4])); + len, mysql->net.read_pos[4])); return len - 1; } + static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) { - Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1 , event_len); - if(ev) - { - switch(ev->get_type_code()) + Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1, + event_len); + if (ev) + { + switch(ev->get_type_code()) + { + case QUERY_EVENT: + { + Query_log_event* qev = (Query_log_event*)ev; + int q_len = qev->q_len; + init_sql_alloc(&thd->mem_root, 8192,0); + thd->db = (char*)qev->db; + if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) + { + thd->query = (char*)qev->query; + thd->set_time((time_t)qev->when); + thd->current_tablenr = 0; + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thd->query_id = query_id++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + thd->last_nx_table = thd->last_nx_db = 0; + for(;;) { - case QUERY_EVENT: + thd->query_error = 0; // clear error + thd->last_nx_table = thd->last_nx_db = 0; + thd->net.last_errno = 0; + thd->net.last_error[0] = 0; + mysql_parse(thd, thd->query, q_len); // try query + if(!thd->query_error || slave_killed(thd)) // break if ok + break; + // if not ok + if(thd->last_nx_table && thd->last_nx_db) { - Query_log_event* qev = (Query_log_event*)ev; - int q_len = qev->q_len; - init_sql_alloc(&thd->mem_root, 8192,0); - thd->db = (char*)qev->db; - if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) - { - thd->query = (char*)qev->query; - thd->set_time((time_t)qev->when); - thd->current_tablenr = 0; - VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->query_id = query_id++; - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - thd->last_nx_table = thd->last_nx_db = 0; - for(;;) - { - thd->query_error = 0; // clear error - thd->last_nx_table = thd->last_nx_db = 0; - thd->net.last_errno = 0; - thd->net.last_error[0] = 0; - mysql_parse(thd, thd->query, q_len); // try query - if(!thd->query_error || slave_killed(thd)) // break if ok - break; - // if not ok - if(thd->last_nx_table && thd->last_nx_db) - { - // for now, let's just fail if the table is not - // there, and not try to be a smart alec... + // for now, let's just fail if the table is not + // there, and not try to be a smart alec... - // if table was not there - //if(fetch_nx_table(thd,&glob_mi)) - // try to to fetch from master - break; // if we can't, just break - } - else - break; // if failed for some other reason, bail out - - // if fetched the table from master successfully - // we need to restore query info in thd because - // fetch_nx_table executes create table - thd->query = (char*)qev->query; - thd->set_time((time_t)qev->when); - thd->current_tablenr = 0; - } - } - thd->db = 0;// prevent db from being freed - thd->query = 0; // just to be sure - close_thread_tables(thd); - free_root(&thd->mem_root,0); - if (thd->query_error) - { - sql_print_error("Slave: error running query '%s' ", - qev->query); - return 1; - } - delete ev; - - if(thd->fatal_error) - { - sql_print_error("Slave: Fatal error running query '%s' ", - thd->query); - return 1; - } - - mi->inc_pos(event_len); - flush_master_info(mi); - break; + // if table was not there + //if(fetch_nx_table(thd,&glob_mi)) + // try to to fetch from master + break; // if we can't, just break } + else + break; // if failed for some other reason, bail out + + // if fetched the table from master successfully + // we need to restore query info in thd because + // fetch_nx_table executes create table + thd->query = (char*)qev->query; + thd->set_time((time_t)qev->when); + thd->current_tablenr = 0; + } + } + thd->db = 0;// prevent db from being freed + thd->query = 0; // just to be sure + close_thread_tables(thd); + free_root(&thd->mem_root,0); + if (thd->query_error) + { + sql_print_error("Slave: error running query '%s' ", + qev->query); + return 1; + } + delete ev; + + if(thd->fatal_error) + { + sql_print_error("Slave: Fatal error running query '%s' ", + thd->query); + return 1; + } + + mi->inc_pos(event_len); + flush_master_info(mi); + break; + } - case LOAD_EVENT: - { - Load_log_event* lev = (Load_log_event*)ev; - init_sql_alloc(&thd->mem_root, 8192,0); - thd->db = (char*)lev->db; - thd->query = 0; - thd->query_error = 0; + case LOAD_EVENT: + { + Load_log_event* lev = (Load_log_event*)ev; + init_sql_alloc(&thd->mem_root, 8192,0); + thd->db = (char*)lev->db; + thd->query = 0; + thd->query_error = 0; - if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) - { - thd->set_time((time_t)lev->when); - thd->current_tablenr = 0; - VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->query_id = query_id++; - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - - enum enum_duplicates handle_dup = DUP_IGNORE; - if(lev->sql_ex.opt_flags && REPLACE_FLAG) - handle_dup = DUP_REPLACE; - sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags && DUMPFILE_FLAG ); - String field_term(&lev->sql_ex.field_term, 1), - enclosed(&lev->sql_ex.enclosed, 1), line_term(&lev->sql_ex.line_term,1), - escaped(&lev->sql_ex.escaped, 1), line_start(&lev->sql_ex.line_start, 1); + if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) + { + thd->set_time((time_t)lev->when); + thd->current_tablenr = 0; + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thd->query_id = query_id++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + + enum enum_duplicates handle_dup = DUP_IGNORE; + if(lev->sql_ex.opt_flags && REPLACE_FLAG) + handle_dup = DUP_REPLACE; + sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags && DUMPFILE_FLAG ); + String field_term(&lev->sql_ex.field_term, 1), + enclosed(&lev->sql_ex.enclosed, 1), line_term(&lev->sql_ex.line_term,1), + escaped(&lev->sql_ex.escaped, 1), line_start(&lev->sql_ex.line_start, 1); - ex.field_term = &field_term; - if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY) - ex.field_term->length(0); + ex.field_term = &field_term; + if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY) + ex.field_term->length(0); - ex.enclosed = &enclosed; - if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY) - ex.enclosed->length(0); + ex.enclosed = &enclosed; + if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY) + ex.enclosed->length(0); - ex.line_term = &line_term; - if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY) - ex.line_term->length(0); + ex.line_term = &line_term; + if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY) + ex.line_term->length(0); - ex.line_start = &line_start; - if(lev->sql_ex.empty_flags & LINE_START_EMPTY) - ex.line_start->length(0); + ex.line_start = &line_start; + if(lev->sql_ex.empty_flags & LINE_START_EMPTY) + ex.line_start->length(0); - ex.escaped = &escaped; - if(lev->sql_ex.empty_flags & ESCAPED_EMPTY) - ex.escaped->length(0); + ex.escaped = &escaped; + if(lev->sql_ex.empty_flags & ESCAPED_EMPTY) + ex.escaped->length(0); - ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG); - if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY) - ex.field_term->length(0); + ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG); + if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY) + ex.field_term->length(0); - ex.skip_lines = lev->skip_lines; + ex.skip_lines = lev->skip_lines; - TABLE_LIST tables; - bzero((char*) &tables,sizeof(tables)); - tables.db = thd->db; - tables.name = tables.real_name = (char*)lev->table_name; - tables.lock_type = TL_WRITE; + TABLE_LIST tables; + bzero((char*) &tables,sizeof(tables)); + tables.db = thd->db; + tables.name = tables.real_name = (char*)lev->table_name; + tables.lock_type = TL_WRITE; - if (open_tables(thd, &tables)) - { - sql_print_error("Slave: error opening table %s ", - tables.name); - delete ev; - return 1; - } - - List<Item> fields; - lev->set_fields(fields); - thd->net.vio = net->vio; - // mysql_load will use thd->net to read the file - thd->net.pkt_nr = net->pkt_nr; - // make sure the client does get confused - // about the packet sequence - if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1, - TL_WRITE)) - thd->query_error = 1; - net->pkt_nr = thd->net.pkt_nr; - } - else // we will just ask the master to send us /dev/null if we do not want to - // load the data :-) - { - (void)my_net_write(net, "\xfb/dev/null", 10); - (void)net_flush(net); - (void)my_net_read(net); // discard response - send_ok(net); // the master expects it - } + if (open_tables(thd, &tables)) + { + sql_print_error("Slave: error opening table %s ", + tables.name); + delete ev; + return 1; + } + + List<Item> fields; + lev->set_fields(fields); + thd->net.vio = net->vio; + // mysql_load will use thd->net to read the file + thd->net.pkt_nr = net->pkt_nr; + // make sure the client does get confused + // about the packet sequence + if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1, + TL_WRITE)) + thd->query_error = 1; + net->pkt_nr = thd->net.pkt_nr; + } + else // we will just ask the master to send us /dev/null if we do not want to + // load the data :-) + { + (void)my_net_write(net, "\xfb/dev/null", 10); + (void)net_flush(net); + (void)my_net_read(net); // discard response + send_ok(net); // the master expects it + } - thd->net.vio = 0; - thd->db = 0;// prevent db from being freed - close_thread_tables(thd); - if(thd->query_error) - { - int sql_error = thd->net.last_errno; - if(!sql_error) - sql_error = ER_UNKNOWN_ERROR; + thd->net.vio = 0; + thd->db = 0;// prevent db from being freed + close_thread_tables(thd); + if(thd->query_error) + { + int sql_error = thd->net.last_errno; + if(!sql_error) + sql_error = ER_UNKNOWN_ERROR; - sql_print_error("Slave: error '%s' running load data infile ", - ER(sql_error)); - delete ev; - return 1; - } - delete ev; + sql_print_error("Slave: error '%s' running load data infile ", + ER(sql_error)); + delete ev; + return 1; + } + delete ev; - if(thd->fatal_error) - { - sql_print_error("Slave: Fatal error running query '%s' ", - thd->query); - return 1; - } - - mi->inc_pos(event_len); - flush_master_info(mi); - break; - } + if(thd->fatal_error) + { + sql_print_error("Slave: Fatal error running query '%s' ", + thd->query); + return 1; + } + + mi->inc_pos(event_len); + flush_master_info(mi); + break; + } - case START_EVENT: - mi->inc_pos(event_len); - flush_master_info(mi); - break; + case START_EVENT: + mi->inc_pos(event_len); + flush_master_info(mi); + break; - case STOP_EVENT: - mi->inc_pos(event_len); - flush_master_info(mi); - break; - case ROTATE_EVENT: - { - Rotate_log_event* rev = (Rotate_log_event*)ev; - int ident_len = rev->ident_len; - memcpy(mi->log_file_name, rev->new_log_ident,ident_len ); - mi->log_file_name[ident_len] = 0; - mi->pos = 0; - break; - } + case STOP_EVENT: + mi->inc_pos(event_len); + flush_master_info(mi); + break; + case ROTATE_EVENT: + { + Rotate_log_event* rev = (Rotate_log_event*)ev; + int ident_len = rev->ident_len; + memcpy(mi->log_file_name, rev->new_log_ident,ident_len ); + mi->log_file_name[ident_len] = 0; + mi->pos = 0; + break; + } - case INTVAR_EVENT: - { - Intvar_log_event* iev = (Intvar_log_event*)ev; - switch(iev->type) - { - case LAST_INSERT_ID_EVENT: - thd->last_insert_id_used = 1; - thd->last_insert_id = iev->val; - break; - case INSERT_ID_EVENT: - thd->next_insert_id = iev->val; - break; + case INTVAR_EVENT: + { + Intvar_log_event* iev = (Intvar_log_event*)ev; + switch(iev->type) + { + case LAST_INSERT_ID_EVENT: + thd->last_insert_id_used = 1; + thd->last_insert_id = iev->val; + break; + case INSERT_ID_EVENT: + thd->next_insert_id = iev->val; + break; - } - mi->inc_pos(event_len); - flush_master_info(mi); - break; - } - } - + } + mi->inc_pos(event_len); + flush_master_info(mi); + break; + } } + + } else - { - sql_print_error("Could not parse log event entry, check the master for binlog corruption\ + { + sql_print_error("Could not parse log event entry, check the master for binlog corruption\n\ This may also be a network problem, or just a bug in the master or slave code"); - return 1; - } - - - -return 0; + return 1; + } + return 0; } // slave thread |