diff options
Diffstat (limited to 'sql/wsrep_binlog.cc')
-rw-r--r-- | sql/wsrep_binlog.cc | 118 |
1 files changed, 108 insertions, 10 deletions
diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc index 5c5ebb9f780..998f4e72157 100644 --- a/sql/wsrep_binlog.cc +++ b/sql/wsrep_binlog.cc @@ -15,7 +15,11 @@ #include "wsrep_binlog.h" #include "wsrep_priv.h" +#include "log.h" +#include "log_event.h" +#include "wsrep_applier.h" +extern handlerton *binlog_hton; /* Write the contents of a cache to a memory buffer. @@ -67,8 +71,13 @@ int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len) memcpy(*buf + *buf_len, cache->read_pos, length); *buf_len = total_length; - cache->read_pos = cache->read_end; - } while ((cache->file >= 0) && (length = my_b_fill(cache))); + + if (cache->file < 0) + { + cache->read_pos= cache->read_end; + break; + } + } while ((length = my_b_fill(cache))); if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) { @@ -196,8 +205,12 @@ static int wsrep_write_cache_once(wsrep_t* const wsrep, memcpy(buf + used, cache->read_pos, length); used = total_length; - cache->read_pos = cache->read_end; - } while ((cache->file >= 0) && (length = my_b_fill(cache))); + if (cache->file < 0) + { + cache->read_pos= cache->read_end; + break; + } + } while ((length = my_b_fill(cache))); if (used > 0) err = wsrep_append_data(wsrep, &thd->wsrep_ws_handle, buf, used); @@ -210,7 +223,10 @@ cleanup: WSREP_ERROR("failed to reinitialize io-cache"); } - if (unlikely(WSREP_OK != err)) wsrep_dump_rbr_buf(thd, buf, used); + if (unlikely(WSREP_OK != err)) + { + wsrep_dump_rbr_buf_with_header(thd, buf, used); + } my_free(heap_buf); return err; @@ -263,8 +279,12 @@ static int wsrep_write_cache_inc(wsrep_t* const wsrep, cache->read_pos, length))) goto cleanup; - cache->read_pos = cache->read_end; - } while ((cache->file >= 0) && (length = my_b_fill(cache))); + if (cache->file < 0) + { + cache->read_pos= cache->read_end; + break; + } + } while ((length = my_b_fill(cache))); if (WSREP_OK == err) *len = total_length; @@ -324,7 +344,6 @@ void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len) filename, errno, strerror(errno)); } } -extern handlerton *binlog_hton; /* wsrep exploits binlog's caches even if binlogging itself is not @@ -355,6 +374,7 @@ int wsrep_binlog_savepoint_rollback(THD *thd, void *sv) return rcode; } +#if 0 void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache) { char filename[PATH_MAX]= {0}; @@ -394,8 +414,13 @@ void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache) WSREP_ERROR("Failed to write file '%s'", filename); goto cleanup; } - cache->read_pos= cache->read_end; - } while ((cache->file >= 0) && (bytes_in_cache= my_b_fill(cache))); + + if (cache->file < 0) + { + cache->read_pos= cache->read_end; + break; + } + } while ((bytes_in_cache= my_b_fill(cache))); if(cache->error == -1) { WSREP_ERROR("RBR inconsistent"); @@ -410,3 +435,76 @@ cleanup: // close file if (of) fclose(of); } +#endif + +void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end) +{ + thd->binlog_flush_pending_rows_event(stmt_end); +} + +/* Dump replication buffer along with header to a file. */ +void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf, + size_t buf_len) +{ + DBUG_ENTER("wsrep_dump_rbr_buf_with_header"); + + char filename[PATH_MAX]= {0}; + File file; + IO_CACHE cache; + Log_event_writer writer(&cache); + Format_description_log_event *ev=NULL; + + int len= my_snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld_v2.log", + wsrep_data_home_dir, thd->thread_id, + (long long) wsrep_thd_trx_seqno(thd)); + + if (len >= PATH_MAX) + { + WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len); + DBUG_VOID_RETURN; + } + + if ((file= mysql_file_open(key_file_wsrep_gra_log, filename, + O_RDWR | O_CREAT | O_BINARY, MYF(MY_WME))) < 0) + { + WSREP_ERROR("Failed to open file '%s' : %d (%s)", + filename, errno, strerror(errno)); + goto cleanup1; + } + + if (init_io_cache(&cache, file, 0, WRITE_CACHE, 0, 0, MYF(MY_WME | MY_NABP))) + { + mysql_file_close(file, MYF(MY_WME)); + goto cleanup2; + } + + if (my_b_safe_write(&cache, BINLOG_MAGIC, BIN_LOG_HEADER_SIZE)) + { + goto cleanup2; + } + + /* + Instantiate an FDLE object for non-wsrep threads (to be written + to the dump file). + */ + ev= (thd->wsrep_applier) ? wsrep_get_apply_format(thd) : + (new Format_description_log_event(4)); + + if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) || + flush_io_cache(&cache)) + { + WSREP_ERROR("Failed to write to '%s'.", filename); + goto cleanup2; + } + +cleanup2: + end_io_cache(&cache); + +cleanup1: + mysql_file_close(file, MYF(MY_WME)); + + if (!thd->wsrep_applier) delete ev; + + DBUG_VOID_RETURN; +} + |