summaryrefslogtreecommitdiff
path: root/sql/wsrep_binlog.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_binlog.cc')
-rw-r--r--sql/wsrep_binlog.cc414
1 files changed, 414 insertions, 0 deletions
diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc
new file mode 100644
index 00000000000..ee8a9cb130b
--- /dev/null
+++ b/sql/wsrep_binlog.cc
@@ -0,0 +1,414 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#include "wsrep_binlog.h"
+#include "wsrep_priv.h"
+#include "log.h"
+
+extern handlerton *binlog_hton;
+/*
+ Write the contents of a cache to a memory buffer.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+ */
+int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len)
+{
+ *buf= NULL;
+ *buf_len= 0;
+
+ my_off_t const saved_pos(my_b_tell(cache));
+
+ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
+ {
+ WSREP_ERROR("failed to initialize io-cache");
+ return ER_ERROR_ON_WRITE;
+ }
+
+ uint length = my_b_bytes_in_cache(cache);
+ if (unlikely(0 == length)) length = my_b_fill(cache);
+
+ size_t total_length = 0;
+
+ if (likely(length > 0)) do
+ {
+ total_length += length;
+ /*
+ Bail out if buffer grows too large.
+ A temporary fix to avoid allocating indefinitely large buffer,
+ not a real limit on a writeset size which includes other things
+ like header and keys.
+ */
+ if (total_length > wsrep_max_ws_size)
+ {
+ WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
+ wsrep_max_ws_size, total_length);
+ goto error;
+ }
+ uchar* tmp = (uchar *)my_realloc(*buf, total_length,
+ MYF(MY_ALLOW_ZERO_PTR));
+ if (!tmp)
+ {
+ WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
+ *buf_len, length);
+ goto error;
+ }
+ *buf = tmp;
+
+ memcpy(*buf + *buf_len, cache->read_pos, length);
+ *buf_len = total_length;
+ cache->read_pos = cache->read_end;
+ } while ((cache->file >= 0) && (length = my_b_fill(cache)));
+
+ if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
+ {
+ WSREP_WARN("failed to initialize io-cache");
+ goto cleanup;
+ }
+
+ return 0;
+
+error:
+ if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
+ {
+ WSREP_WARN("failed to initialize io-cache");
+ }
+cleanup:
+ my_free(*buf);
+ *buf= NULL;
+ *buf_len= 0;
+ return ER_ERROR_ON_WRITE;
+}
+
+#define STACK_SIZE 4096 /* 4K - for buffer preallocated on the stack:
+ * many transactions would fit in there
+ * so there is no need to reach for the heap */
+
+/* Returns minimum multiple of HEAP_PAGE_SIZE that is >= length */
+static inline size_t
+heap_size(size_t length)
+{
+ return (length + HEAP_PAGE_SIZE - 1)/HEAP_PAGE_SIZE*HEAP_PAGE_SIZE;
+}
+
+/* append data to writeset */
+static inline wsrep_status_t
+wsrep_append_data(wsrep_t* const wsrep,
+ wsrep_ws_handle_t* const ws,
+ const void* const data,
+ size_t const len)
+{
+ struct wsrep_buf const buff = { data, len };
+ wsrep_status_t const rc(wsrep->append_data(wsrep, ws, &buff, 1,
+ WSREP_DATA_ORDERED, true));
+ if (rc != WSREP_OK)
+ {
+ WSREP_WARN("append_data() returned %d", rc);
+ }
+
+ return rc;
+}
+
+/*
+ Write the contents of a cache to wsrep provider.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+
+ This version reads all of cache into single buffer and then appends to a
+ writeset at once.
+ */
+static int wsrep_write_cache_once(wsrep_t* const wsrep,
+ THD* const thd,
+ IO_CACHE* const cache,
+ size_t* const len)
+{
+ my_off_t const saved_pos(my_b_tell(cache));
+
+ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
+ {
+ WSREP_ERROR("failed to initialize io-cache");
+ return ER_ERROR_ON_WRITE;
+ }
+
+ int err(WSREP_OK);
+
+ size_t total_length(0);
+ uchar stack_buf[STACK_SIZE]; /* to avoid dynamic allocations for few data*/
+ uchar* heap_buf(NULL);
+ uchar* buf(stack_buf);
+ size_t allocated(sizeof(stack_buf));
+ size_t used(0);
+
+ uint length(my_b_bytes_in_cache(cache));
+ if (unlikely(0 == length)) length = my_b_fill(cache);
+
+ if (likely(length > 0)) do
+ {
+ total_length += length;
+ /*
+ Bail out if buffer grows too large.
+ A temporary fix to avoid allocating indefinitely large buffer,
+ not a real limit on a writeset size which includes other things
+ like header and keys.
+ */
+ if (unlikely(total_length > wsrep_max_ws_size))
+ {
+ WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
+ wsrep_max_ws_size, total_length);
+ err = WSREP_TRX_SIZE_EXCEEDED;
+ goto cleanup;
+ }
+
+ if (total_length > allocated)
+ {
+ size_t const new_size(heap_size(total_length));
+ uchar* tmp = (uchar *)my_realloc(heap_buf, new_size,
+ MYF(MY_ALLOW_ZERO_PTR));
+ if (!tmp)
+ {
+ WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
+ allocated, length);
+ err = WSREP_TRX_SIZE_EXCEEDED;
+ goto cleanup;
+ }
+
+ heap_buf = tmp;
+ buf = heap_buf;
+ allocated = new_size;
+
+ if (used <= STACK_SIZE && used > 0) // there's data in stack_buf
+ {
+ DBUG_ASSERT(buf == stack_buf);
+ memcpy(heap_buf, stack_buf, used);
+ }
+ }
+
+ memcpy(buf + used, cache->read_pos, length);
+ used = total_length;
+ cache->read_pos = cache->read_end;
+ } while ((cache->file >= 0) && (length = my_b_fill(cache)));
+
+ if (used > 0)
+ err = wsrep_append_data(wsrep, &thd->wsrep_ws_handle, buf, used);
+
+ if (WSREP_OK == err) *len = total_length;
+
+cleanup:
+ if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
+ {
+ WSREP_ERROR("failed to reinitialize io-cache");
+ }
+
+ if (unlikely(WSREP_OK != err)) wsrep_dump_rbr_buf(thd, buf, used);
+
+ my_free(heap_buf);
+ return err;
+}
+
+/*
+ Write the contents of a cache to wsrep provider.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+
+ This version uses incremental data appending as it reads it from cache.
+ */
+static int wsrep_write_cache_inc(wsrep_t* const wsrep,
+ THD* const thd,
+ IO_CACHE* const cache,
+ size_t* const len)
+{
+ my_off_t const saved_pos(my_b_tell(cache));
+
+ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
+ {
+ WSREP_ERROR("failed to initialize io-cache");
+ return WSREP_TRX_ERROR;
+ }
+
+ int err(WSREP_OK);
+
+ size_t total_length(0);
+
+ uint length(my_b_bytes_in_cache(cache));
+ if (unlikely(0 == length)) length = my_b_fill(cache);
+
+ if (likely(length > 0)) do
+ {
+ total_length += length;
+ /* bail out if buffer grows too large
+ not a real limit on a writeset size which includes other things
+ like header and keys.
+ */
+ if (unlikely(total_length > wsrep_max_ws_size))
+ {
+ WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
+ wsrep_max_ws_size, total_length);
+ err = WSREP_TRX_SIZE_EXCEEDED;
+ goto cleanup;
+ }
+
+ if(WSREP_OK != (err=wsrep_append_data(wsrep, &thd->wsrep_ws_handle,
+ cache->read_pos, length)))
+ goto cleanup;
+
+ cache->read_pos = cache->read_end;
+ } while ((cache->file >= 0) && (length = my_b_fill(cache)));
+
+ if (WSREP_OK == err) *len = total_length;
+
+cleanup:
+ if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
+ {
+ WSREP_ERROR("failed to reinitialize io-cache");
+ }
+
+ return err;
+}
+
+/*
+ Write the contents of a cache to wsrep provider.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+ */
+int wsrep_write_cache(wsrep_t* const wsrep,
+ THD* const thd,
+ IO_CACHE* const cache,
+ size_t* const len)
+{
+ if (wsrep_incremental_data_collection) {
+ return wsrep_write_cache_inc(wsrep, thd, cache, len);
+ }
+ else {
+ return wsrep_write_cache_once(wsrep, thd, cache, len);
+ }
+}
+
+void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len)
+{
+ char filename[PATH_MAX]= {0};
+ int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log",
+ wsrep_data_home_dir, thd->thread_id,
+ (long long)wsrep_thd_trx_seqno(thd));
+ if (len >= PATH_MAX)
+ {
+ WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
+ return;
+ }
+
+ FILE *of= fopen(filename, "wb");
+ if (of)
+ {
+ fwrite (rbr_buf, buf_len, 1, of);
+ fclose(of);
+ }
+ else
+ {
+ WSREP_ERROR("Failed to open file '%s': %d (%s)",
+ filename, errno, strerror(errno));
+ }
+}
+
+/*
+ wsrep exploits binlog's caches even if binlogging itself is not
+ activated. In such case connection close needs calling
+ actual binlog's method.
+ Todo: split binlog hton from its caches to use ones by wsrep
+ without referring to binlog's stuff.
+*/
+int wsrep_binlog_close_connection(THD* thd)
+{
+ DBUG_ENTER("wsrep_binlog_close_connection");
+ if (thd_get_ha_data(thd, binlog_hton) != NULL)
+ binlog_hton->close_connection (binlog_hton, thd);
+ DBUG_RETURN(0);
+}
+
+int wsrep_binlog_savepoint_set(THD *thd, void *sv)
+{
+ if (!wsrep_emulate_bin_log) return 0;
+ int rcode = binlog_hton->savepoint_set(binlog_hton, thd, sv);
+ return rcode;
+}
+
+int wsrep_binlog_savepoint_rollback(THD *thd, void *sv)
+{
+ if (!wsrep_emulate_bin_log) return 0;
+ int rcode = binlog_hton->savepoint_rollback(binlog_hton, thd, sv);
+ return rcode;
+}
+
+void wsrep_dump_rbr_direct(THD* thd, IO_CACHE* cache)
+{
+ char filename[PATH_MAX]= {0};
+ int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log",
+ wsrep_data_home_dir, thd->thread_id,
+ (long long)wsrep_thd_trx_seqno(thd));
+ size_t bytes_in_cache = 0;
+ // check path
+ if (len >= PATH_MAX)
+ {
+ WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
+ return ;
+ }
+ // init cache
+ my_off_t const saved_pos(my_b_tell(cache));
+ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
+ {
+ WSREP_ERROR("failed to initialize io-cache");
+ return ;
+ }
+ // open file
+ FILE* of = fopen(filename, "wb");
+ if (!of)
+ {
+ WSREP_ERROR("Failed to open file '%s': %d (%s)",
+ filename, errno, strerror(errno));
+ goto cleanup;
+ }
+ // ready to write
+ bytes_in_cache= my_b_bytes_in_cache(cache);
+ if (unlikely(bytes_in_cache == 0)) bytes_in_cache = my_b_fill(cache);
+ if (likely(bytes_in_cache > 0)) do
+ {
+ if (my_fwrite(of, cache->read_pos, bytes_in_cache,
+ MYF(MY_WME | MY_NABP)) == (size_t) -1)
+ {
+ WSREP_ERROR("Failed to write file '%s'", filename);
+ goto cleanup;
+ }
+ cache->read_pos= cache->read_end;
+ } while ((cache->file >= 0) && (bytes_in_cache= my_b_fill(cache)));
+ if(cache->error == -1)
+ {
+ WSREP_ERROR("RBR inconsistent");
+ goto cleanup;
+ }
+cleanup:
+ // init back
+ if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
+ {
+ WSREP_ERROR("failed to reinitialize io-cache");
+ }
+ // close file
+ if (of) fclose(of);
+}
+
+void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end)
+{
+ thd->binlog_flush_pending_rows_event(stmt_end);
+}