diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log_event.cc | 76 | ||||
-rw-r--r-- | sql/log_event.h | 12 | ||||
-rw-r--r-- | sql/mysqlbinlog.cc | 6 | ||||
-rw-r--r-- | sql/slave.cc | 43 | ||||
-rw-r--r-- | sql/sql_repl.cc | 27 |
5 files changed, 103 insertions, 61 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 1d45a82648a..0f314827bd5 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -71,23 +71,39 @@ int Log_event::write_header(FILE* file) #ifndef MYSQL_CLIENT -int Log_event::read_log_event(FILE* file, String* packet) +int Log_event::read_log_event(FILE* file, String* packet, + pthread_mutex_t* log_lock) { ulong data_len; char buf[LOG_EVENT_HEADER_LEN]; + if(log_lock) + pthread_mutex_lock(log_lock); if (my_fread(file, (byte*)buf, sizeof(buf), MYF(MY_NABP))) - return feof(file) ? LOG_READ_EOF: LOG_READ_IO; - + { + if(log_lock) pthread_mutex_unlock(log_lock); + return feof(file) ? LOG_READ_EOF: LOG_READ_IO; + } data_len = uint4korr(buf + EVENT_LEN_OFFSET); if (data_len < LOG_EVENT_HEADER_LEN || data_len > MAX_EVENT_LEN) - return LOG_READ_BOGUS; - + { + if(log_lock) pthread_mutex_unlock(log_lock); + return LOG_READ_BOGUS; + } packet->append(buf, sizeof(buf)); data_len -= LOG_EVENT_HEADER_LEN; if (!data_len) - return 0; // the event does not have a data section + { + if(log_lock) pthread_mutex_unlock(log_lock); + return 0; // the event does not have a data section + } if (packet->append(file, data_len, MYF(MY_WME|MY_NABP))) - return feof(file) ? LOG_READ_BOGUS: LOG_READ_IO; + { + if(log_lock) + pthread_mutex_unlock(log_lock); + return feof(file) ? LOG_READ_BOGUS: LOG_READ_IO; + } + + if(log_lock) pthread_mutex_unlock(log_lock); return 0; } @@ -95,14 +111,18 @@ int Log_event::read_log_event(FILE* file, String* packet) // allocates memory - the caller is responsible for clean-up -Log_event* Log_event::read_log_event(FILE* file) +Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock) { time_t timestamp; uint32 server_id; char buf[LOG_EVENT_HEADER_LEN-4]; + if(log_lock) pthread_mutex_lock(log_lock); if (my_fread(file, (byte *) buf, sizeof(buf), MY_NABP)) - return NULL; + { + if(log_lock) pthread_mutex_unlock(log_lock); + return NULL; + } timestamp = uint4korr(buf); server_id = uint4korr(buf + 5); @@ -111,6 +131,8 @@ Log_event* Log_event::read_log_event(FILE* file) case QUERY_EVENT: { Query_log_event* q = new Query_log_event(file, timestamp, server_id); + if(log_lock) pthread_mutex_unlock(log_lock); + if (!q->query) { delete q; @@ -123,6 +145,8 @@ Log_event* Log_event::read_log_event(FILE* file) case LOAD_EVENT: { Load_log_event* l = new Load_log_event(file, timestamp, server_id); + if(log_lock) pthread_mutex_unlock(log_lock); + if (!l->table_name) { delete l; @@ -136,6 +160,8 @@ Log_event* Log_event::read_log_event(FILE* file) case ROTATE_EVENT: { Rotate_log_event* r = new Rotate_log_event(file, timestamp, server_id); + if(log_lock) pthread_mutex_unlock(log_lock); + if (!r->new_log_ident) { delete r; @@ -148,6 +174,8 @@ Log_event* Log_event::read_log_event(FILE* file) case INTVAR_EVENT: { Intvar_log_event* e = new Intvar_log_event(file, timestamp, server_id); + if(log_lock) pthread_mutex_unlock(log_lock); + if (e->type == INVALID_INT_EVENT) { delete e; @@ -157,12 +185,25 @@ Log_event* Log_event::read_log_event(FILE* file) return e; } - case START_EVENT: return new Start_log_event(file, timestamp, server_id); - case STOP_EVENT: return new Stop_log_event(file, timestamp, server_id); - default: return NULL; + case START_EVENT: + { + Start_log_event* e = new Start_log_event(file, timestamp, server_id); + if(log_lock) pthread_mutex_unlock(log_lock); + return e; + } + case STOP_EVENT: + { + Stop_log_event* e = new Stop_log_event(file, timestamp, server_id); + if(log_lock) pthread_mutex_unlock(log_lock); + return e; + } + default: + if(log_lock) pthread_mutex_unlock(log_lock); + return NULL; } //impossible + if(log_lock) pthread_mutex_unlock(log_lock); return NULL; } @@ -356,6 +397,7 @@ Query_log_event::Query_log_event(FILE* file, time_t when_arg, data_len -= QUERY_EVENT_OVERHEAD; exec_time = uint4korr(buf + 8); db_len = (uint)buf[12]; + error_code = uint2korr(buf + 13); if (!(data_buf = (char*) my_malloc(data_len+1, MYF(MY_WME)))) return; @@ -384,11 +426,12 @@ Query_log_event::Query_log_event(const char* buf, int max_buf): data_len -= QUERY_EVENT_OVERHEAD; exec_time = uint4korr(buf + 8); + error_code = uint2korr(buf + 13); if (!(data_buf = (char*) my_malloc( data_len + 1, MYF(MY_WME)))) return; - memcpy(data_buf, buf + 13, data_len); + memcpy(data_buf, buf + QUERY_HEADER_LEN + 4, data_len); thread_id = uint4korr(buf + 4); db = data_buf; db_len = (uint)buf[12]; @@ -402,8 +445,8 @@ void Query_log_event::print(FILE* file, bool short_form) if (!short_form) { print_header(file); - fprintf(file, "\tQuery\tthread_id=%lu\texec_time=%lu\n", - (ulong) thread_id, (ulong) exec_time); + fprintf(file, "\tQuery\tthread_id=%lu\texec_time=%lu\terror_code=%d\n", + (ulong) thread_id, (ulong) exec_time, error_code); } if(db && db[0]) @@ -423,7 +466,8 @@ int Query_log_event::write_data(FILE* file) int4store(pos, exec_time); pos += 4; *pos++ = (char)db_len; - + int2store(pos, error_code); + pos += 2; if (my_fwrite(file, (byte*) buf, (uint)(pos - buf), MYF(MY_NABP | MY_WME)) || my_fwrite(file, (db) ? (byte*) db : (byte*)"", diff --git a/sql/log_event.h b/sql/log_event.h index 871258885cf..f0aecf47e8c 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -31,7 +31,8 @@ #define BINLOG_VERSION 1 #define LOG_EVENT_HEADER_LEN 13 -#define QUERY_HEADER_LEN (sizeof(uint32) + sizeof(uint32) + sizeof(uchar)) +#define QUERY_HEADER_LEN (sizeof(uint32) + sizeof(uint32) + \ + sizeof(uchar) + sizeof(uint16)) #define LOAD_HEADER_LEN (sizeof(uint32) + sizeof(uint32) + \ + sizeof(uint32) + 2 + sizeof(uint32)) #define EVENT_LEN_OFFSET 9 @@ -88,11 +89,13 @@ public: void print_timestamp(FILE* file, time_t *ts = 0); void print_header(FILE* file); - static Log_event* read_log_event(FILE* file); + // if mutex is 0, the read will proceed without mutex + static Log_event* read_log_event(FILE* file, pthread_mutex_t* log_lock); static Log_event* read_log_event(const char* buf, int max_buf); #ifndef MYSQL_CLIENT - static int read_log_event(FILE* file, String* packet); + static int read_log_event(FILE* file, String* packet, + pthread_mutex_t* log_lock); #endif }; @@ -109,12 +112,14 @@ public: // we pass it here, so we would not have to call strlen() // otherwise, set it to 0, in which case, we compute it with strlen() uint32 db_len; + uint16 error_code; int thread_id; #if !defined(MYSQL_CLIENT) THD* thd; Query_log_event(THD* thd_arg, const char* query_arg): Log_event(thd_arg->start_time,0,0,thd_arg->server_id), data_buf(0), query(query_arg), db(thd_arg->db), q_len(thd_arg->query_length), + error_code(thd_arg->net.last_errno), thread_id(thd_arg->thread_id), thd(thd_arg) { time_t end_time; @@ -142,6 +147,7 @@ public: return q_len + db_len + 2 + sizeof(uint32) // thread_id + sizeof(uint32) // exec_time + + sizeof(uint16) // error_code ; } diff --git a/sql/mysqlbinlog.cc b/sql/mysqlbinlog.cc index 2d1bde21f3f..bb115e250b5 100644 --- a/sql/mysqlbinlog.cc +++ b/sql/mysqlbinlog.cc @@ -284,7 +284,9 @@ static void dump_remote_log_entries(const char* logname) break; // end of data DBUG_PRINT("info",( "len= %u, net->read_pos[5] = %d\n", len, net->read_pos[5])); - Log_event * ev = Log_event::read_log_event((const char*) net->read_pos + 1 , len); + Log_event * ev = Log_event::read_log_event( + (const char*) net->read_pos + 1 , + len); if(ev) { ev->print(stdout, short_form); @@ -315,7 +317,7 @@ static void dump_local_log_entries(const char* logname) while(1) { - Log_event* ev = Log_event::read_log_event(file); + Log_event* ev = Log_event::read_log_event(file, 0); if(!ev) if(!feof(file)) die("Could not read entry at offset %ld : Error in log format or \ diff --git a/sql/slave.cc b/sql/slave.cc index f60121bf070..e9c35894fd9 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -599,36 +599,23 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) thd->query_id = query_id++; VOID(pthread_mutex_unlock(&LOCK_thread_count)); thd->last_nx_table = thd->last_nx_db = 0; - for(;;) - { - thd->query_error = 0; // clear error - thd->last_nx_table = thd->last_nx_db = 0; - thd->net.last_errno = 0; - thd->net.last_error[0] = 0; - mysql_parse(thd, thd->query, q_len); // try query - if(!thd->query_error || slave_killed(thd)) // break if ok - break; - // if not ok - if(thd->last_nx_table && thd->last_nx_db) + thd->query_error = 0; // clear error + thd->net.last_errno = 0; + thd->net.last_error[0] = 0; + mysql_parse(thd, thd->query, q_len); + int expected_error,actual_error; + if((expected_error = qev->error_code) != + (actual_error = thd->net.last_errno) && expected_error) { - // for now, let's just fail if the table is not - // there, and not try to be a smart alec... - - // if table was not there - //if(fetch_nx_table(thd,&glob_mi)) - // try to to fetch from master - break; // if we can't, just break + sql_print_error("Slave: did not get the expected error\ + running query from master - expected: '%s', got '%s'", + ER(expected_error), + actual_error ? ER(actual_error):"no error" + ); + thd->query_error = 1; } - else - break; // if failed for some other reason, bail out - - // if fetched the table from master successfully - // we need to restore query info in thd because - // fetch_nx_table executes create table - thd->query = (char*)qev->query; - thd->set_time((time_t)qev->when); - thd->current_tablenr = 0; - } + else if(expected_error == actual_error) + thd->query_error = 0; } thd->db = 0;// prevent db from being freed thd->query = 0; // just to be sure diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 1ee484b1bc1..011638afc93 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -121,7 +121,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) errmsg = "Could not find first log"; goto err; } - log = my_fopen(log_file_name, O_RDONLY, MYF(MY_WME)); + log = my_fopen(log_file_name, O_RDONLY|O_BINARY, MYF(MY_WME)); if(!log) { @@ -143,14 +143,17 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) while(!net->error && net->vio != 0 && !thd->killed) { - while(!(error = Log_event::read_log_event(log, packet))) + pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock(); + + while(!(error = Log_event::read_log_event(log, packet, log_lock))) { if(my_net_write(net, (char*)packet->ptr(), packet->length()) ) { errmsg = "Failed on my_net_write()"; goto err; } - DBUG_PRINT("info", ("log event code %d",(*packet)[LOG_EVENT_OFFSET+1] )); + DBUG_PRINT("info", ("log event code %d", + (*packet)[LOG_EVENT_OFFSET+1] )); if((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) { if(send_file(thd)) @@ -168,7 +171,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) goto err; } - if(!(flags & BINLOG_DUMP_NON_BLOCK) && mysql_bin_log.is_active(log_file_name)) + if(!(flags & BINLOG_DUMP_NON_BLOCK) && + mysql_bin_log.is_active(log_file_name)) // block until there is more data in the log // unless non-blocking mode requested { @@ -183,7 +187,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) // if we did not miss anything, we just wait for other threads // to signal us { - pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock(); clearerr(log); // tell the kill thread how to wake us up @@ -196,18 +199,19 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) bool read_packet = 0, fatal_error = 0; - pthread_mutex_lock(log_lock); // no one will update the log while we are reading + // no one will update the log while we are reading // now, but we'll be quick and just read one record - - - switch(Log_event::read_log_event(log, packet)) + switch(Log_event::read_log_event(log, packet, log_lock)) { case 0: - read_packet = 1; // we read successfully, so we'll need to send it to the + read_packet = 1; + // we read successfully, so we'll need to send it to the // slave break; case LOG_READ_EOF: + pthread_mutex_lock(log_lock); pthread_cond_wait(&COND_binlog_update, log_lock); + pthread_mutex_unlock(log_lock); break; default: @@ -215,7 +219,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) break; } - pthread_mutex_unlock(log_lock); pthread_mutex_lock(&thd->mysys_var->mutex); thd->mysys_var->current_mutex= 0; @@ -275,7 +278,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) break; (void) my_fclose(log, MYF(MY_WME)); - log = my_fopen(log_file_name, O_RDONLY, MYF(MY_WME)); + log = my_fopen(log_file_name, O_RDONLY|O_BINARY, MYF(MY_WME)); if(!log) goto err; // fake Rotate_log event just in case it did not make it to the log |