summaryrefslogtreecommitdiff
path: root/sql/wsrep_binlog.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_binlog.cc')
-rw-r--r--sql/wsrep_binlog.cc118
1 files changed, 108 insertions, 10 deletions
diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc
index 5c5ebb9f780..998f4e72157 100644
--- a/sql/wsrep_binlog.cc
+++ b/sql/wsrep_binlog.cc
@@ -15,7 +15,11 @@
#include "wsrep_binlog.h"
#include "wsrep_priv.h"
+#include "log.h"
+#include "log_event.h"
+#include "wsrep_applier.h"
+extern handlerton *binlog_hton;
/*
Write the contents of a cache to a memory buffer.
@@ -67,8 +71,13 @@ int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len)
memcpy(*buf + *buf_len, cache->read_pos, length);
*buf_len = total_length;
- cache->read_pos = cache->read_end;
- } while ((cache->file >= 0) && (length = my_b_fill(cache)));
+
+ if (cache->file < 0)
+ {
+ cache->read_pos= cache->read_end;
+ break;
+ }
+ } while ((length = my_b_fill(cache)));
if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
{
@@ -196,8 +205,12 @@ static int wsrep_write_cache_once(wsrep_t* const wsrep,
memcpy(buf + used, cache->read_pos, length);
used = total_length;
- cache->read_pos = cache->read_end;
- } while ((cache->file >= 0) && (length = my_b_fill(cache)));
+ if (cache->file < 0)
+ {
+ cache->read_pos= cache->read_end;
+ break;
+ }
+ } while ((length = my_b_fill(cache)));
if (used > 0)
err = wsrep_append_data(wsrep, &thd->wsrep_ws_handle, buf, used);
@@ -210,7 +223,10 @@ cleanup:
WSREP_ERROR("failed to reinitialize io-cache");
}
- if (unlikely(WSREP_OK != err)) wsrep_dump_rbr_buf(thd, buf, used);
+ if (unlikely(WSREP_OK != err))
+ {
+ wsrep_dump_rbr_buf_with_header(thd, buf, used);
+ }
my_free(heap_buf);
return err;
@@ -263,8 +279,12 @@ static int wsrep_write_cache_inc(wsrep_t* const wsrep,
cache->read_pos, length)))
goto cleanup;
- cache->read_pos = cache->read_end;
- } while ((cache->file >= 0) && (length = my_b_fill(cache)));
+ if (cache->file < 0)
+ {
+ cache->read_pos= cache->read_end;
+ break;
+ }
+ } while ((length = my_b_fill(cache)));
if (WSREP_OK == err) *len = total_length;
@@ -324,7 +344,6 @@ void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len)
filename, errno, strerror(errno));
}
}
-extern handlerton *binlog_hton;
/*
wsrep exploits binlog's caches even if binlogging itself is not
@@ -355,6 +374,7 @@ int wsrep_binlog_savepoint_rollback(THD *thd, void *sv)
return rcode;
}
+#if 0
void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache)
{
char filename[PATH_MAX]= {0};
@@ -394,8 +414,13 @@ void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache)
WSREP_ERROR("Failed to write file '%s'", filename);
goto cleanup;
}
- cache->read_pos= cache->read_end;
- } while ((cache->file >= 0) && (bytes_in_cache= my_b_fill(cache)));
+
+ if (cache->file < 0)
+ {
+ cache->read_pos= cache->read_end;
+ break;
+ }
+ } while ((bytes_in_cache= my_b_fill(cache)));
if(cache->error == -1)
{
WSREP_ERROR("RBR inconsistent");
@@ -410,3 +435,76 @@ cleanup:
// close file
if (of) fclose(of);
}
+#endif
+
+void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end)
+{
+ thd->binlog_flush_pending_rows_event(stmt_end);
+}
+
+/* Dump replication buffer along with header to a file. */
+void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
+ size_t buf_len)
+{
+ DBUG_ENTER("wsrep_dump_rbr_buf_with_header");
+
+ char filename[PATH_MAX]= {0};
+ File file;
+ IO_CACHE cache;
+ Log_event_writer writer(&cache);
+ Format_description_log_event *ev=NULL;
+
+ int len= my_snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld_v2.log",
+ wsrep_data_home_dir, thd->thread_id,
+ (long long) wsrep_thd_trx_seqno(thd));
+
+ if (len >= PATH_MAX)
+ {
+ WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
+ DBUG_VOID_RETURN;
+ }
+
+ if ((file= mysql_file_open(key_file_wsrep_gra_log, filename,
+ O_RDWR | O_CREAT | O_BINARY, MYF(MY_WME))) < 0)
+ {
+ WSREP_ERROR("Failed to open file '%s' : %d (%s)",
+ filename, errno, strerror(errno));
+ goto cleanup1;
+ }
+
+ if (init_io_cache(&cache, file, 0, WRITE_CACHE, 0, 0, MYF(MY_WME | MY_NABP)))
+ {
+ mysql_file_close(file, MYF(MY_WME));
+ goto cleanup2;
+ }
+
+ if (my_b_safe_write(&cache, BINLOG_MAGIC, BIN_LOG_HEADER_SIZE))
+ {
+ goto cleanup2;
+ }
+
+ /*
+ Instantiate an FDLE object for non-wsrep threads (to be written
+ to the dump file).
+ */
+ ev= (thd->wsrep_applier) ? wsrep_get_apply_format(thd) :
+ (new Format_description_log_event(4));
+
+ if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) ||
+ flush_io_cache(&cache))
+ {
+ WSREP_ERROR("Failed to write to '%s'.", filename);
+ goto cleanup2;
+ }
+
+cleanup2:
+ end_io_cache(&cache);
+
+cleanup1:
+ mysql_file_close(file, MYF(MY_WME));
+
+ if (!thd->wsrep_applier) delete ev;
+
+ DBUG_VOID_RETURN;
+}
+