summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc428
1 files changed, 213 insertions, 215 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 7fcfb803ac1..f8746c78e49 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -516,7 +516,6 @@ command");
static uint read_event(MYSQL* mysql, MASTER_INFO *mi)
{
uint len = packet_error;
- NET* net = &mysql->net;
int read_errno = EINTR; // for convinience lets think we start by
// being in the interrupted state :-)
// my_real_read() will time us out
@@ -543,251 +542,250 @@ static uint read_event(MYSQL* mysql, MASTER_INFO *mi)
}
DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %d\n",
- len, net->read_pos[4]));
+ len, mysql->net.read_pos[4]));
return len - 1;
}
+
static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{
- Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1 , event_len);
- if(ev)
- {
- switch(ev->get_type_code())
+ Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
+ event_len);
+ if (ev)
+ {
+ switch(ev->get_type_code())
+ {
+ case QUERY_EVENT:
+ {
+ Query_log_event* qev = (Query_log_event*)ev;
+ int q_len = qev->q_len;
+ init_sql_alloc(&thd->mem_root, 8192,0);
+ thd->db = (char*)qev->db;
+ if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
+ {
+ thd->query = (char*)qev->query;
+ thd->set_time((time_t)qev->when);
+ thd->current_tablenr = 0;
+ VOID(pthread_mutex_lock(&LOCK_thread_count));
+ thd->query_id = query_id++;
+ VOID(pthread_mutex_unlock(&LOCK_thread_count));
+ thd->last_nx_table = thd->last_nx_db = 0;
+ for(;;)
{
- case QUERY_EVENT:
+ thd->query_error = 0; // clear error
+ thd->last_nx_table = thd->last_nx_db = 0;
+ thd->net.last_errno = 0;
+ thd->net.last_error[0] = 0;
+ mysql_parse(thd, thd->query, q_len); // try query
+ if(!thd->query_error || slave_killed(thd)) // break if ok
+ break;
+ // if not ok
+ if(thd->last_nx_table && thd->last_nx_db)
{
- Query_log_event* qev = (Query_log_event*)ev;
- int q_len = qev->q_len;
- init_sql_alloc(&thd->mem_root, 8192,0);
- thd->db = (char*)qev->db;
- if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
- {
- thd->query = (char*)qev->query;
- thd->set_time((time_t)qev->when);
- thd->current_tablenr = 0;
- VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->query_id = query_id++;
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
- thd->last_nx_table = thd->last_nx_db = 0;
- for(;;)
- {
- thd->query_error = 0; // clear error
- thd->last_nx_table = thd->last_nx_db = 0;
- thd->net.last_errno = 0;
- thd->net.last_error[0] = 0;
- mysql_parse(thd, thd->query, q_len); // try query
- if(!thd->query_error || slave_killed(thd)) // break if ok
- break;
- // if not ok
- if(thd->last_nx_table && thd->last_nx_db)
- {
- // for now, let's just fail if the table is not
- // there, and not try to be a smart alec...
+ // for now, let's just fail if the table is not
+ // there, and not try to be a smart alec...
- // if table was not there
- //if(fetch_nx_table(thd,&glob_mi))
- // try to to fetch from master
- break; // if we can't, just break
- }
- else
- break; // if failed for some other reason, bail out
-
- // if fetched the table from master successfully
- // we need to restore query info in thd because
- // fetch_nx_table executes create table
- thd->query = (char*)qev->query;
- thd->set_time((time_t)qev->when);
- thd->current_tablenr = 0;
- }
- }
- thd->db = 0;// prevent db from being freed
- thd->query = 0; // just to be sure
- close_thread_tables(thd);
- free_root(&thd->mem_root,0);
- if (thd->query_error)
- {
- sql_print_error("Slave: error running query '%s' ",
- qev->query);
- return 1;
- }
- delete ev;
-
- if(thd->fatal_error)
- {
- sql_print_error("Slave: Fatal error running query '%s' ",
- thd->query);
- return 1;
- }
-
- mi->inc_pos(event_len);
- flush_master_info(mi);
- break;
+ // if table was not there
+ //if(fetch_nx_table(thd,&glob_mi))
+ // try to to fetch from master
+ break; // if we can't, just break
}
+ else
+ break; // if failed for some other reason, bail out
+
+ // if fetched the table from master successfully
+ // we need to restore query info in thd because
+ // fetch_nx_table executes create table
+ thd->query = (char*)qev->query;
+ thd->set_time((time_t)qev->when);
+ thd->current_tablenr = 0;
+ }
+ }
+ thd->db = 0;// prevent db from being freed
+ thd->query = 0; // just to be sure
+ close_thread_tables(thd);
+ free_root(&thd->mem_root,0);
+ if (thd->query_error)
+ {
+ sql_print_error("Slave: error running query '%s' ",
+ qev->query);
+ return 1;
+ }
+ delete ev;
+
+ if(thd->fatal_error)
+ {
+ sql_print_error("Slave: Fatal error running query '%s' ",
+ thd->query);
+ return 1;
+ }
+
+ mi->inc_pos(event_len);
+ flush_master_info(mi);
+ break;
+ }
- case LOAD_EVENT:
- {
- Load_log_event* lev = (Load_log_event*)ev;
- init_sql_alloc(&thd->mem_root, 8192,0);
- thd->db = (char*)lev->db;
- thd->query = 0;
- thd->query_error = 0;
+ case LOAD_EVENT:
+ {
+ Load_log_event* lev = (Load_log_event*)ev;
+ init_sql_alloc(&thd->mem_root, 8192,0);
+ thd->db = (char*)lev->db;
+ thd->query = 0;
+ thd->query_error = 0;
- if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
- {
- thd->set_time((time_t)lev->when);
- thd->current_tablenr = 0;
- VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->query_id = query_id++;
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
-
- enum enum_duplicates handle_dup = DUP_IGNORE;
- if(lev->sql_ex.opt_flags && REPLACE_FLAG)
- handle_dup = DUP_REPLACE;
- sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags && DUMPFILE_FLAG );
- String field_term(&lev->sql_ex.field_term, 1),
- enclosed(&lev->sql_ex.enclosed, 1), line_term(&lev->sql_ex.line_term,1),
- escaped(&lev->sql_ex.escaped, 1), line_start(&lev->sql_ex.line_start, 1);
+ if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
+ {
+ thd->set_time((time_t)lev->when);
+ thd->current_tablenr = 0;
+ VOID(pthread_mutex_lock(&LOCK_thread_count));
+ thd->query_id = query_id++;
+ VOID(pthread_mutex_unlock(&LOCK_thread_count));
+
+ enum enum_duplicates handle_dup = DUP_IGNORE;
+ if(lev->sql_ex.opt_flags && REPLACE_FLAG)
+ handle_dup = DUP_REPLACE;
+ sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags && DUMPFILE_FLAG );
+ String field_term(&lev->sql_ex.field_term, 1),
+ enclosed(&lev->sql_ex.enclosed, 1), line_term(&lev->sql_ex.line_term,1),
+ escaped(&lev->sql_ex.escaped, 1), line_start(&lev->sql_ex.line_start, 1);
- ex.field_term = &field_term;
- if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
- ex.field_term->length(0);
+ ex.field_term = &field_term;
+ if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
+ ex.field_term->length(0);
- ex.enclosed = &enclosed;
- if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY)
- ex.enclosed->length(0);
+ ex.enclosed = &enclosed;
+ if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY)
+ ex.enclosed->length(0);
- ex.line_term = &line_term;
- if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY)
- ex.line_term->length(0);
+ ex.line_term = &line_term;
+ if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY)
+ ex.line_term->length(0);
- ex.line_start = &line_start;
- if(lev->sql_ex.empty_flags & LINE_START_EMPTY)
- ex.line_start->length(0);
+ ex.line_start = &line_start;
+ if(lev->sql_ex.empty_flags & LINE_START_EMPTY)
+ ex.line_start->length(0);
- ex.escaped = &escaped;
- if(lev->sql_ex.empty_flags & ESCAPED_EMPTY)
- ex.escaped->length(0);
+ ex.escaped = &escaped;
+ if(lev->sql_ex.empty_flags & ESCAPED_EMPTY)
+ ex.escaped->length(0);
- ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
- if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
- ex.field_term->length(0);
+ ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
+ if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
+ ex.field_term->length(0);
- ex.skip_lines = lev->skip_lines;
+ ex.skip_lines = lev->skip_lines;
- TABLE_LIST tables;
- bzero((char*) &tables,sizeof(tables));
- tables.db = thd->db;
- tables.name = tables.real_name = (char*)lev->table_name;
- tables.lock_type = TL_WRITE;
+ TABLE_LIST tables;
+ bzero((char*) &tables,sizeof(tables));
+ tables.db = thd->db;
+ tables.name = tables.real_name = (char*)lev->table_name;
+ tables.lock_type = TL_WRITE;
- if (open_tables(thd, &tables))
- {
- sql_print_error("Slave: error opening table %s ",
- tables.name);
- delete ev;
- return 1;
- }
-
- List<Item> fields;
- lev->set_fields(fields);
- thd->net.vio = net->vio;
- // mysql_load will use thd->net to read the file
- thd->net.pkt_nr = net->pkt_nr;
- // make sure the client does get confused
- // about the packet sequence
- if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1,
- TL_WRITE))
- thd->query_error = 1;
- net->pkt_nr = thd->net.pkt_nr;
- }
- else // we will just ask the master to send us /dev/null if we do not want to
- // load the data :-)
- {
- (void)my_net_write(net, "\xfb/dev/null", 10);
- (void)net_flush(net);
- (void)my_net_read(net); // discard response
- send_ok(net); // the master expects it
- }
+ if (open_tables(thd, &tables))
+ {
+ sql_print_error("Slave: error opening table %s ",
+ tables.name);
+ delete ev;
+ return 1;
+ }
+
+ List<Item> fields;
+ lev->set_fields(fields);
+ thd->net.vio = net->vio;
+ // mysql_load will use thd->net to read the file
+ thd->net.pkt_nr = net->pkt_nr;
+ // make sure the client does get confused
+ // about the packet sequence
+ if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1,
+ TL_WRITE))
+ thd->query_error = 1;
+ net->pkt_nr = thd->net.pkt_nr;
+ }
+ else // we will just ask the master to send us /dev/null if we do not want to
+ // load the data :-)
+ {
+ (void)my_net_write(net, "\xfb/dev/null", 10);
+ (void)net_flush(net);
+ (void)my_net_read(net); // discard response
+ send_ok(net); // the master expects it
+ }
- thd->net.vio = 0;
- thd->db = 0;// prevent db from being freed
- close_thread_tables(thd);
- if(thd->query_error)
- {
- int sql_error = thd->net.last_errno;
- if(!sql_error)
- sql_error = ER_UNKNOWN_ERROR;
+ thd->net.vio = 0;
+ thd->db = 0;// prevent db from being freed
+ close_thread_tables(thd);
+ if(thd->query_error)
+ {
+ int sql_error = thd->net.last_errno;
+ if(!sql_error)
+ sql_error = ER_UNKNOWN_ERROR;
- sql_print_error("Slave: error '%s' running load data infile ",
- ER(sql_error));
- delete ev;
- return 1;
- }
- delete ev;
+ sql_print_error("Slave: error '%s' running load data infile ",
+ ER(sql_error));
+ delete ev;
+ return 1;
+ }
+ delete ev;
- if(thd->fatal_error)
- {
- sql_print_error("Slave: Fatal error running query '%s' ",
- thd->query);
- return 1;
- }
-
- mi->inc_pos(event_len);
- flush_master_info(mi);
- break;
- }
+ if(thd->fatal_error)
+ {
+ sql_print_error("Slave: Fatal error running query '%s' ",
+ thd->query);
+ return 1;
+ }
+
+ mi->inc_pos(event_len);
+ flush_master_info(mi);
+ break;
+ }
- case START_EVENT:
- mi->inc_pos(event_len);
- flush_master_info(mi);
- break;
+ case START_EVENT:
+ mi->inc_pos(event_len);
+ flush_master_info(mi);
+ break;
- case STOP_EVENT:
- mi->inc_pos(event_len);
- flush_master_info(mi);
- break;
- case ROTATE_EVENT:
- {
- Rotate_log_event* rev = (Rotate_log_event*)ev;
- int ident_len = rev->ident_len;
- memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
- mi->log_file_name[ident_len] = 0;
- mi->pos = 0;
- break;
- }
+ case STOP_EVENT:
+ mi->inc_pos(event_len);
+ flush_master_info(mi);
+ break;
+ case ROTATE_EVENT:
+ {
+ Rotate_log_event* rev = (Rotate_log_event*)ev;
+ int ident_len = rev->ident_len;
+ memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
+ mi->log_file_name[ident_len] = 0;
+ mi->pos = 0;
+ break;
+ }
- case INTVAR_EVENT:
- {
- Intvar_log_event* iev = (Intvar_log_event*)ev;
- switch(iev->type)
- {
- case LAST_INSERT_ID_EVENT:
- thd->last_insert_id_used = 1;
- thd->last_insert_id = iev->val;
- break;
- case INSERT_ID_EVENT:
- thd->next_insert_id = iev->val;
- break;
+ case INTVAR_EVENT:
+ {
+ Intvar_log_event* iev = (Intvar_log_event*)ev;
+ switch(iev->type)
+ {
+ case LAST_INSERT_ID_EVENT:
+ thd->last_insert_id_used = 1;
+ thd->last_insert_id = iev->val;
+ break;
+ case INSERT_ID_EVENT:
+ thd->next_insert_id = iev->val;
+ break;
- }
- mi->inc_pos(event_len);
- flush_master_info(mi);
- break;
- }
- }
-
+ }
+ mi->inc_pos(event_len);
+ flush_master_info(mi);
+ break;
+ }
}
+
+ }
else
- {
- sql_print_error("Could not parse log event entry, check the master for binlog corruption\
+ {
+ sql_print_error("Could not parse log event entry, check the master for binlog corruption\n\
This may also be a network problem, or just a bug in the master or slave code");
- return 1;
- }
-
-
-
-return 0;
+ return 1;
+ }
+ return 0;
}
// slave thread