summaryrefslogtreecommitdiff
path: root/sql/ha_ndbcluster_binlog.cc
diff options
context:
space:
mode:
authorSergei Golubchik <serg@mariadb.org>2014-08-21 18:11:46 +0200
committerSergei Golubchik <serg@mariadb.org>2014-10-11 18:53:06 +0200
commit7f5e51b940d65cf541403a50af74163b9aed5cb8 (patch)
treee540d3cd4d678cd276a9d496490ac5e527f30a78 /sql/ha_ndbcluster_binlog.cc
parent57dd1f6f3fcbc7a46e1b3e71257987315f7aa687 (diff)
downloadmariadb-git-7f5e51b940d65cf541403a50af74163b9aed5cb8.tar.gz
MDEV-34 delete storage/ndb and sql/*ndb* (and collateral changes)
remove: * NDB from everywhere * IM from mtr-v1 * packaging/rpm-oel and packaging/rpm-uln * few unused spec files * plug.in file * .bzrignore
Diffstat (limited to 'sql/ha_ndbcluster_binlog.cc')
-rw-r--r--sql/ha_ndbcluster_binlog.cc4425
1 files changed, 0 insertions, 4425 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
deleted file mode 100644
index 531211eb175..00000000000
--- a/sql/ha_ndbcluster_binlog.cc
+++ /dev/null
@@ -1,4425 +0,0 @@
-/* Copyright (c) 2006, 2013, Oracle and/or its affiliates.
- Copyright (c) 2012, 2013, Monty Proram Ab.
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; version 2 of the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
-*/
-
-#include "sql_priv.h"
-#include "unireg.h" // REQUIRED: for other includes
-#include "sql_show.h"
-#ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
-#include "ha_ndbcluster.h"
-
-#ifdef HAVE_NDB_BINLOG
-#include "rpl_injector.h"
-#include "rpl_filter.h"
-#include "slave.h"
-#include "ha_ndbcluster_binlog.h"
-#include "NdbDictionary.hpp"
-#include "ndb_cluster_connection.hpp"
-#include <util/NdbAutoPtr.hpp>
-
-#include "sql_base.h" // close_thread_tables
-#include "sql_table.h" // build_table_filename
-#include "table.h" // open_table_from_share
-#include "discover.h" // readfrm, writefrm
-#include "lock.h" // MYSQL_LOCK_IGNORE_FLUSH,
- // mysql_unlock_tables
-#include "sql_parse.h" // mysql_parse
-#include "transaction.h"
-
-#ifdef ndb_dynamite
-#undef assert
-#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
-#endif
-
-extern my_bool opt_ndb_log_binlog_index;
-extern ulong opt_ndb_extra_logging;
-/*
- defines for cluster replication table names
-*/
-#include "ha_ndbcluster_tables.h"
-#define NDB_APPLY_TABLE_FILE "./" NDB_REP_DB "/" NDB_APPLY_TABLE
-#define NDB_SCHEMA_TABLE_FILE "./" NDB_REP_DB "/" NDB_SCHEMA_TABLE
-
-/*
- Timeout for syncing schema events between
- mysql servers, and between mysql server and the binlog
-*/
-static const int DEFAULT_SYNC_TIMEOUT= 120;
-
-
-/*
- Flag showing if the ndb injector thread is running, if so == 1
- -1 if it was started but later stopped for some reason
- 0 if never started
-*/
-static int ndb_binlog_thread_running= 0;
-
-/*
- Flag showing if the ndb binlog should be created, if so == TRUE
- FALSE if not
-*/
-my_bool ndb_binlog_running= FALSE;
-my_bool ndb_binlog_tables_inited= FALSE;
-
-/*
- Global reference to the ndb injector thread THD oject
-
- Has one sole purpose, for setting the in_use table member variable
- in get_share(...)
-*/
-THD *injector_thd= 0;
-
-/*
- Global reference to ndb injector thd object.
-
- Used mainly by the binlog index thread, but exposed to the client sql
- thread for one reason; to setup the events operations for a table
- to enable ndb injector thread receiving events.
-
- Must therefore always be used with a surrounding
- mysql_mutex_lock(&injector_mutex), when doing create/dropEventOperation
-*/
-static Ndb *injector_ndb= 0;
-static Ndb *schema_ndb= 0;
-
-static int ndbcluster_binlog_inited= 0;
-/*
- Flag "ndbcluster_binlog_terminating" set when shutting down mysqld.
- Server main loop should call handlerton function:
-
- ndbcluster_hton->binlog_func ==
- ndbcluster_binlog_func(...,BFN_BINLOG_END,...) ==
- ndbcluster_binlog_end
-
- at shutdown, which sets the flag. And then server needs to wait for it
- to complete. Otherwise binlog will not be complete.
-
- ndbcluster_hton->panic == ndbcluster_end() will not return until
- ndb binlog is completed
-*/
-static int ndbcluster_binlog_terminating= 0;
-
-/*
- Mutex and condition used for interacting between client sql thread
- and injector thread
-*/
-pthread_t ndb_binlog_thread;
-mysql_mutex_t injector_mutex;
-mysql_cond_t injector_cond;
-
-/* NDB Injector thread (used for binlog creation) */
-static ulonglong ndb_latest_applied_binlog_epoch= 0;
-static ulonglong ndb_latest_handled_binlog_epoch= 0;
-static ulonglong ndb_latest_received_binlog_epoch= 0;
-
-NDB_SHARE *ndb_apply_status_share= 0;
-NDB_SHARE *ndb_schema_share= 0;
-mysql_mutex_t ndb_schema_share_mutex;
-
-extern my_bool opt_log_slave_updates;
-static my_bool g_ndb_log_slave_updates;
-
-/* Schema object distribution handling */
-HASH ndb_schema_objects;
-typedef struct st_ndb_schema_object {
- mysql_mutex_t mutex;
- char *key;
- uint key_length;
- uint use_count;
- MY_BITMAP slock_bitmap;
- uint32 slock[256/32]; // 256 bits for lock status of table
-} NDB_SCHEMA_OBJECT;
-static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
- my_bool create_if_not_exists,
- my_bool have_lock);
-static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
- bool have_lock);
-
-static Uint64 *p_latest_trans_gci= 0;
-
-/*
- Global variables for holding the ndb_binlog_index table reference
-*/
-static TABLE *ndb_binlog_index= 0;
-static TABLE_LIST binlog_tables;
-
-/*
- Helper functions
-*/
-
-#ifndef DBUG_OFF
-/* purecov: begin deadcode */
-static void print_records(TABLE *table, const uchar *record)
-{
- for (uint j= 0; j < table->s->fields; j++)
- {
- char buf[40];
- int pos= 0;
- Field *field= table->field[j];
- const uchar* field_ptr= field->ptr - table->record[0] + record;
- int pack_len= field->pack_length();
- int n= pack_len < 10 ? pack_len : 10;
-
- for (int i= 0; i < n && pos < 20; i++)
- {
- pos+= sprintf(&buf[pos]," %x", (int) (uchar) field_ptr[i]);
- }
- buf[pos]= 0;
- DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
- }
-}
-/* purecov: end */
-#else
-#define print_records(a,b)
-#endif
-
-
-#ifndef DBUG_OFF
-static void dbug_print_table(const char *info, TABLE *table)
-{
- if (table == 0)
- {
- DBUG_PRINT("info",("%s: (null)", info));
- return;
- }
- DBUG_PRINT("info",
- ("%s: %s.%s s->fields: %d "
- "reclength: %lu rec_buff_length: %u record[0]: 0x%lx "
- "record[1]: 0x%lx",
- info,
- table->s->db.str,
- table->s->table_name.str,
- table->s->fields,
- table->s->reclength,
- table->s->rec_buff_length,
- (long) table->record[0],
- (long) table->record[1]));
-
- for (unsigned int i= 0; i < table->s->fields; i++)
- {
- Field *f= table->field[i];
- DBUG_PRINT("info",
- ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
- "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
- i,
- f->field_name,
- (long) f->flags,
- (f->flags & PRI_KEY_FLAG) ? "pri" : "attr",
- (f->flags & NOT_NULL_FLAG) ? "" : ",nullable",
- (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
- (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
- (f->flags & BLOB_FLAG) ? ",blob" : "",
- (f->flags & BINARY_FLAG) ? ",binary" : "",
- f->real_type(),
- f->pack_length(),
- (long) f->ptr, (int) (f->ptr - table->record[0]),
- f->null_bit,
- (long) f->null_ptr,
- (int) ((uchar*) f->null_ptr - table->record[0])));
- if (f->type() == MYSQL_TYPE_BIT)
- {
- Field_bit *g= (Field_bit*) f;
- DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] "
- "bit_ofs: %d bit_len: %u",
- g->field_length, (long) g->bit_ptr,
- (int) ((uchar*) g->bit_ptr -
- table->record[0]),
- g->bit_ofs, g->bit_len));
- }
- }
-}
-#else
-#define dbug_print_table(a,b)
-#endif
-
-
-/*
- Run a query through mysql_parse
-
- Used to:
- - purging the ndb_binlog_index
- - creating the ndb_apply_status table
-*/
-static void run_query(THD *thd, char *buf, char *end,
- const int *no_print_error, my_bool disable_binlog)
-{
- ulong save_thd_query_length= thd->query_length();
- char *save_thd_query= thd->query();
- ulong save_thread_id= thd->variables.pseudo_thread_id;
- struct system_status_var save_thd_status_var= thd->status_var;
- THD_TRANS save_thd_transaction_all= thd->transaction.all;
- THD_TRANS save_thd_transaction_stmt= thd->transaction.stmt;
- ulonglong save_thd_options= thd->variables.option_bits;
- DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->variables.option_bits));
- NET save_thd_net= thd->net;
-
- bzero((char*) &thd->net, sizeof(NET));
- thd->set_query(buf, (uint) (end - buf));
- thd->variables.pseudo_thread_id= thread_id;
- thd->transaction.stmt.modified_non_trans_table= FALSE;
- if (disable_binlog)
- thd->variables.option_bits&= ~OPTION_BIN_LOG;
-
- DBUG_PRINT("query", ("%s", thd->query()));
-
- DBUG_ASSERT(!thd->in_sub_stmt);
- DBUG_ASSERT(!thd->locked_tables_mode);
-
- {
- Parser_state parser_state;
- if (!parser_state.init(thd, thd->query(), thd->query_length()))
- mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
- }
-
- if (no_print_error && thd->is_slave_error)
- {
- int i;
- Thd_ndb *thd_ndb= get_thd_ndb(thd);
- for (i= 0; no_print_error[i]; i++)
- if ((thd_ndb->m_error_code == no_print_error[i]) ||
- (thd->get_stmt_da()->sql_errno() == (unsigned) no_print_error[i]))
- break;
- if (!no_print_error[i])
- sql_print_error("NDB: %s: error %s %d(ndb: %d) %d %d",
- buf,
- thd->get_stmt_da()->message(),
- thd->get_stmt_da()->sql_errno(),
- thd_ndb->m_error_code,
- (int) thd->is_error(), thd->is_slave_error);
- }
- /*
- XXX: this code is broken. mysql_parse()/mysql_reset_thd_for_next_command()
- can not be called from within a statement, and
- run_query() can be called from anywhere, including from within
- a sub-statement.
- This particular reset is a temporary hack to avoid an assert
- for double assignment of the diagnostics area when run_query()
- is called from ndbcluster_reset_logs(), which is called from
- mysql_flush().
- */
- thd->get_stmt_da()->reset_diagnostics_area();
-
- thd->variables.option_bits= save_thd_options;
- thd->set_query(save_thd_query, save_thd_query_length);
- thd->variables.pseudo_thread_id= save_thread_id;
- thd->status_var= save_thd_status_var;
- thd->transaction.all= save_thd_transaction_all;
- thd->transaction.stmt= save_thd_transaction_stmt;
- thd->net= save_thd_net;
- thd->set_current_stmt_binlog_format_row();
-
- if (thd == injector_thd)
- {
- /*
- running the query will close all tables, including the ndb_binlog_index
- used in injector_thd
- */
- ndb_binlog_index= 0;
- }
-}
-
-static void
-ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
-{
- DBUG_ENTER("ndbcluster_binlog_close_table");
- if (share->table_share)
- {
- closefrm(share->table, 1);
- share->table_share= 0;
- share->table= 0;
- }
- DBUG_ASSERT(share->table == 0);
- DBUG_VOID_RETURN;
-}
-
-
-/*
- Creates a TABLE object for the ndb cluster table
-
- NOTES
- This does not open the underlying table
-*/
-
-static int
-ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
- TABLE_SHARE *table_share, TABLE *table,
- int reopen)
-{
- int error;
- DBUG_ENTER("ndbcluster_binlog_open_table");
-
- init_tmp_table_share(thd, table_share, share->db, 0, share->table_name,
- share->key);
- if ((error= open_table_def(thd, table_share, 0)))
- {
- DBUG_PRINT("error", ("open_table_def failed: %d my_errno: %d", error, my_errno));
- free_table_share(table_share);
- DBUG_RETURN(error);
- }
- if ((error= open_table_from_share(thd, table_share, "", 0 /* fon't allocate buffers */,
- (uint) READ_ALL, 0, table, FALSE)))
- {
- DBUG_PRINT("error", ("open_table_from_share failed %d my_errno: %d", error, my_errno));
- free_table_share(table_share);
- DBUG_RETURN(error);
- }
- tdc_assign_new_table_id(table_share);
-
- if (!reopen)
- {
- // allocate memory on ndb share so it can be reused after online alter table
- (void)multi_alloc_root(&share->mem_root,
- &(share->record[0]), table->s->rec_buff_length,
- &(share->record[1]), table->s->rec_buff_length,
- NULL);
- }
- {
- my_ptrdiff_t row_offset= share->record[0] - table->record[0];
- Field **p_field;
- for (p_field= table->field; *p_field; p_field++)
- (*p_field)->move_field_offset(row_offset);
- table->record[0]= share->record[0];
- table->record[1]= share->record[1];
- }
-
- table->in_use= injector_thd;
-
- table->s->db.str= share->db;
- table->s->db.length= strlen(share->db);
- table->s->table_name.str= share->table_name;
- table->s->table_name.length= strlen(share->table_name);
-
- DBUG_ASSERT(share->table_share == 0);
- share->table_share= table_share;
- DBUG_ASSERT(share->table == 0);
- share->table= table;
- /* We can't use 'use_all_columns()' as the file object is not setup yet */
- table->column_bitmaps_set_no_signal(&table->s->all_set, &table->s->all_set);
-#ifndef DBUG_OFF
- dbug_print_table("table", table);
-#endif
- DBUG_RETURN(0);
-}
-
-
-/*
- Initialize the binlog part of the NDB_SHARE
-*/
-int ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
-{
- THD *thd= current_thd;
- MEM_ROOT *mem_root= &share->mem_root;
- int do_event_op= ndb_binlog_running;
- int error= 0;
- DBUG_ENTER("ndbcluster_binlog_init_share");
-
- share->connect_count= g_ndb_cluster_connection->get_connect_count();
-
- share->op= 0;
- share->table= 0;
-
- if (!ndb_schema_share &&
- strcmp(share->db, NDB_REP_DB) == 0 &&
- strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
- do_event_op= 1;
- else if (!ndb_apply_status_share &&
- strcmp(share->db, NDB_REP_DB) == 0 &&
- strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
- do_event_op= 1;
-
- {
- int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
- share->subscriber_bitmap= (MY_BITMAP*)
- alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
- for (i= 0; i < no_nodes; i++)
- {
- my_bitmap_init(&share->subscriber_bitmap[i],
- (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
- max_ndb_nodes, FALSE);
- bitmap_clear_all(&share->subscriber_bitmap[i]);
- }
- }
-
- if (!do_event_op)
- {
- if (_table)
- {
- if (_table->s->primary_key == MAX_KEY)
- share->flags|= NSF_HIDDEN_PK;
- if (_table->s->blob_fields != 0)
- share->flags|= NSF_BLOB_FLAG;
- }
- else
- {
- share->flags|= NSF_NO_BINLOG;
- }
- DBUG_RETURN(error);
- }
- while (1)
- {
- int error;
- TABLE_SHARE *table_share= (TABLE_SHARE *) alloc_root(mem_root, sizeof(*table_share));
- TABLE *table= (TABLE*) alloc_root(mem_root, sizeof(*table));
- if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table, 0)))
- break;
- /*
- ! do not touch the contents of the table
- it may be in use by the injector thread
- */
- MEM_ROOT *mem_root= &share->mem_root;
- share->ndb_value[0]= (NdbValue*)
- alloc_root(mem_root, sizeof(NdbValue) *
- (table->s->fields + 2 /*extra for hidden key and part key*/));
- share->ndb_value[1]= (NdbValue*)
- alloc_root(mem_root, sizeof(NdbValue) *
- (table->s->fields + 2 /*extra for hidden key and part key*/));
-
- if (table->s->primary_key == MAX_KEY)
- share->flags|= NSF_HIDDEN_PK;
- if (table->s->blob_fields != 0)
- share->flags|= NSF_BLOB_FLAG;
- break;
- }
- DBUG_RETURN(error);
-}
-
-/*****************************************************************
- functions called from master sql client threads
-****************************************************************/
-
-/*
- called in mysql_show_binlog_events and reset_logs to make sure we wait for
- all events originating from this mysql server to arrive in the binlog
-
- Wait for the last epoch in which the last transaction is a part of.
-
- Wait a maximum of 30 seconds.
-*/
-static void ndbcluster_binlog_wait(THD *thd)
-{
- if (ndb_binlog_running)
- {
- DBUG_ENTER("ndbcluster_binlog_wait");
- const char *save_info= thd ? thd->proc_info : 0;
- ulonglong wait_epoch= *p_latest_trans_gci;
- int count= 30;
- if (thd)
- thd->proc_info= "Waiting for ndbcluster binlog update to "
- "reach current position";
- while (count && ndb_binlog_running &&
- ndb_latest_handled_binlog_epoch < wait_epoch)
- {
- count--;
- sleep(1);
- }
- if (thd)
- thd->proc_info= save_info;
- DBUG_VOID_RETURN;
- }
-}
-
-/*
- Called from MYSQL_BIN_LOG::reset_logs in log.cc when binlog is emptied
-*/
-static int ndbcluster_reset_logs(THD *thd)
-{
- if (!ndb_binlog_running)
- return 0;
-
- DBUG_ENTER("ndbcluster_reset_logs");
-
- /*
- Wait for all events orifinating from this mysql server has
- reached the binlog before continuing to reset
- */
- ndbcluster_binlog_wait(thd);
-
- char buf[1024];
- char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_REP_TABLE);
-
- run_query(thd, buf, end, NULL, TRUE);
-
- DBUG_RETURN(0);
-}
-
-/*
- Called from MYSQL_BIN_LOG::purge_logs in log.cc when the binlog "file"
- is removed
-*/
-
-static int
-ndbcluster_binlog_index_purge_file(THD *thd, const char *file)
-{
- if (!ndb_binlog_running || thd->slave_thread)
- return 0;
-
- DBUG_ENTER("ndbcluster_binlog_index_purge_file");
- DBUG_PRINT("enter", ("file: %s", file));
-
- char buf[1024];
- char *end= strmov(strmov(strmov(buf,
- "DELETE FROM "
- NDB_REP_DB "." NDB_REP_TABLE
- " WHERE File='"), file), "'");
-
- run_query(thd, buf, end, NULL, TRUE);
-
- DBUG_RETURN(0);
-}
-
-static void
-ndbcluster_binlog_log_query(handlerton *hton, THD *thd, enum_binlog_command binlog_command,
- const char *query, uint query_length,
- const char *db, const char *table_name)
-{
- DBUG_ENTER("ndbcluster_binlog_log_query");
- DBUG_PRINT("enter", ("db: %s table_name: %s query: %s",
- db, table_name, query));
- enum SCHEMA_OP_TYPE type;
- int log= 0;
- switch (binlog_command)
- {
- case LOGCOM_CREATE_TABLE:
- type= SOT_CREATE_TABLE;
- DBUG_ASSERT(FALSE);
- break;
- case LOGCOM_ALTER_TABLE:
- type= SOT_ALTER_TABLE;
- log= 1;
- break;
- case LOGCOM_RENAME_TABLE:
- type= SOT_RENAME_TABLE;
- DBUG_ASSERT(FALSE);
- break;
- case LOGCOM_DROP_TABLE:
- type= SOT_DROP_TABLE;
- DBUG_ASSERT(FALSE);
- break;
- case LOGCOM_CREATE_DB:
- type= SOT_CREATE_DB;
- log= 1;
- break;
- case LOGCOM_ALTER_DB:
- type= SOT_ALTER_DB;
- log= 1;
- break;
- case LOGCOM_DROP_DB:
- type= SOT_DROP_DB;
- DBUG_ASSERT(FALSE);
- break;
- }
- if (log)
- {
- ndbcluster_log_schema_op(thd, 0, query, query_length,
- db, table_name, 0, 0, type,
- 0, 0);
- }
- DBUG_VOID_RETURN;
-}
-
-
-/*
- End use of the NDB Cluster binlog
- - wait for binlog thread to shutdown
-*/
-
-static int ndbcluster_binlog_end(THD *thd)
-{
- DBUG_ENTER("ndbcluster_binlog_end");
-
- if (!ndbcluster_binlog_inited)
- DBUG_RETURN(0);
- ndbcluster_binlog_inited= 0;
-
-#ifdef HAVE_NDB_BINLOG
- if (ndb_util_thread_running > 0)
- {
- /*
- Wait for util thread to die (as this uses the injector mutex)
- There is a very small change that ndb_util_thread dies and the
- following mutex is freed before it's accessed. This shouldn't
- however be a likely case as the ndbcluster_binlog_end is supposed to
- be called before ndb_cluster_end().
- */
- mysql_mutex_lock(&LOCK_ndb_util_thread);
- /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
- ndb_util_thread_running++;
- ndbcluster_terminating= 1;
- mysql_cond_signal(&COND_ndb_util_thread);
- while (ndb_util_thread_running > 1)
- mysql_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
- ndb_util_thread_running--;
- mysql_mutex_unlock(&LOCK_ndb_util_thread);
- }
-
- /* wait for injector thread to finish */
- ndbcluster_binlog_terminating= 1;
- mysql_mutex_lock(&injector_mutex);
- mysql_cond_signal(&injector_cond);
- while (ndb_binlog_thread_running > 0)
- mysql_cond_wait(&injector_cond, &injector_mutex);
- mysql_mutex_unlock(&injector_mutex);
-
- mysql_mutex_destroy(&injector_mutex);
- mysql_cond_destroy(&injector_cond);
- mysql_mutex_destroy(&ndb_schema_share_mutex);
-#endif
-
- DBUG_RETURN(0);
-}
-
-/*****************************************************************
- functions called from slave sql client threads
-****************************************************************/
-static void ndbcluster_reset_slave(THD *thd)
-{
- if (!ndb_binlog_running)
- return;
-
- DBUG_ENTER("ndbcluster_reset_slave");
- char buf[1024];
- char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
- run_query(thd, buf, end, NULL, TRUE);
- DBUG_VOID_RETURN;
-}
-
-/*
- Initialize the binlog part of the ndb handlerton
-*/
-
-/**
- Upon the sql command flush logs, we need to ensure that all outstanding
- ndb data to be logged has made it to the binary log to get a deterministic
- behavior on the rotation of the log.
- */
-static bool ndbcluster_flush_logs(handlerton *hton)
-{
- ndbcluster_binlog_wait(current_thd);
- return FALSE;
-}
-
-static int ndbcluster_binlog_func(handlerton *hton, THD *thd,
- enum_binlog_func fn,
- void *arg)
-{
- switch(fn)
- {
- case BFN_RESET_LOGS:
- ndbcluster_reset_logs(thd);
- break;
- case BFN_RESET_SLAVE:
- ndbcluster_reset_slave(thd);
- break;
- case BFN_BINLOG_WAIT:
- ndbcluster_binlog_wait(thd);
- break;
- case BFN_BINLOG_END:
- ndbcluster_binlog_end(thd);
- break;
- case BFN_BINLOG_PURGE_FILE:
- ndbcluster_binlog_index_purge_file(thd, (const char *)arg);
- break;
- }
- return 0;
-}
-
-void ndbcluster_binlog_init_handlerton()
-{
- handlerton *h= ndbcluster_hton;
- h->flush_logs= ndbcluster_flush_logs;
- h->binlog_func= ndbcluster_binlog_func;
- h->binlog_log_query= ndbcluster_binlog_log_query;
-}
-
-
-
-
-
-/*
- check the availability af the ndb_apply_status share
- - return share, but do not increase refcount
- - return 0 if there is no share
-*/
-static NDB_SHARE *ndbcluster_check_ndb_apply_status_share()
-{
- mysql_mutex_lock(&ndbcluster_mutex);
-
- void *share= my_hash_search(&ndbcluster_open_tables,
- (uchar*) NDB_APPLY_TABLE_FILE,
- sizeof(NDB_APPLY_TABLE_FILE) - 1);
- DBUG_PRINT("info",("ndbcluster_check_ndb_apply_status_share %s 0x%lx",
- NDB_APPLY_TABLE_FILE, (long) share));
- mysql_mutex_unlock(&ndbcluster_mutex);
- return (NDB_SHARE*) share;
-}
-
-/*
- check the availability af the schema share
- - return share, but do not increase refcount
- - return 0 if there is no share
-*/
-static NDB_SHARE *ndbcluster_check_ndb_schema_share()
-{
- mysql_mutex_lock(&ndbcluster_mutex);
-
- void *share= my_hash_search(&ndbcluster_open_tables,
- (uchar*) NDB_SCHEMA_TABLE_FILE,
- sizeof(NDB_SCHEMA_TABLE_FILE) - 1);
- DBUG_PRINT("info",("ndbcluster_check_ndb_schema_share %s 0x%lx",
- NDB_SCHEMA_TABLE_FILE, (long) share));
- mysql_mutex_unlock(&ndbcluster_mutex);
- return (NDB_SHARE*) share;
-}
-
-/*
- Create the ndb_apply_status table
-*/
-static int ndbcluster_create_ndb_apply_status_table(THD *thd)
-{
- DBUG_ENTER("ndbcluster_create_ndb_apply_status_table");
-
- /*
- Check if we already have the apply status table.
- If so it should have been discovered at startup
- and thus have a share
- */
-
- if (ndbcluster_check_ndb_apply_status_share())
- DBUG_RETURN(0);
-
- if (g_ndb_cluster_connection->get_no_ready() <= 0)
- DBUG_RETURN(0);
-
- char buf[1024 + 1], *end;
-
- if (opt_ndb_extra_logging)
- sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_APPLY_TABLE);
-
- /*
- Check if apply status table exists in MySQL "dictionary"
- if so, remove it since there is none in Ndb
- */
- {
- build_table_filename(buf, sizeof(buf) - 1,
- NDB_REP_DB, NDB_APPLY_TABLE, reg_ext, 0);
- mysql_file_delete(key_file_frm, buf, MYF(0));
- }
-
- /*
- Note, updating this table schema must be reflected in ndb_restore
- */
- end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
- NDB_REP_DB "." NDB_APPLY_TABLE
- " ( server_id INT UNSIGNED NOT NULL,"
- " epoch BIGINT UNSIGNED NOT NULL, "
- " log_name VARCHAR(255) BINARY NOT NULL, "
- " start_pos BIGINT UNSIGNED NOT NULL, "
- " end_pos BIGINT UNSIGNED NOT NULL, "
- " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB CHARACTER SET latin1");
-
- const int no_print_error[6]= {ER_TABLE_EXISTS_ERROR,
- 701,
- 702,
- 721, // Table already exist
- 4009,
- 0}; // do not print error 701 etc
- run_query(thd, buf, end, no_print_error, TRUE);
-
- DBUG_RETURN(0);
-}
-
-
-/*
- Create the schema table
-*/
-static int ndbcluster_create_schema_table(THD *thd)
-{
- DBUG_ENTER("ndbcluster_create_schema_table");
-
- /*
- Check if we already have the schema table.
- If so it should have been discovered at startup
- and thus have a share
- */
-
- if (ndbcluster_check_ndb_schema_share())
- DBUG_RETURN(0);
-
- if (g_ndb_cluster_connection->get_no_ready() <= 0)
- DBUG_RETURN(0);
-
- char buf[1024 + 1], *end;
-
- if (opt_ndb_extra_logging)
- sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_SCHEMA_TABLE);
-
- /*
- Check if schema table exists in MySQL "dictionary"
- if so, remove it since there is none in Ndb
- */
- {
- build_table_filename(buf, sizeof(buf) - 1,
- NDB_REP_DB, NDB_SCHEMA_TABLE, reg_ext, 0);
- mysql_file_delete(key_file_frm, buf, MYF(0));
- }
-
- /*
- Update the defines below to reflect the table schema
- */
- end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
- NDB_REP_DB "." NDB_SCHEMA_TABLE
- " ( db VARBINARY(63) NOT NULL,"
- " name VARBINARY(63) NOT NULL,"
- " slock BINARY(32) NOT NULL,"
- " query BLOB NOT NULL,"
- " node_id INT UNSIGNED NOT NULL,"
- " epoch BIGINT UNSIGNED NOT NULL,"
- " id INT UNSIGNED NOT NULL,"
- " version INT UNSIGNED NOT NULL,"
- " type INT UNSIGNED NOT NULL,"
- " PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB CHARACTER SET latin1");
-
- const int no_print_error[6]= {ER_TABLE_EXISTS_ERROR,
- 701,
- 702,
- 721, // Table already exist
- 4009,
- 0}; // do not print error 701 etc
- run_query(thd, buf, end, no_print_error, TRUE);
-
- DBUG_RETURN(0);
-}
-
-int ndbcluster_setup_binlog_table_shares(THD *thd)
-{
- if (!ndb_schema_share &&
- ndbcluster_check_ndb_schema_share() == 0)
- {
- ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
- if (!ndb_schema_share)
- {
- ndbcluster_create_schema_table(thd);
- // always make sure we create the 'schema' first
- if (!ndb_schema_share)
- return 1;
- }
- }
- if (!ndb_apply_status_share &&
- ndbcluster_check_ndb_apply_status_share() == 0)
- {
- ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
- if (!ndb_apply_status_share)
- {
- ndbcluster_create_ndb_apply_status_table(thd);
- if (!ndb_apply_status_share)
- return 1;
- }
- }
- if (!ndbcluster_find_all_files(thd))
- {
- ndb_binlog_tables_inited= TRUE;
- if (opt_ndb_extra_logging)
- sql_print_information("NDB Binlog: ndb tables writable");
- close_cached_tables(NULL, NULL, FALSE, LONG_TIMEOUT);
- /* Signal injector thread that all is setup */
- mysql_cond_signal(&injector_cond);
- }
- return 0;
-}
-
-/*
- Defines and struct for schema table.
- Should reflect table definition above.
-*/
-#define SCHEMA_DB_I 0u
-#define SCHEMA_NAME_I 1u
-#define SCHEMA_SLOCK_I 2u
-#define SCHEMA_QUERY_I 3u
-#define SCHEMA_NODE_ID_I 4u
-#define SCHEMA_EPOCH_I 5u
-#define SCHEMA_ID_I 6u
-#define SCHEMA_VERSION_I 7u
-#define SCHEMA_TYPE_I 8u
-#define SCHEMA_SIZE 9u
-#define SCHEMA_SLOCK_SIZE 32u
-
-struct Cluster_schema
-{
- uchar db_length;
- char db[64];
- uchar name_length;
- char name[64];
- uchar slock_length;
- uint32 slock[SCHEMA_SLOCK_SIZE/4];
- unsigned short query_length;
- char *query;
- Uint64 epoch;
- uint32 node_id;
- uint32 id;
- uint32 version;
- uint32 type;
- uint32 any_value;
-};
-
-static void print_could_not_discover_error(THD *thd,
- const Cluster_schema *schema)
-{
- sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
- "binlog schema event '%s' from node %d. "
- "my_errno: %d",
- schema->db, schema->name, schema->query,
- schema->node_id, my_errno);
- List_iterator_fast<Sql_condition> it(thd->warning_info->warn_list());
- Sql_condition *err;
- while ((err= it++))
- sql_print_warning("NDB Binlog: (%d)%s", err->get_sql_errno(),
- err->get_message_text());
-}
-
-/*
- Transfer schema table data into corresponding struct
-*/
-static void ndbcluster_get_schema(NDB_SHARE *share,
- Cluster_schema *s)
-{
- TABLE *table= share->table;
- Field **field;
- /* unpack blob values */
- uchar* blobs_buffer= 0;
- uint blobs_buffer_size= 0;
- my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
- {
- ptrdiff_t ptrdiff= 0;
- int ret= get_ndb_blobs_value(table, share->ndb_value[0],
- blobs_buffer, blobs_buffer_size,
- ptrdiff);
- if (ret != 0)
- {
- my_free(blobs_buffer);
- DBUG_PRINT("info", ("blob read error"));
- DBUG_ASSERT(FALSE);
- }
- }
- /* db varchar 1 length uchar */
- field= table->field;
- s->db_length= *(uint8*)(*field)->ptr;
- DBUG_ASSERT(s->db_length <= (*field)->field_length);
- DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
- memcpy(s->db, (*field)->ptr + 1, s->db_length);
- s->db[s->db_length]= 0;
- /* name varchar 1 length uchar */
- field++;
- s->name_length= *(uint8*)(*field)->ptr;
- DBUG_ASSERT(s->name_length <= (*field)->field_length);
- DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
- memcpy(s->name, (*field)->ptr + 1, s->name_length);
- s->name[s->name_length]= 0;
- /* slock fixed length */
- field++;
- s->slock_length= (*field)->field_length;
- DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
- memcpy(s->slock, (*field)->ptr, s->slock_length);
- /* query blob */
- field++;
- {
- Field_blob *field_blob= (Field_blob*)(*field);
- uint blob_len= field_blob->get_length((*field)->ptr);
- uchar *blob_ptr= 0;
- field_blob->get_ptr(&blob_ptr);
- DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
- s->query_length= blob_len;
- s->query= sql_strmake((char*) blob_ptr, blob_len);
- }
- /* node_id */
- field++;
- s->node_id= ((Field_long *)*field)->val_int();
- /* epoch */
- field++;
- s->epoch= ((Field_long *)*field)->val_int();
- /* id */
- field++;
- s->id= ((Field_long *)*field)->val_int();
- /* version */
- field++;
- s->version= ((Field_long *)*field)->val_int();
- /* type */
- field++;
- s->type= ((Field_long *)*field)->val_int();
- /* free blobs buffer */
- my_free(blobs_buffer);
- dbug_tmp_restore_column_map(table->read_set, old_map);
-}
-
-/*
- helper function to pack a ndb varchar
-*/
-char *ndb_pack_varchar(const NDBCOL *col, char *buf,
- const char *str, int sz)
-{
- switch (col->getArrayType())
- {
- case NDBCOL::ArrayTypeFixed:
- memcpy(buf, str, sz);
- break;
- case NDBCOL::ArrayTypeShortVar:
- *(uchar*)buf= (uchar)sz;
- memcpy(buf + 1, str, sz);
- break;
- case NDBCOL::ArrayTypeMediumVar:
- int2store(buf, sz);
- memcpy(buf + 2, str, sz);
- break;
- }
- return buf;
-}
-
-/*
- acknowledge handling of schema operation
-*/
-static int
-ndbcluster_update_slock(THD *thd,
- const char *db,
- const char *table_name)
-{
- DBUG_ENTER("ndbcluster_update_slock");
- if (!ndb_schema_share)
- {
- DBUG_RETURN(0);
- }
-
- const NdbError *ndb_error= 0;
- uint32 node_id= g_ndb_cluster_connection->node_id();
- Ndb *ndb= check_ndb_in_thd(thd);
- char save_db[FN_HEADLEN];
- strcpy(save_db, ndb->getDatabaseName());
-
- char tmp_buf[FN_REFLEN];
- NDBDICT *dict= ndb->getDictionary();
- ndb->setDatabaseName(NDB_REP_DB);
- Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
- const NDBTAB *ndbtab= ndbtab_g.get_table();
- NdbTransaction *trans= 0;
- int retries= 100;
- int retry_sleep= 10; /* 10 milliseconds, transaction */
- const NDBCOL *col[SCHEMA_SIZE];
- unsigned sz[SCHEMA_SIZE];
-
- MY_BITMAP slock;
- uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
- my_bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
-
- if (ndbtab == 0)
- {
- abort();
- DBUG_RETURN(0);
- }
-
- {
- uint i;
- for (i= 0; i < SCHEMA_SIZE; i++)
- {
- col[i]= ndbtab->getColumn(i);
- if (i != SCHEMA_QUERY_I)
- {
- sz[i]= col[i]->getLength();
- DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
- }
- }
- }
-
- while (1)
- {
- if ((trans= ndb->startTransaction()) == 0)
- goto err;
- {
- NdbOperation *op= 0;
- int r= 0;
-
- /* read the bitmap exlusive */
- r|= (op= trans->getNdbOperation(ndbtab)) == 0;
- DBUG_ASSERT(r == 0);
- r|= op->readTupleExclusive();
- DBUG_ASSERT(r == 0);
-
- /* db */
- ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
- r|= op->equal(SCHEMA_DB_I, tmp_buf);
- DBUG_ASSERT(r == 0);
- /* name */
- ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
- strlen(table_name));
- r|= op->equal(SCHEMA_NAME_I, tmp_buf);
- DBUG_ASSERT(r == 0);
- /* slock */
- r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
- DBUG_ASSERT(r == 0);
- }
- if (trans->execute(NdbTransaction::NoCommit))
- goto err;
- bitmap_clear_bit(&slock, node_id);
- {
- NdbOperation *op= 0;
- int r= 0;
-
- /* now update the tuple */
- r|= (op= trans->getNdbOperation(ndbtab)) == 0;
- DBUG_ASSERT(r == 0);
- r|= op->updateTuple();
- DBUG_ASSERT(r == 0);
-
- /* db */
- ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
- r|= op->equal(SCHEMA_DB_I, tmp_buf);
- DBUG_ASSERT(r == 0);
- /* name */
- ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
- strlen(table_name));
- r|= op->equal(SCHEMA_NAME_I, tmp_buf);
- DBUG_ASSERT(r == 0);
- /* slock */
- r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
- DBUG_ASSERT(r == 0);
- /* node_id */
- r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
- DBUG_ASSERT(r == 0);
- /* type */
- r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
- DBUG_ASSERT(r == 0);
- }
- if (trans->execute(NdbTransaction::Commit) == 0)
- {
- dict->forceGCPWait();
- DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
- node_id, db, table_name));
- break;
- }
- err:
- const NdbError *this_error= trans ?
- &trans->getNdbError() : &ndb->getNdbError();
- if (this_error->status == NdbError::TemporaryError)
- {
- if (retries--)
- {
- if (trans)
- ndb->closeTransaction(trans);
- my_sleep(retry_sleep);
- continue; // retry
- }
- }
- ndb_error= this_error;
- break;
- }
-
- if (ndb_error)
- {
- char buf[1024];
- my_snprintf(buf, sizeof(buf), "Could not release lock on '%s.%s'",
- db, table_name);
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- ndb_error->code, ndb_error->message, buf);
- }
- if (trans)
- ndb->closeTransaction(trans);
- ndb->setDatabaseName(save_db);
- DBUG_RETURN(0);
-}
-
-/*
- log query in schema table
-*/
-static void ndb_report_waiting(const char *key,
- int the_time,
- const char *op,
- const char *obj)
-{
- ulonglong ndb_latest_epoch= 0;
- const char *proc_info= "<no info>";
- mysql_mutex_lock(&injector_mutex);
- if (injector_ndb)
- ndb_latest_epoch= injector_ndb->getLatestGCI();
- if (injector_thd)
- proc_info= injector_thd->proc_info;
- mysql_mutex_unlock(&injector_mutex);
- sql_print_information("NDB %s:"
- " waiting max %u sec for %s %s."
- " epochs: (%u,%u,%u)"
- " injector proc_info: %s"
- ,key, the_time, op, obj
- ,(uint)ndb_latest_handled_binlog_epoch
- ,(uint)ndb_latest_received_binlog_epoch
- ,(uint)ndb_latest_epoch
- ,proc_info
- );
-}
-
-int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
- const char *query, int query_length,
- const char *db, const char *table_name,
- uint32 ndb_table_id,
- uint32 ndb_table_version,
- enum SCHEMA_OP_TYPE type,
- const char *new_db, const char *new_table_name)
-{
- DBUG_ENTER("ndbcluster_log_schema_op");
- Thd_ndb *thd_ndb= get_thd_ndb(thd);
- if (!thd_ndb)
- {
- if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
- {
- sql_print_error("Could not allocate Thd_ndb object");
- DBUG_RETURN(1);
- }
- set_thd_ndb(thd, thd_ndb);
- }
-
- DBUG_PRINT("enter",
- ("query: %s db: %s table_name: %s thd_ndb->options: %d",
- query, db, table_name, thd_ndb->options));
- if (!ndb_schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
- {
- DBUG_RETURN(0);
- }
-
- char tmp_buf2_mem[FN_REFLEN];
- String tmp_buf2(tmp_buf2_mem, sizeof(tmp_buf2_mem), system_charset_info);
- tmp_buf2.length(0);
- const char *type_str;
- switch (type)
- {
- case SOT_DROP_TABLE:
- /* drop database command, do not log at drop table */
- if (thd->lex->sql_command == SQLCOM_DROP_DB)
- DBUG_RETURN(0);
- /* redo the drop table query as is may contain several tables */
- tmp_buf2.append(STRING_WITH_LEN("drop table "));
- append_identifier(thd, &tmp_buf2, table_name, strlen(table_name));
- query= tmp_buf2.c_ptr_safe();
- query_length= tmp_buf2.length();
- type_str= "drop table";
- break;
- case SOT_RENAME_TABLE:
- /* redo the rename table query as is may contain several tables */
- tmp_buf2.append(STRING_WITH_LEN("rename table "));
- append_identifier(thd, &tmp_buf2, db, strlen(db));
- tmp_buf2.append(STRING_WITH_LEN("."));
- append_identifier(thd, &tmp_buf2, table_name, strlen(table_name));
- tmp_buf2.append(STRING_WITH_LEN(" to "));
- append_identifier(thd, &tmp_buf2, new_db, strlen(new_db));
- tmp_buf2.append(STRING_WITH_LEN("."));
- append_identifier(thd, &tmp_buf2, new_table_name, strlen(new_table_name));
- query= tmp_buf2.c_ptr_safe();
- query_length= tmp_buf2.length();
- type_str= "rename table";
- break;
- case SOT_CREATE_TABLE:
- type_str= "create table";
- break;
- case SOT_ALTER_TABLE:
- type_str= "alter table";
- break;
- case SOT_DROP_DB:
- type_str= "drop db";
- break;
- case SOT_CREATE_DB:
- type_str= "create db";
- break;
- case SOT_ALTER_DB:
- type_str= "alter db";
- break;
- case SOT_TABLESPACE:
- type_str= "tablespace";
- break;
- case SOT_LOGFILE_GROUP:
- type_str= "logfile group";
- break;
- case SOT_TRUNCATE_TABLE:
- type_str= "truncate table";
- break;
- default:
- abort(); /* should not happen, programming error */
- }
-
- NDB_SCHEMA_OBJECT *ndb_schema_object;
- {
- char key[FN_REFLEN + 1];
- build_table_filename(key, sizeof(key) - 1, db, table_name, "", 0);
- ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE);
- }
-
- const NdbError *ndb_error= 0;
- uint32 node_id= g_ndb_cluster_connection->node_id();
- Uint64 epoch= 0;
- MY_BITMAP schema_subscribers;
- uint32 bitbuf[sizeof(ndb_schema_object->slock)/4];
- char bitbuf_e[sizeof(bitbuf)];
- bzero(bitbuf_e, sizeof(bitbuf_e));
- {
- int i, updated= 0;
- int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
- my_bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, FALSE);
- bitmap_set_all(&schema_subscribers);
-
- /* begin protect ndb_schema_share */
- mysql_mutex_lock(&ndb_schema_share_mutex);
- if (ndb_schema_share == 0)
- {
- mysql_mutex_unlock(&ndb_schema_share_mutex);
- if (ndb_schema_object)
- ndb_free_schema_object(&ndb_schema_object, FALSE);
- DBUG_RETURN(0);
- }
- mysql_mutex_lock(&ndb_schema_share->mutex);
- for (i= 0; i < no_storage_nodes; i++)
- {
- MY_BITMAP *table_subscribers= &ndb_schema_share->subscriber_bitmap[i];
- if (!bitmap_is_clear_all(table_subscribers))
- {
- bitmap_intersect(&schema_subscribers,
- table_subscribers);
- updated= 1;
- }
- }
- mysql_mutex_unlock(&ndb_schema_share->mutex);
- mysql_mutex_unlock(&ndb_schema_share_mutex);
- /* end protect ndb_schema_share */
-
- if (updated)
- {
- bitmap_clear_bit(&schema_subscribers, node_id);
- /*
- if setting own acknowledge bit it is important that
- no other mysqld's are registred, as subsequent code
- will cause the original event to be hidden (by blob
- merge event code)
- */
- if (bitmap_is_clear_all(&schema_subscribers))
- bitmap_set_bit(&schema_subscribers, node_id);
- }
- else
- bitmap_clear_all(&schema_subscribers);
-
- if (ndb_schema_object)
- {
- mysql_mutex_lock(&ndb_schema_object->mutex);
- memcpy(ndb_schema_object->slock, schema_subscribers.bitmap,
- sizeof(ndb_schema_object->slock));
- mysql_mutex_unlock(&ndb_schema_object->mutex);
- }
-
- DBUG_DUMP("schema_subscribers", (uchar*)schema_subscribers.bitmap,
- no_bytes_in_map(&schema_subscribers));
- DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d",
- bitmap_is_clear_all(&schema_subscribers)));
- }
-
- Ndb *ndb= thd_ndb->ndb;
- char save_db[FN_REFLEN];
- strcpy(save_db, ndb->getDatabaseName());
-
- char tmp_buf[FN_REFLEN];
- NDBDICT *dict= ndb->getDictionary();
- ndb->setDatabaseName(NDB_REP_DB);
- Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
- const NDBTAB *ndbtab= ndbtab_g.get_table();
- NdbTransaction *trans= 0;
- int retries= 100;
- int retry_sleep= 10; /* 10 milliseconds, transaction */
- const NDBCOL *col[SCHEMA_SIZE];
- unsigned sz[SCHEMA_SIZE];
-
- if (ndbtab == 0)
- {
- if (strcmp(NDB_REP_DB, db) != 0 ||
- strcmp(NDB_SCHEMA_TABLE, table_name))
- {
- ndb_error= &dict->getNdbError();
- }
- goto end;
- }
-
- {
- uint i;
- for (i= 0; i < SCHEMA_SIZE; i++)
- {
- col[i]= ndbtab->getColumn(i);
- if (i != SCHEMA_QUERY_I)
- {
- sz[i]= col[i]->getLength();
- DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
- }
- }
- }
-
- while (1)
- {
- const char *log_db= db;
- const char *log_tab= table_name;
- const char *log_subscribers= (char*)schema_subscribers.bitmap;
- uint32 log_type= (uint32)type;
- if ((trans= ndb->startTransaction()) == 0)
- goto err;
- while (1)
- {
- NdbOperation *op= 0;
- int r= 0;
- r|= (op= trans->getNdbOperation(ndbtab)) == 0;
- DBUG_ASSERT(r == 0);
- r|= op->writeTuple();
- DBUG_ASSERT(r == 0);
-
- /* db */
- ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db));
- r|= op->equal(SCHEMA_DB_I, tmp_buf);
- DBUG_ASSERT(r == 0);
- /* name */
- ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
- strlen(log_tab));
- r|= op->equal(SCHEMA_NAME_I, tmp_buf);
- DBUG_ASSERT(r == 0);
- /* slock */
- DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf));
- r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
- DBUG_ASSERT(r == 0);
- /* query */
- {
- NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
- DBUG_ASSERT(ndb_blob != 0);
- uint blob_len= query_length;
- const char* blob_ptr= query;
- r|= ndb_blob->setValue(blob_ptr, blob_len);
- DBUG_ASSERT(r == 0);
- }
- /* node_id */
- r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
- DBUG_ASSERT(r == 0);
- /* epoch */
- r|= op->setValue(SCHEMA_EPOCH_I, epoch);
- DBUG_ASSERT(r == 0);
- /* id */
- r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
- DBUG_ASSERT(r == 0);
- /* version */
- r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
- DBUG_ASSERT(r == 0);
- /* type */
- r|= op->setValue(SCHEMA_TYPE_I, log_type);
- DBUG_ASSERT(r == 0);
- /* any value */
- if (!(thd->variables.option_bits & OPTION_BIN_LOG))
- r|= op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
- else
- r|= op->setAnyValue(thd->server_id);
- DBUG_ASSERT(r == 0);
- if (log_db != new_db && new_db && new_table_name)
- {
- log_db= new_db;
- log_tab= new_table_name;
- log_subscribers= bitbuf_e; // no ack expected on this
- log_type= (uint32)SOT_RENAME_TABLE_NEW;
- continue;
- }
- break;
- }
- if (trans->execute(NdbTransaction::Commit) == 0)
- {
- DBUG_PRINT("info", ("logged: %s", query));
- break;
- }
-err:
- const NdbError *this_error= trans ?
- &trans->getNdbError() : &ndb->getNdbError();
- if (this_error->status == NdbError::TemporaryError)
- {
- if (retries--)
- {
- if (trans)
- ndb->closeTransaction(trans);
- my_sleep(retry_sleep);
- continue; // retry
- }
- }
- ndb_error= this_error;
- break;
- }
-end:
- if (ndb_error)
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- ndb_error->code,
- ndb_error->message,
- "Could not log query '%s' on other mysqld's");
-
- if (trans)
- ndb->closeTransaction(trans);
- ndb->setDatabaseName(save_db);
-
- /*
- Wait for other mysqld's to acknowledge the table operation
- */
- if (ndb_error == 0 &&
- !bitmap_is_clear_all(&schema_subscribers))
- {
- /*
- if own nodeid is set we are a single mysqld registred
- as an optimization we update the slock directly
- */
- if (bitmap_is_set(&schema_subscribers, node_id))
- ndbcluster_update_slock(thd, db, table_name);
- else
- dict->forceGCPWait();
-
- int max_timeout= DEFAULT_SYNC_TIMEOUT;
- mysql_mutex_lock(&ndb_schema_object->mutex);
- while (1)
- {
- struct timespec abstime;
- int i;
- int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
- set_timespec(abstime, 1);
- int ret= mysql_cond_timedwait(&injector_cond,
- &ndb_schema_object->mutex,
- &abstime);
- if (thd->killed)
- break;
-
- /* begin protect ndb_schema_share */
- mysql_mutex_lock(&ndb_schema_share_mutex);
- if (ndb_schema_share == 0)
- {
- mysql_mutex_unlock(&ndb_schema_share_mutex);
- break;
- }
- mysql_mutex_lock(&ndb_schema_share->mutex);
- for (i= 0; i < no_storage_nodes; i++)
- {
- /* remove any unsubscribed from schema_subscribers */
- MY_BITMAP *tmp= &ndb_schema_share->subscriber_bitmap[i];
- if (!bitmap_is_clear_all(tmp))
- bitmap_intersect(&schema_subscribers, tmp);
- }
- mysql_mutex_unlock(&ndb_schema_share->mutex);
- mysql_mutex_unlock(&ndb_schema_share_mutex);
- /* end protect ndb_schema_share */
-
- /* remove any unsubscribed from ndb_schema_object->slock */
- bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers);
-
- DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
- (uchar*)ndb_schema_object->slock_bitmap.bitmap,
- no_bytes_in_map(&ndb_schema_object->slock_bitmap));
-
- if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
- break;
-
- if (ret)
- {
- max_timeout--;
- if (max_timeout == 0)
- {
- sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
- type_str, ndb_schema_object->key);
- break;
- }
- if (opt_ndb_extra_logging)
- ndb_report_waiting(type_str, max_timeout,
- "distributing", ndb_schema_object->key);
- }
- }
- mysql_mutex_unlock(&ndb_schema_object->mutex);
- }
-
- if (ndb_schema_object)
- ndb_free_schema_object(&ndb_schema_object, FALSE);
-
- DBUG_RETURN(0);
-}
-
-/*
- Handle _non_ data events from the storage nodes
-*/
-int
-ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
- NDB_SHARE *share)
-{
- DBUG_ENTER("ndb_handle_schema_change");
- TABLE* table= share->table;
- TABLE_SHARE *table_share= share->table_share;
- const char *dbname= table_share->db.str;
- const char *tabname= table_share->table_name.str;
- bool do_close_cached_tables= FALSE;
- bool is_online_alter_table= FALSE;
- bool is_rename_table= FALSE;
- bool is_remote_change=
- (uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id();
-
- if (pOp->getEventType() == NDBEVENT::TE_ALTER)
- {
- if (pOp->tableFrmChanged())
- {
- DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: table frm changed"));
- is_online_alter_table= TRUE;
- }
- else
- {
- DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: name changed"));
- DBUG_ASSERT(pOp->tableNameChanged());
- is_rename_table= TRUE;
- }
- }
-
- {
- ndb->setDatabaseName(dbname);
- Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname);
- const NDBTAB *ev_tab= pOp->getTable();
- const NDBTAB *cache_tab= ndbtab_g.get_table();
- if (cache_tab &&
- cache_tab->getObjectId() == ev_tab->getObjectId() &&
- cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
- ndbtab_g.invalidate();
- }
-
- /*
- Refresh local frm file and dictionary cache if
- remote on-line alter table
- */
- if (is_remote_change && is_online_alter_table)
- {
- const char *tabname= table_share->table_name.str;
- char key[FN_REFLEN + 1];
- uchar *data= 0, *pack_data= 0;
- size_t length, pack_length;
- int error;
- NDBDICT *dict= ndb->getDictionary();
- const NDBTAB *altered_table= pOp->getTable();
-
- DBUG_PRINT("info", ("Detected frm change of table %s.%s",
- dbname, tabname));
- build_table_filename(key, FN_LEN - 1, dbname, tabname, NullS, 0);
- /*
- If the there is no local table shadowing the altered table and
- it has an frm that is different than the one on disk then
- overwrite it with the new table definition
- */
- if (!ndbcluster_check_if_local_table(dbname, tabname) &&
- readfrm(key, &data, &length) == 0 &&
- packfrm(data, length, &pack_data, &pack_length) == 0 &&
- cmp_frm(altered_table, pack_data, pack_length))
- {
- DBUG_DUMP("frm", (uchar*) altered_table->getFrmData(),
- altered_table->getFrmLength());
- Ndb_table_guard ndbtab_g(dict, tabname);
- const NDBTAB *old= ndbtab_g.get_table();
- if (!old &&
- old->getObjectVersion() != altered_table->getObjectVersion())
- dict->putTable(altered_table);
-
- my_free(data);
- data= NULL;
- if ((error= unpackfrm(&data, &length,
- (const uchar*) altered_table->getFrmData())) ||
- (error= writefrm(key, data, length)))
- {
- sql_print_information("NDB: Failed write frm for %s.%s, error %d",
- dbname, tabname, error);
- }
-
- // copy names as memory will be freed
- NdbAutoPtr<char> a1((char *)(dbname= strdup(dbname)));
- NdbAutoPtr<char> a2((char *)(tabname= strdup(tabname)));
- ndbcluster_binlog_close_table(thd, share);
-
- TABLE_LIST table_list;
- bzero((char*) &table_list,sizeof(table_list));
- table_list.db= (char *)dbname;
- table_list.alias= table_list.table_name= (char *)tabname;
- close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
-
- if ((error= ndbcluster_binlog_open_table(thd, share,
- table_share, table, 1)))
- sql_print_information("NDB: Failed to re-open table %s.%s",
- dbname, tabname);
-
- table= share->table;
- table_share= share->table_share;
- dbname= table_share->db.str;
- tabname= table_share->table_name.str;
- }
- my_free(data);
- my_free(pack_data);
- }
-
- // If only frm was changed continue replicating
- if (is_online_alter_table)
- {
- /* Signal ha_ndbcluster::alter_table that drop is done */
- mysql_cond_signal(&injector_cond);
- DBUG_RETURN(0);
- }
-
- mysql_mutex_lock(&share->mutex);
- if (is_rename_table && !is_remote_change)
- {
- DBUG_PRINT("info", ("Detected name change of table %s.%s",
- share->db, share->table_name));
- /* ToDo: remove printout */
- if (opt_ndb_extra_logging)
- sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
- share_prefix, share->table->s->db.str,
- share->table->s->table_name.str,
- share->key);
- {
- ndb->setDatabaseName(share->table->s->db.str);
- Ndb_table_guard ndbtab_g(ndb->getDictionary(),
- share->table->s->table_name.str);
- const NDBTAB *ev_tab= pOp->getTable();
- const NDBTAB *cache_tab= ndbtab_g.get_table();
- if (cache_tab &&
- cache_tab->getObjectId() == ev_tab->getObjectId() &&
- cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
- ndbtab_g.invalidate();
- }
- /* do the rename of the table in the share */
- share->table->s->db.str= share->db;
- share->table->s->db.length= strlen(share->db);
- share->table->s->table_name.str= share->table_name;
- share->table->s->table_name.length= strlen(share->table_name);
- }
- DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
- if (share->op_old == pOp)
- share->op_old= 0;
- else
- share->op= 0;
- // either just us or drop table handling as well
-
- /* Signal ha_ndbcluster::delete/rename_table that drop is done */
- mysql_mutex_unlock(&share->mutex);
- mysql_cond_signal(&injector_cond);
-
- mysql_mutex_lock(&ndbcluster_mutex);
- /* ndb_share reference binlog free */
- DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
- share->key, share->use_count));
- free_share(&share, TRUE);
- if (is_remote_change && share && share->state != NSS_DROPPED)
- {
- DBUG_PRINT("info", ("remote change"));
- share->state= NSS_DROPPED;
- if (share->use_count != 1)
- {
- /* open handler holding reference */
- /* wait with freeing create ndb_share to below */
- do_close_cached_tables= TRUE;
- }
- else
- {
- /* ndb_share reference create free */
- DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
- share->key, share->use_count));
- free_share(&share, TRUE);
- share= 0;
- }
- }
- else
- share= 0;
- mysql_mutex_unlock(&ndbcluster_mutex);
-
- pOp->setCustomData(0);
-
- mysql_mutex_lock(&injector_mutex);
- ndb->dropEventOperation(pOp);
- pOp= 0;
- mysql_mutex_unlock(&injector_mutex);
-
- if (do_close_cached_tables)
- {
- TABLE_LIST table_list;
- bzero((char*) &table_list,sizeof(table_list));
- table_list.db= (char *)dbname;
- table_list.alias= table_list.table_name= (char *)tabname;
- close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
- /* ndb_share reference create free */
- DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
- share->key, share->use_count));
- free_share(&share);
- }
- DBUG_RETURN(0);
-}
-
-static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
-{
- if (schema->any_value & NDB_ANYVALUE_RESERVED)
- {
- if (schema->any_value != NDB_ANYVALUE_FOR_NOLOGGING)
- sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
- "query not logged",
- schema->any_value);
- return;
- }
- uint32 thd_server_id_save= thd->server_id;
- DBUG_ASSERT(sizeof(thd_server_id_save) == sizeof(thd->server_id));
- char *thd_db_save= thd->db;
- if (schema->any_value == 0)
- thd->server_id= ::server_id;
- else
- thd->server_id= schema->any_value;
- thd->db= schema->db;
- int errcode = query_error_code(thd, thd->killed == NOT_KILLED);
- thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
- schema->query_length, FALSE, TRUE,
- schema->name[0] == 0 || thd->db[0] == 0,
- errcode);
- thd->server_id= thd_server_id_save;
- thd->db= thd_db_save;
-}
-
-static int
-ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
- NdbEventOperation *pOp,
- List<Cluster_schema>
- *post_epoch_log_list,
- List<Cluster_schema>
- *post_epoch_unlock_list,
- MEM_ROOT *mem_root)
-{
- DBUG_ENTER("ndb_binlog_thread_handle_schema_event");
- NDB_SHARE *tmp_share= (NDB_SHARE *)pOp->getCustomData();
- if (tmp_share && ndb_schema_share == tmp_share)
- {
- NDBEVENT::TableEvent ev_type= pOp->getEventType();
- DBUG_PRINT("enter", ("%s.%s ev_type: %d",
- tmp_share->db, tmp_share->table_name, ev_type));
- if (ev_type == NDBEVENT::TE_UPDATE ||
- ev_type == NDBEVENT::TE_INSERT)
- {
- Cluster_schema *schema= (Cluster_schema *)
- sql_alloc(sizeof(Cluster_schema));
- MY_BITMAP slock;
- my_bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, FALSE);
- uint node_id= g_ndb_cluster_connection->node_id();
- {
- ndbcluster_get_schema(tmp_share, schema);
- schema->any_value= pOp->getAnyValue();
- }
- enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
- DBUG_PRINT("info",
- ("%s.%s: log query_length: %d query: '%s' type: %d",
- schema->db, schema->name,
- schema->query_length, schema->query,
- schema_type));
- if (schema_type == SOT_CLEAR_SLOCK)
- {
- /*
- handle slock after epoch is completed to ensure that
- schema events get inserted in the binlog after any data
- events
- */
- post_epoch_log_list->push_back(schema, mem_root);
- DBUG_RETURN(0);
- }
- if (schema->node_id != node_id)
- {
- int log_query= 0, post_epoch_unlock= 0;
- switch (schema_type)
- {
- case SOT_DROP_TABLE:
- // fall through
- case SOT_RENAME_TABLE:
- // fall through
- case SOT_RENAME_TABLE_NEW:
- // fall through
- case SOT_ALTER_TABLE:
- post_epoch_log_list->push_back(schema, mem_root);
- /* acknowledge this query _after_ epoch completion */
- post_epoch_unlock= 1;
- break;
- case SOT_TRUNCATE_TABLE:
- {
- char key[FN_REFLEN + 1];
- build_table_filename(key, sizeof(key) - 1,
- schema->db, schema->name, "", 0);
- /* ndb_share reference temporary, free below */
- NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
- if (share)
- {
- DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
- share->key, share->use_count));
- }
- // invalidation already handled by binlog thread
- if (!share || !share->op)
- {
- {
- injector_ndb->setDatabaseName(schema->db);
- Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(),
- schema->name);
- ndbtab_g.invalidate();
- }
- TABLE_LIST table_list;
- bzero((char*) &table_list,sizeof(table_list));
- table_list.db= schema->db;
- table_list.alias= table_list.table_name= schema->name;
- close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
- }
- /* ndb_share reference temporary free */
- if (share)
- {
- DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
- share->key, share->use_count));
- free_share(&share);
- }
- }
- // fall through
- case SOT_CREATE_TABLE:
- if (ndbcluster_check_if_local_table(schema->db, schema->name))
- {
- DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
- schema->db, schema->name));
- sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
- "binlog schema event '%s' from node %d. ",
- schema->db, schema->name, schema->query,
- schema->node_id);
- }
- else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
- {
- print_could_not_discover_error(thd, schema);
- }
- log_query= 1;
- break;
- case SOT_DROP_DB:
- /* Drop the database locally if it only contains ndb tables */
- if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db))
- {
- const int no_print_error[1]= {0};
- run_query(thd, schema->query,
- schema->query + schema->query_length,
- no_print_error, /* print error */
- TRUE); /* don't binlog the query */
- /* binlog dropping database after any table operations */
- post_epoch_log_list->push_back(schema, mem_root);
- /* acknowledge this query _after_ epoch completion */
- post_epoch_unlock= 1;
- }
- else
- {
- /* Database contained local tables, leave it */
- sql_print_error("NDB Binlog: Skipping drop database '%s' since it contained local tables "
- "binlog schema event '%s' from node %d. ",
- schema->db, schema->query,
- schema->node_id);
- log_query= 1;
- }
- break;
- case SOT_CREATE_DB:
- /* fall through */
- case SOT_ALTER_DB:
- {
- const int no_print_error[1]= {0};
- run_query(thd, schema->query,
- schema->query + schema->query_length,
- no_print_error, /* print error */
- TRUE); /* don't binlog the query */
- log_query= 1;
- break;
- }
- case SOT_TABLESPACE:
- case SOT_LOGFILE_GROUP:
- log_query= 1;
- break;
- case SOT_CLEAR_SLOCK:
- abort();
- }
- if (log_query && ndb_binlog_running)
- ndb_binlog_query(thd, schema);
- /* signal that schema operation has been handled */
- DBUG_DUMP("slock", (uchar*) schema->slock, schema->slock_length);
- if (bitmap_is_set(&slock, node_id))
- {
- if (post_epoch_unlock)
- post_epoch_unlock_list->push_back(schema, mem_root);
- else
- ndbcluster_update_slock(thd, schema->db, schema->name);
- }
- }
- DBUG_RETURN(0);
- }
- /*
- the normal case of UPDATE/INSERT has already been handled
- */
- switch (ev_type)
- {
- case NDBEVENT::TE_DELETE:
- // skip
- break;
- case NDBEVENT::TE_CLUSTER_FAILURE:
- if (opt_ndb_extra_logging)
- sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
- ndb_schema_share->key, (unsigned) pOp->getGCI());
- // fall through
- case NDBEVENT::TE_DROP:
- if (opt_ndb_extra_logging &&
- ndb_binlog_tables_inited && ndb_binlog_running)
- sql_print_information("NDB Binlog: ndb tables initially "
- "read only on reconnect.");
-
- /* begin protect ndb_schema_share */
- mysql_mutex_lock(&ndb_schema_share_mutex);
- /* ndb_share reference binlog extra free */
- DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
- ndb_schema_share->key,
- ndb_schema_share->use_count));
- free_share(&ndb_schema_share);
- ndb_schema_share= 0;
- ndb_binlog_tables_inited= 0;
- mysql_mutex_unlock(&ndb_schema_share_mutex);
- /* end protect ndb_schema_share */
-
- close_cached_tables(NULL, NULL, FALSE, LONG_TIMEOUT);
- // fall through
- case NDBEVENT::TE_ALTER:
- ndb_handle_schema_change(thd, ndb, pOp, tmp_share);
- break;
- case NDBEVENT::TE_NODE_FAILURE:
- {
- uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
- DBUG_ASSERT(node_id != 0xFF);
- mysql_mutex_lock(&tmp_share->mutex);
- bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]);
- DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
- if (opt_ndb_extra_logging)
- {
- sql_print_information("NDB Binlog: Node: %d, down,"
- " Subscriber bitmask %x%x",
- pOp->getNdbdNodeId(),
- tmp_share->subscriber_bitmap[node_id].bitmap[1],
- tmp_share->subscriber_bitmap[node_id].bitmap[0]);
- }
- mysql_mutex_unlock(&tmp_share->mutex);
- mysql_cond_signal(&injector_cond);
- break;
- }
- case NDBEVENT::TE_SUBSCRIBE:
- {
- uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
- uint8 req_id= pOp->getReqNodeId();
- DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
- mysql_mutex_lock(&tmp_share->mutex);
- bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
- DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
- if (opt_ndb_extra_logging)
- {
- sql_print_information("NDB Binlog: Node: %d, subscribe from node %d,"
- " Subscriber bitmask %x%x",
- pOp->getNdbdNodeId(),
- req_id,
- tmp_share->subscriber_bitmap[node_id].bitmap[1],
- tmp_share->subscriber_bitmap[node_id].bitmap[0]);
- }
- mysql_mutex_unlock(&tmp_share->mutex);
- mysql_cond_signal(&injector_cond);
- break;
- }
- case NDBEVENT::TE_UNSUBSCRIBE:
- {
- uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
- uint8 req_id= pOp->getReqNodeId();
- DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
- mysql_mutex_lock(&tmp_share->mutex);
- bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
- DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
- if (opt_ndb_extra_logging)
- {
- sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d,"
- " Subscriber bitmask %x%x",
- pOp->getNdbdNodeId(),
- req_id,
- tmp_share->subscriber_bitmap[node_id].bitmap[1],
- tmp_share->subscriber_bitmap[node_id].bitmap[0]);
- }
- mysql_mutex_unlock(&tmp_share->mutex);
- mysql_cond_signal(&injector_cond);
- break;
- }
- default:
- sql_print_error("NDB Binlog: unknown non data event %d for %s. "
- "Ignoring...", (unsigned) ev_type, tmp_share->key);
- }
- }
- DBUG_RETURN(0);
-}
-
-/*
- process any operations that should be done after
- the epoch is complete
-*/
-static void
-ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
- List<Cluster_schema>
- *post_epoch_log_list,
- List<Cluster_schema>
- *post_epoch_unlock_list)
-{
- if (post_epoch_log_list->elements == 0)
- return;
- DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
- Cluster_schema *schema;
- while ((schema= post_epoch_log_list->pop()))
- {
- DBUG_PRINT("info",
- ("%s.%s: log query_length: %d query: '%s' type: %d",
- schema->db, schema->name,
- schema->query_length, schema->query,
- schema->type));
- int log_query= 0;
- {
- enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
- char key[FN_REFLEN + 1];
- build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
- if (schema_type == SOT_CLEAR_SLOCK)
- {
- mysql_mutex_lock(&ndbcluster_mutex);
- NDB_SCHEMA_OBJECT *ndb_schema_object=
- (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
- (uchar*) key, strlen(key));
- if (ndb_schema_object)
- {
- mysql_mutex_lock(&ndb_schema_object->mutex);
- memcpy(ndb_schema_object->slock, schema->slock,
- sizeof(ndb_schema_object->slock));
- DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
- (uchar*)ndb_schema_object->slock_bitmap.bitmap,
- no_bytes_in_map(&ndb_schema_object->slock_bitmap));
- mysql_mutex_unlock(&ndb_schema_object->mutex);
- mysql_cond_signal(&injector_cond);
- }
- mysql_mutex_unlock(&ndbcluster_mutex);
- continue;
- }
- /* ndb_share reference temporary, free below */
- NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
- if (share)
- {
- DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
- share->key, share->use_count));
- }
- switch (schema_type)
- {
- case SOT_DROP_DB:
- log_query= 1;
- break;
- case SOT_DROP_TABLE:
- log_query= 1;
- // invalidation already handled by binlog thread
- if (share && share->op)
- {
- break;
- }
- // fall through
- case SOT_RENAME_TABLE:
- // fall through
- case SOT_ALTER_TABLE:
- // invalidation already handled by binlog thread
- if (!share || !share->op)
- {
- {
- injector_ndb->setDatabaseName(schema->db);
- Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(),
- schema->name);
- ndbtab_g.invalidate();
- }
- TABLE_LIST table_list;
- bzero((char*) &table_list,sizeof(table_list));
- table_list.db= schema->db;
- table_list.alias= table_list.table_name= schema->name;
- close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
- }
- if (schema_type != SOT_ALTER_TABLE)
- break;
- // fall through
- case SOT_RENAME_TABLE_NEW:
- log_query= 1;
- if (ndb_binlog_running && (!share || !share->op))
- {
- /*
- we need to free any share here as command below
- may need to call handle_trailing_share
- */
- if (share)
- {
- /* ndb_share reference temporary free */
- DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
- share->key, share->use_count));
- free_share(&share);
- share= 0;
- }
- if (ndbcluster_check_if_local_table(schema->db, schema->name))
- {
- DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
- schema->db, schema->name));
- sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
- "binlog schema event '%s' from node %d. ",
- schema->db, schema->name, schema->query,
- schema->node_id);
- }
- else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
- {
- print_could_not_discover_error(thd, schema);
- }
- }
- break;
- default:
- DBUG_ASSERT(FALSE);
- }
- if (share)
- {
- /* ndb_share reference temporary free */
- DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
- share->key, share->use_count));
- free_share(&share);
- share= 0;
- }
- }
- if (ndb_binlog_running && log_query)
- ndb_binlog_query(thd, schema);
- }
- while ((schema= post_epoch_unlock_list->pop()))
- {
- ndbcluster_update_slock(thd, schema->db, schema->name);
- }
- DBUG_VOID_RETURN;
-}
-
-/*
- Timer class for doing performance measurements
-*/
-
-/*********************************************************************
- Internal helper functions for handeling of the cluster replication tables
- - ndb_binlog_index
- - ndb_apply_status
-*********************************************************************/
-
-/*
- struct to hold the data to be inserted into the
- ndb_binlog_index table
-*/
-struct ndb_binlog_index_row {
- ulonglong gci;
- const char *master_log_file;
- ulonglong master_log_pos;
- ulonglong n_inserts;
- ulonglong n_updates;
- ulonglong n_deletes;
- ulonglong n_schemaops;
-};
-
-/*
- Open the ndb_binlog_index table
-*/
-static int open_ndb_binlog_index(THD *thd, TABLE **ndb_binlog_index)
-{
- static char repdb[]= NDB_REP_DB;
- static char reptable[]= NDB_REP_TABLE;
- const char *save_proc_info= thd->proc_info;
- TABLE_LIST *tables= &binlog_tables;
-
- tables->init_one_table(repdb, strlen(repdb), reptable, strlen(reptable),
- reptable, TL_WRITE);
- thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE;
-
- tables->required_type= FRMTYPE_TABLE;
- thd->clear_error();
- if (open_and_lock_tables(thd, tables, FALSE, 0))
- {
- if (thd->killed)
- sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed");
- else
- sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'",
- thd->get_stmt_da()->sql_errno(),
- thd->get_stmt_da()->message());
- thd->proc_info= save_proc_info;
- return -1;
- }
- *ndb_binlog_index= tables->table;
- thd->proc_info= save_proc_info;
- (*ndb_binlog_index)->use_all_columns();
- return 0;
-}
-
-
-/*
- Insert one row in the ndb_binlog_index
-*/
-
-int ndb_add_ndb_binlog_index(THD *thd, void *_row)
-{
- ndb_binlog_index_row &row= *(ndb_binlog_index_row *) _row;
- int error= 0;
- /*
- Turn of binlogging to prevent the table changes to be written to
- the binary log.
- */
- ulong saved_options= thd->variables.option_bits;
- thd->variables.option_bits&= ~OPTION_BIN_LOG;
-
- if (!ndb_binlog_index && open_ndb_binlog_index(thd, &ndb_binlog_index))
- {
- sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
- error= -1;
- goto add_ndb_binlog_index_err;
- }
-
- /*
- Intialize ndb_binlog_index->record[0]
- */
- empty_record(ndb_binlog_index);
-
- ndb_binlog_index->field[0]->store(row.master_log_pos);
- ndb_binlog_index->field[1]->store(row.master_log_file,
- strlen(row.master_log_file),
- &my_charset_bin);
- ndb_binlog_index->field[2]->store(row.gci);
- ndb_binlog_index->field[3]->store(row.n_inserts);
- ndb_binlog_index->field[4]->store(row.n_updates);
- ndb_binlog_index->field[5]->store(row.n_deletes);
- ndb_binlog_index->field[6]->store(row.n_schemaops);
-
- if ((error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0])))
- {
- sql_print_error("NDB Binlog: Writing row to ndb_binlog_index: %d", error);
- error= -1;
- goto add_ndb_binlog_index_err;
- }
-
-add_ndb_binlog_index_err:
- thd->get_stmt_da()->set_overwrite_status(true);
- thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
- thd->get_stmt_da()->set_overwrite_status(false);
- close_thread_tables(thd);
- /*
- There should be no need for rolling back transaction due to deadlock
- (since ndb_binlog_index is non transactional).
- */
- DBUG_ASSERT(! thd->transaction_rollback_request);
-
- thd->mdl_context.release_transactional_locks();
- ndb_binlog_index= 0;
- thd->variables.option_bits= saved_options;
- return error;
-}
-
-/*********************************************************************
- Functions for start, stop, wait for ndbcluster binlog thread
-*********************************************************************/
-
-enum Binlog_thread_state
-{
- BCCC_running= 0,
- BCCC_exit= 1,
- BCCC_restart= 2
-};
-
-static enum Binlog_thread_state do_ndbcluster_binlog_close_connection= BCCC_restart;
-
-int ndbcluster_binlog_start()
-{
- DBUG_ENTER("ndbcluster_binlog_start");
-
- if (::server_id == 0)
- {
- sql_print_warning("NDB: server id set to zero will cause any other mysqld "
- "with bin log to log with wrong server id");
- }
- else if (::server_id & 0x1 << 31)
- {
- sql_print_error("NDB: server id's with high bit set is reserved for internal "
- "purposes");
- DBUG_RETURN(-1);
- }
-
- mysql_mutex_init(key_injector_mutex, &injector_mutex, MY_MUTEX_INIT_FAST);
- mysql_cond_init(key_injector_cond, &injector_cond, NULL);
- mysql_mutex_init(key_ndb_schema_share_mutex,
- &ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
-
- /* Create injector thread */
- if (mysql_thread_create(key_thread_ndb_binlog,
- &ndb_binlog_thread, &connection_attrib,
- ndb_binlog_thread_func, 0))
- {
- DBUG_PRINT("error", ("Could not create ndb injector thread"));
- mysql_cond_destroy(&injector_cond);
- mysql_mutex_destroy(&injector_mutex);
- DBUG_RETURN(-1);
- }
-
- ndbcluster_binlog_inited= 1;
-
- /* Wait for the injector thread to start */
- mysql_mutex_lock(&injector_mutex);
- while (!ndb_binlog_thread_running)
- mysql_cond_wait(&injector_cond, &injector_mutex);
- mysql_mutex_unlock(&injector_mutex);
-
- if (ndb_binlog_thread_running < 0)
- DBUG_RETURN(-1);
-
- DBUG_RETURN(0);
-}
-
-
-/**************************************************************
- Internal helper functions for creating/dropping ndb events
- used by the client sql threads
-**************************************************************/
-void
-ndb_rep_event_name(String *event_name,const char *db, const char *tbl)
-{
- event_name->set_ascii("REPL$", 5);
- event_name->append(db);
- if (tbl)
- {
- event_name->append('/');
- event_name->append(tbl);
- }
-}
-
-bool
-ndbcluster_check_if_local_table(const char *dbname, const char *tabname)
-{
- char key[FN_REFLEN + 1];
- char ndb_file[FN_REFLEN + 1];
-
- DBUG_ENTER("ndbcluster_check_if_local_table");
- build_table_filename(key, FN_LEN-1, dbname, tabname, reg_ext, 0);
- build_table_filename(ndb_file, FN_LEN-1, dbname, tabname, ha_ndb_ext, 0);
- /* Check that any defined table is an ndb table */
- DBUG_PRINT("info", ("Looking for file %s and %s", key, ndb_file));
- if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK))
- {
- DBUG_PRINT("info", ("table file %s not on disk, local table", ndb_file));
-
-
- DBUG_RETURN(true);
- }
-
- DBUG_RETURN(false);
-}
-
-bool
-ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname)
-{
- DBUG_ENTER("ndbcluster_check_if_local_tables_in_db");
- DBUG_PRINT("info", ("Looking for files in directory %s", dbname));
- LEX_STRING *tabname;
- List<LEX_STRING> files;
- char path[FN_REFLEN + 1];
-
- build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0);
- if (find_files(thd, &files, dbname, path, NullS, 0) != FIND_FILES_OK)
- {
- DBUG_PRINT("info", ("Failed to find files"));
- DBUG_RETURN(true);
- }
- DBUG_PRINT("info",("found: %d files", files.elements));
- while ((tabname= files.pop()))
- {
- DBUG_PRINT("info", ("Found table %s", tabname->str));
- if (ndbcluster_check_if_local_table(dbname, tabname->str))
- DBUG_RETURN(true);
- }
-
- DBUG_RETURN(false);
-}
-
-/*
- Common function for setting up everything for logging a table at
- create/discover.
-*/
-int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
- uint key_len,
- const char *db,
- const char *table_name,
- my_bool share_may_exist)
-{
- int do_event_op= ndb_binlog_running;
- DBUG_ENTER("ndbcluster_create_binlog_setup");
- DBUG_PRINT("enter",("key: %s key_len: %d %s.%s share_may_exist: %d",
- key, key_len, db, table_name, share_may_exist));
- DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
- DBUG_ASSERT(strlen(key) == key_len);
-
- mysql_mutex_lock(&ndbcluster_mutex);
-
- /* Handle any trailing share */
- NDB_SHARE *share= (NDB_SHARE*) my_hash_search(&ndbcluster_open_tables,
- (uchar*) key, key_len);
-
- if (share && share_may_exist)
- {
- if (share->flags & NSF_NO_BINLOG ||
- share->op != 0 ||
- share->op_old != 0)
- {
- mysql_mutex_unlock(&ndbcluster_mutex);
- DBUG_RETURN(0); // replication already setup, or should not
- }
- }
-
- if (share)
- {
- if (share->op || share->op_old)
- {
- my_errno= HA_ERR_TABLE_EXIST;
- mysql_mutex_unlock(&ndbcluster_mutex);
- DBUG_RETURN(1);
- }
- if (!share_may_exist || share->connect_count !=
- g_ndb_cluster_connection->get_connect_count())
- {
- handle_trailing_share(share);
- share= NULL;
- }
- }
-
- /* Create share which is needed to hold replication information */
- if (share)
- {
- /* ndb_share reference create */
- ++share->use_count;
- DBUG_PRINT("NDB_SHARE", ("%s create use_count: %u",
- share->key, share->use_count));
- }
- /* ndb_share reference create */
- else if (!(share= get_share(key, 0, TRUE, TRUE)))
- {
- sql_print_error("NDB Binlog: "
- "allocating table share for %s failed", key);
- }
- else
- {
- DBUG_PRINT("NDB_SHARE", ("%s create use_count: %u",
- share->key, share->use_count));
- }
-
- if (!ndb_schema_share &&
- strcmp(share->db, NDB_REP_DB) == 0 &&
- strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
- do_event_op= 1;
- else if (!ndb_apply_status_share &&
- strcmp(share->db, NDB_REP_DB) == 0 &&
- strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
- do_event_op= 1;
-
- if (!do_event_op)
- {
- share->flags|= NSF_NO_BINLOG;
- mysql_mutex_unlock(&ndbcluster_mutex);
- DBUG_RETURN(0);
- }
- mysql_mutex_unlock(&ndbcluster_mutex);
-
- while (share && !IS_TMP_PREFIX(table_name))
- {
- /*
- ToDo make sanity check of share so that the table is actually the same
- I.e. we need to do open file from frm in this case
- Currently awaiting this to be fixed in the 4.1 tree in the general
- case
- */
-
- /* Create the event in NDB */
- ndb->setDatabaseName(db);
-
- NDBDICT *dict= ndb->getDictionary();
- Ndb_table_guard ndbtab_g(dict, table_name);
- const NDBTAB *ndbtab= ndbtab_g.get_table();
- if (ndbtab == 0)
- {
- if (opt_ndb_extra_logging)
- sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
- "%s, %d", key, dict->getNdbError().message,
- dict->getNdbError().code);
- break; // error
- }
- String event_name(INJECTOR_EVENT_LEN);
- ndb_rep_event_name(&event_name, db, table_name);
- /*
- event should have been created by someone else,
- but let's make sure, and create if it doesn't exist
- */
- const NDBEVENT *ev= dict->getEvent(event_name.c_ptr());
- if (!ev)
- {
- if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share))
- {
- sql_print_error("NDB Binlog: "
- "FAILED CREATE (DISCOVER) TABLE Event: %s",
- event_name.c_ptr());
- break; // error
- }
- if (opt_ndb_extra_logging)
- sql_print_information("NDB Binlog: "
- "CREATE (DISCOVER) TABLE Event: %s",
- event_name.c_ptr());
- }
- else
- {
- delete ev;
- if (opt_ndb_extra_logging)
- sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
- event_name.c_ptr());
- }
-
- /*
- create the event operations for receiving logging events
- */
- if (ndbcluster_create_event_ops(share, ndbtab, event_name.c_ptr()))
- {
- sql_print_error("NDB Binlog:"
- "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
- event_name.c_ptr());
- /* a warning has been issued to the client */
- DBUG_RETURN(0);
- }
- DBUG_RETURN(0);
- }
- DBUG_RETURN(-1);
-}
-
-int
-ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
- const char *event_name, NDB_SHARE *share,
- int push_warning)
-{
- THD *thd= current_thd;
- DBUG_ENTER("ndbcluster_create_event");
- DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
- ndbtab->getName(), ndbtab->getObjectVersion(),
- event_name, share ? share->key : "(nil)"));
- DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
- if (!share)
- {
- DBUG_PRINT("info", ("share == NULL"));
- DBUG_RETURN(0);
- }
- if (share->flags & NSF_NO_BINLOG)
- {
- DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d",
- share->flags, share->flags & NSF_NO_BINLOG));
- DBUG_RETURN(0);
- }
-
- NDBDICT *dict= ndb->getDictionary();
- NDBEVENT my_event(event_name);
- my_event.setTable(*ndbtab);
- my_event.addTableEvent(NDBEVENT::TE_ALL);
- if (share->flags & NSF_HIDDEN_PK)
- {
- if (share->flags & NSF_BLOB_FLAG)
- {
- sql_print_error("NDB Binlog: logging of table %s "
- "with BLOB attribute and no PK is not supported",
- share->key);
- if (push_warning)
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_ILLEGAL_HA_CREATE_OPTION,
- ER(ER_ILLEGAL_HA_CREATE_OPTION),
- ndbcluster_hton_name,
- "Binlog of table with BLOB attribute and no PK");
-
- share->flags|= NSF_NO_BINLOG;
- DBUG_RETURN(-1);
- }
- /* No primary key, subscribe for all attributes */
- my_event.setReport(NDBEVENT::ER_ALL);
- DBUG_PRINT("info", ("subscription all"));
- }
- else
- {
- if (ndb_schema_share || strcmp(share->db, NDB_REP_DB) ||
- strcmp(share->table_name, NDB_SCHEMA_TABLE))
- {
- my_event.setReport(NDBEVENT::ER_UPDATED);
- DBUG_PRINT("info", ("subscription only updated"));
- }
- else
- {
- my_event.setReport((NDBEVENT::EventReport)
- (NDBEVENT::ER_ALL | NDBEVENT::ER_SUBSCRIBE));
- DBUG_PRINT("info", ("subscription all and subscribe"));
- }
- }
- if (share->flags & NSF_BLOB_FLAG)
- my_event.mergeEvents(TRUE);
-
- /* add all columns to the event */
- int n_cols= ndbtab->getNoOfColumns();
- for(int a= 0; a < n_cols; a++)
- my_event.addEventColumn(a);
-
- if (dict->createEvent(my_event)) // Add event to database
- {
- if (dict->getNdbError().classification != NdbError::SchemaObjectExists)
- {
- /*
- failed, print a warning
- */
- if (push_warning > 1)
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- dict->getNdbError().code,
- dict->getNdbError().message, "NDB");
- sql_print_error("NDB Binlog: Unable to create event in database. "
- "Event: %s Error Code: %d Message: %s", event_name,
- dict->getNdbError().code, dict->getNdbError().message);
- DBUG_RETURN(-1);
- }
-
- /*
- try retrieving the event, if table version/id matches, we will get
- a valid event. Otherwise we have a trailing event from before
- */
- const NDBEVENT *ev;
- if ((ev= dict->getEvent(event_name)))
- {
- delete ev;
- DBUG_RETURN(0);
- }
-
- /*
- trailing event from before; an error, but try to correct it
- */
- if (dict->getNdbError().code == NDB_INVALID_SCHEMA_OBJECT &&
- dict->dropEvent(my_event.getName()))
- {
- if (push_warning > 1)
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- dict->getNdbError().code,
- dict->getNdbError().message, "NDB");
- sql_print_error("NDB Binlog: Unable to create event in database. "
- " Attempt to correct with drop failed. "
- "Event: %s Error Code: %d Message: %s",
- event_name,
- dict->getNdbError().code,
- dict->getNdbError().message);
- DBUG_RETURN(-1);
- }
-
- /*
- try to add the event again
- */
- if (dict->createEvent(my_event))
- {
- if (push_warning > 1)
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- dict->getNdbError().code,
- dict->getNdbError().message, "NDB");
- sql_print_error("NDB Binlog: Unable to create event in database. "
- " Attempt to correct with drop ok, but create failed. "
- "Event: %s Error Code: %d Message: %s",
- event_name,
- dict->getNdbError().code,
- dict->getNdbError().message);
- DBUG_RETURN(-1);
- }
-#ifdef NDB_BINLOG_EXTRA_WARNINGS
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- 0, "NDB Binlog: Removed trailing event",
- "NDB");
-#endif
- }
-
- DBUG_RETURN(0);
-}
-
-inline int is_ndb_compatible_type(Field *field)
-{
- return
- !(field->flags & BLOB_FLAG) &&
- field->type() != MYSQL_TYPE_BIT &&
- field->pack_length() != 0;
-}
-
-/*
- - create eventOperations for receiving log events
- - setup ndb recattrs for reception of log event data
- - "start" the event operation
-
- used at create/discover of tables
-*/
-int
-ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
- const char *event_name)
-{
- THD *thd= current_thd;
- /*
- we are in either create table or rename table so table should be
- locked, hence we can work with the share without locks
- */
-
- DBUG_ENTER("ndbcluster_create_event_ops");
- DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name));
- DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
-
- DBUG_ASSERT(share != 0);
-
- if (share->flags & NSF_NO_BINLOG)
- {
- DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x",
- share->flags));
- DBUG_RETURN(0);
- }
-
- int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0;
- if (!ndb_schema_share && strcmp(share->db, NDB_REP_DB) == 0 &&
- strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
- do_ndb_schema_share= 1;
- else if (!ndb_apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
- strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
- do_ndb_apply_status_share= 1;
- else if (!binlog_filter->db_ok(share->db) || !ndb_binlog_running)
- {
- share->flags|= NSF_NO_BINLOG;
- DBUG_RETURN(0);
- }
-
- if (share->op)
- {
- assert(share->op->getCustomData() == (void *) share);
-
- DBUG_ASSERT(share->use_count > 1);
- sql_print_error("NDB Binlog: discover reusing old ev op");
- /* ndb_share reference ToDo free */
- DBUG_PRINT("NDB_SHARE", ("%s ToDo free use_count: %u",
- share->key, share->use_count));
- free_share(&share); // old event op already has reference
- DBUG_RETURN(0);
- }
-
- TABLE *table= share->table;
-
- int retries= 100;
- /*
- 100 milliseconds, temporary error on schema operation can
- take some time to be resolved
- */
- int retry_sleep= 100;
- while (1)
- {
- mysql_mutex_lock(&injector_mutex);
- Ndb *ndb= injector_ndb;
- if (do_ndb_schema_share)
- ndb= schema_ndb;
-
- if (ndb == 0)
- {
- mysql_mutex_unlock(&injector_mutex);
- DBUG_RETURN(-1);
- }
-
- NdbEventOperation* op;
- if (do_ndb_schema_share)
- op= ndb->createEventOperation(event_name);
- else
- {
- // set injector_ndb database/schema from table internal name
- int ret= ndb->setDatabaseAndSchemaName(ndbtab);
- assert(ret == 0);
- op= ndb->createEventOperation(event_name);
- // reset to catch errors
- ndb->setDatabaseName("");
- }
- if (!op)
- {
- sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
- " %s",event_name);
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- ndb->getNdbError().code,
- ndb->getNdbError().message,
- "NDB");
- mysql_mutex_unlock(&injector_mutex);
- DBUG_RETURN(-1);
- }
-
- if (share->flags & NSF_BLOB_FLAG)
- op->mergeEvents(TRUE); // currently not inherited from event
-
- DBUG_PRINT("info", ("share->ndb_value[0]: 0x%lx share->ndb_value[1]: 0x%lx",
- (long) share->ndb_value[0],
- (long) share->ndb_value[1]));
- int n_columns= ndbtab->getNoOfColumns();
- int n_fields= table ? table->s->fields : 0; // XXX ???
- for (int j= 0; j < n_columns; j++)
- {
- const char *col_name= ndbtab->getColumn(j)->getName();
- NdbValue attr0, attr1;
- if (j < n_fields)
- {
- Field *f= share->table->field[j];
- if (is_ndb_compatible_type(f))
- {
- DBUG_PRINT("info", ("%s compatible", col_name));
- attr0.rec= op->getValue(col_name, (char*) f->ptr);
- attr1.rec= op->getPreValue(col_name,
- (f->ptr - share->table->record[0]) +
- (char*) share->table->record[1]);
- }
- else if (! (f->flags & BLOB_FLAG))
- {
- DBUG_PRINT("info", ("%s non compatible", col_name));
- attr0.rec= op->getValue(col_name);
- attr1.rec= op->getPreValue(col_name);
- }
- else
- {
- DBUG_PRINT("info", ("%s blob", col_name));
- DBUG_ASSERT(share->flags & NSF_BLOB_FLAG);
- attr0.blob= op->getBlobHandle(col_name);
- attr1.blob= op->getPreBlobHandle(col_name);
- if (attr0.blob == NULL || attr1.blob == NULL)
- {
- sql_print_error("NDB Binlog: Creating NdbEventOperation"
- " blob field %u handles failed (code=%d) for %s",
- j, op->getNdbError().code, event_name);
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- op->getNdbError().code,
- op->getNdbError().message,
- "NDB");
- ndb->dropEventOperation(op);
- mysql_mutex_unlock(&injector_mutex);
- DBUG_RETURN(-1);
- }
- }
- }
- else
- {
- DBUG_PRINT("info", ("%s hidden key", col_name));
- attr0.rec= op->getValue(col_name);
- attr1.rec= op->getPreValue(col_name);
- }
- share->ndb_value[0][j].ptr= attr0.ptr;
- share->ndb_value[1][j].ptr= attr1.ptr;
- DBUG_PRINT("info", ("&share->ndb_value[0][%d]: 0x%lx "
- "share->ndb_value[0][%d]: 0x%lx",
- j, (long) &share->ndb_value[0][j],
- j, (long) attr0.ptr));
- DBUG_PRINT("info", ("&share->ndb_value[1][%d]: 0x%lx "
- "share->ndb_value[1][%d]: 0x%lx",
- j, (long) &share->ndb_value[0][j],
- j, (long) attr1.ptr));
- }
- op->setCustomData((void *) share); // set before execute
- share->op= op; // assign op in NDB_SHARE
- if (op->execute())
- {
- share->op= NULL;
- retries--;
- if (op->getNdbError().status != NdbError::TemporaryError &&
- op->getNdbError().code != 1407)
- retries= 0;
- if (retries == 0)
- {
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- op->getNdbError().code, op->getNdbError().message,
- "NDB");
- sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
- event_name,
- op->getNdbError().code, op->getNdbError().message);
- }
- ndb->dropEventOperation(op);
- mysql_mutex_unlock(&injector_mutex);
- if (retries)
- {
- my_sleep(retry_sleep);
- continue;
- }
- DBUG_RETURN(-1);
- }
- mysql_mutex_unlock(&injector_mutex);
- break;
- }
-
- /* ndb_share reference binlog */
- get_share(share);
- DBUG_PRINT("NDB_SHARE", ("%s binlog use_count: %u",
- share->key, share->use_count));
- if (do_ndb_apply_status_share)
- {
- /* ndb_share reference binlog extra */
- ndb_apply_status_share= get_share(share);
- DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
- share->key, share->use_count));
- mysql_cond_signal(&injector_cond);
- }
- else if (do_ndb_schema_share)
- {
- /* ndb_share reference binlog extra */
- ndb_schema_share= get_share(share);
- DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
- share->key, share->use_count));
- mysql_cond_signal(&injector_cond);
- }
-
- DBUG_PRINT("info",("%s share->op: 0x%lx share->use_count: %u",
- share->key, (long) share->op, share->use_count));
-
- if (opt_ndb_extra_logging)
- sql_print_information("NDB Binlog: logging %s", share->key);
- DBUG_RETURN(0);
-}
-
-/*
- when entering the calling thread should have a share lock id share != 0
- then the injector thread will have one as well, i.e. share->use_count == 0
- (unless it has already dropped... then share->op == 0)
-*/
-int
-ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
- NDB_SHARE *share, const char *type_str)
-{
- DBUG_ENTER("ndbcluster_handle_drop_table");
- THD *thd= current_thd;
-
- NDBDICT *dict= ndb->getDictionary();
- if (event_name && dict->dropEvent(event_name))
- {
- if (dict->getNdbError().code != 4710)
- {
- /* drop event failed for some reason, issue a warning */
- push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- dict->getNdbError().code,
- dict->getNdbError().message, "NDB");
- /* error is not that the event did not exist */
- sql_print_error("NDB Binlog: Unable to drop event in database. "
- "Event: %s Error Code: %d Message: %s",
- event_name,
- dict->getNdbError().code,
- dict->getNdbError().message);
- /* ToDo; handle error? */
- if (share && share->op &&
- share->op->getState() == NdbEventOperation::EO_EXECUTING &&
- dict->getNdbError().mysql_code != HA_ERR_NO_CONNECTION)
- {
- DBUG_ASSERT(FALSE);
- DBUG_RETURN(-1);
- }
- }
- }
-
- if (share == 0 || share->op == 0)
- {
- DBUG_RETURN(0);
- }
-
-/*
- Syncronized drop between client thread and injector thread is
- neccessary in order to maintain ordering in the binlog,
- such that the drop occurs _after_ any inserts/updates/deletes.
-
- The penalty for this is that the drop table becomes slow.
-
- This wait is however not strictly neccessary to produce a binlog
- that is usable. However the slave does not currently handle
- these out of order, thus we are keeping the SYNC_DROP_ defined
- for now.
-*/
- const char *save_proc_info= thd->proc_info;
-#define SYNC_DROP_
-#ifdef SYNC_DROP_
- thd->proc_info= "Syncing ndb table schema operation and binlog";
- mysql_mutex_lock(&share->mutex);
- int max_timeout= DEFAULT_SYNC_TIMEOUT;
- while (share->op)
- {
- struct timespec abstime;
- set_timespec(abstime, 1);
- int ret= mysql_cond_timedwait(&injector_cond,
- &share->mutex,
- &abstime);
- if (thd->killed ||
- share->op == 0)
- break;
- if (ret)
- {
- max_timeout--;
- if (max_timeout == 0)
- {
- sql_print_error("NDB %s: %s timed out. Ignoring...",
- type_str, share->key);
- break;
- }
- if (opt_ndb_extra_logging)
- ndb_report_waiting(type_str, max_timeout,
- type_str, share->key);
- }
- }
- mysql_mutex_unlock(&share->mutex);
-#else
- mysql_mutex_lock(&share->mutex);
- share->op_old= share->op;
- share->op= 0;
- mysql_mutex_unlock(&share->mutex);
-#endif
- thd->proc_info= save_proc_info;
-
- DBUG_RETURN(0);
-}
-
-
-/********************************************************************
- Internal helper functions for differentd events from the stoarage nodes
- used by the ndb injector thread
-********************************************************************/
-
-/*
- Handle error states on events from the storage nodes
-*/
-static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp,
- ndb_binlog_index_row &row)
-{
- NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
- DBUG_ENTER("ndb_binlog_thread_handle_error");
-
- int overrun= pOp->isOverrun();
- if (overrun)
- {
- /*
- ToDo: this error should rather clear the ndb_binlog_index...
- and continue
- */
- sql_print_error("NDB Binlog: Overrun in event buffer, "
- "this means we have dropped events. Cannot "
- "continue binlog for %s", share->key);
- pOp->clearError();
- DBUG_RETURN(-1);
- }
-
- if (!pOp->isConsistent())
- {
- /*
- ToDo: this error should rather clear the ndb_binlog_index...
- and continue
- */
- sql_print_error("NDB Binlog: Not Consistent. Cannot "
- "continue binlog for %s. Error code: %d"
- " Message: %s", share->key,
- pOp->getNdbError().code,
- pOp->getNdbError().message);
- pOp->clearError();
- DBUG_RETURN(-1);
- }
- sql_print_error("NDB Binlog: unhandled error %d for table %s",
- pOp->hasError(), share->key);
- pOp->clearError();
- DBUG_RETURN(0);
-}
-
-static int
-ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
- NdbEventOperation *pOp,
- ndb_binlog_index_row &row)
-{
- NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
- NDBEVENT::TableEvent type= pOp->getEventType();
-
- switch (type)
- {
- case NDBEVENT::TE_CLUSTER_FAILURE:
- if (opt_ndb_extra_logging)
- sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
- share->key, (unsigned) pOp->getGCI());
- if (ndb_apply_status_share == share)
- {
- if (opt_ndb_extra_logging &&
- ndb_binlog_tables_inited && ndb_binlog_running)
- sql_print_information("NDB Binlog: ndb tables initially "
- "read only on reconnect.");
- /* ndb_share reference binlog extra free */
- DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
- share->key, share->use_count));
- free_share(&ndb_apply_status_share);
- ndb_apply_status_share= 0;
- ndb_binlog_tables_inited= 0;
- }
- DBUG_PRINT("error", ("CLUSTER FAILURE EVENT: "
- "%s received share: 0x%lx op: 0x%lx share op: 0x%lx "
- "op_old: 0x%lx",
- share->key, (long) share, (long) pOp,
- (long) share->op, (long) share->op_old));
- break;
- case NDBEVENT::TE_DROP:
- if (ndb_apply_status_share == share)
- {
- if (opt_ndb_extra_logging &&
- ndb_binlog_tables_inited && ndb_binlog_running)
- sql_print_information("NDB Binlog: ndb tables initially "
- "read only on reconnect.");
- /* ndb_share reference binlog extra free */
- DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
- share->key, share->use_count));
- free_share(&ndb_apply_status_share);
- ndb_apply_status_share= 0;
- ndb_binlog_tables_inited= 0;
- }
- /* ToDo: remove printout */
- if (opt_ndb_extra_logging)
- sql_print_information("NDB Binlog: drop table %s.", share->key);
- // fall through
- case NDBEVENT::TE_ALTER:
- row.n_schemaops++;
- DBUG_PRINT("info", ("TABLE %s EVENT: %s received share: 0x%lx op: 0x%lx "
- "share op: 0x%lx op_old: 0x%lx",
- type == NDBEVENT::TE_DROP ? "DROP" : "ALTER",
- share->key, (long) share, (long) pOp,
- (long) share->op, (long) share->op_old));
- break;
- case NDBEVENT::TE_NODE_FAILURE:
- /* fall through */
- case NDBEVENT::TE_SUBSCRIBE:
- /* fall through */
- case NDBEVENT::TE_UNSUBSCRIBE:
- /* ignore */
- return 0;
- default:
- sql_print_error("NDB Binlog: unknown non data event %d for %s. "
- "Ignoring...", (unsigned) type, share->key);
- return 0;
- }
-
- ndb_handle_schema_change(thd, ndb, pOp, share);
- return 0;
-}
-
-/*
- Handle data events from the storage nodes
-*/
-static int
-ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
- ndb_binlog_index_row &row,
- injector::transaction &trans)
-{
- NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
- if (share == ndb_apply_status_share)
- return 0;
-
- uint32 originating_server_id= pOp->getAnyValue();
- if (originating_server_id == 0)
- originating_server_id= ::server_id;
- else if (originating_server_id & NDB_ANYVALUE_RESERVED)
- {
- if (originating_server_id != NDB_ANYVALUE_FOR_NOLOGGING)
- sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
- "event not logged",
- originating_server_id);
- return 0;
- }
- else if (!g_ndb_log_slave_updates)
- {
- /*
- This event comes from a slave applier since it has an originating
- server id set. Since option to log slave updates is not set, skip it.
- */
- return 0;
- }
-
- TABLE *table= share->table;
- DBUG_ASSERT(trans.good());
- DBUG_ASSERT(table != 0);
-
- dbug_print_table("table", table);
-
- TABLE_SHARE *table_s= table->s;
- uint n_fields= table_s->fields;
- MY_BITMAP b;
- /* Potential buffer for the bitmap */
- uint32 bitbuf[128 / (sizeof(uint32) * 8)];
- my_bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL,
- n_fields, FALSE);
- bitmap_set_all(&b);
-
- /*
- row data is already in table->record[0]
- As we told the NdbEventOperation to do this
- (saves moving data about many times)
- */
-
- /*
- for now malloc/free blobs buffer each time
- TODO if possible share single permanent buffer with handlers
- */
- uchar* blobs_buffer[2] = { 0, 0 };
- uint blobs_buffer_size[2] = { 0, 0 };
-
- switch(pOp->getEventType())
- {
- case NDBEVENT::TE_INSERT:
- row.n_inserts++;
- DBUG_PRINT("info", ("INSERT INTO %s.%s",
- table_s->db.str, table_s->table_name.str));
- {
- if (share->flags & NSF_BLOB_FLAG)
- {
- my_ptrdiff_t ptrdiff= 0;
- int ret __attribute__((unused))= get_ndb_blobs_value(table, share->ndb_value[0],
- blobs_buffer[0],
- blobs_buffer_size[0],
- ptrdiff);
- DBUG_ASSERT(ret == 0);
- }
- ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
- int ret __attribute__((unused))= trans.write_row(originating_server_id,
- injector::transaction::table(table,
- TRUE),
- &b, n_fields, table->record[0]);
- DBUG_ASSERT(ret == 0);
- }
- break;
- case NDBEVENT::TE_DELETE:
- row.n_deletes++;
- DBUG_PRINT("info",("DELETE FROM %s.%s",
- table_s->db.str, table_s->table_name.str));
- {
- /*
- table->record[0] contains only the primary key in this case
- since we do not have an after image
- */
- int n;
- if (table->s->primary_key != MAX_KEY)
- n= 0; /*
- use the primary key only as it save time and space and
- it is the only thing needed to log the delete
- */
- else
- n= 1; /*
- we use the before values since we don't have a primary key
- since the mysql server does not handle the hidden primary
- key
- */
-
- if (share->flags & NSF_BLOB_FLAG)
- {
- my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
- int ret __attribute__((unused))= get_ndb_blobs_value(table, share->ndb_value[n],
- blobs_buffer[n],
- blobs_buffer_size[n],
- ptrdiff);
- DBUG_ASSERT(ret == 0);
- }
- ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
- DBUG_EXECUTE("info", print_records(table, table->record[n]););
- int ret __attribute__((unused))= trans.delete_row(originating_server_id,
- injector::transaction::table(table,
- TRUE),
- &b, n_fields, table->record[n]);
- DBUG_ASSERT(ret == 0);
- }
- break;
- case NDBEVENT::TE_UPDATE:
- row.n_updates++;
- DBUG_PRINT("info", ("UPDATE %s.%s",
- table_s->db.str, table_s->table_name.str));
- {
- if (share->flags & NSF_BLOB_FLAG)
- {
- my_ptrdiff_t ptrdiff= 0;
- int ret __attribute__((unused))= get_ndb_blobs_value(table, share->ndb_value[0],
- blobs_buffer[0],
- blobs_buffer_size[0],
- ptrdiff);
- DBUG_ASSERT(ret == 0);
- }
- ndb_unpack_record(table, share->ndb_value[0],
- &b, table->record[0]);
- DBUG_EXECUTE("info", print_records(table, table->record[0]););
- if (table->s->primary_key != MAX_KEY)
- {
- /*
- since table has a primary key, we can do a write
- using only after values
- */
- trans.write_row(originating_server_id,
- injector::transaction::table(table, TRUE),
- &b, n_fields, table->record[0]);// after values
- }
- else
- {
- /*
- mysql server cannot handle the ndb hidden key and
- therefore needs the before image as well
- */
- if (share->flags & NSF_BLOB_FLAG)
- {
- my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
- int ret __attribute__((unused))= get_ndb_blobs_value(table, share->ndb_value[1],
- blobs_buffer[1],
- blobs_buffer_size[1],
- ptrdiff);
- DBUG_ASSERT(ret == 0);
- }
- ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
- DBUG_EXECUTE("info", print_records(table, table->record[1]););
- int ret __attribute__((unused))= trans.update_row(originating_server_id,
- injector::transaction::table(table,
- TRUE),
- &b, n_fields,
- table->record[1], // before values
- table->record[0]);// after values
- DBUG_ASSERT(ret == 0);
- }
- }
- break;
- default:
- /* We should REALLY never get here. */
- DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
- break;
- }
-
- if (share->flags & NSF_BLOB_FLAG)
- {
- my_free(blobs_buffer[0]);
- my_free(blobs_buffer[1]);
- }
-
- return 0;
-}
-
-//#define RUN_NDB_BINLOG_TIMER
-#ifdef RUN_NDB_BINLOG_TIMER
-class Timer
-{
-public:
- Timer() { start(); }
- void start() { gettimeofday(&m_start, 0); }
- void stop() { gettimeofday(&m_stop, 0); }
- ulong elapsed_ms()
- {
- return (ulong)
- (((longlong) m_stop.tv_sec - (longlong) m_start.tv_sec) * 1000 +
- ((longlong) m_stop.tv_usec -
- (longlong) m_start.tv_usec + 999) / 1000);
- }
-private:
- struct timeval m_start,m_stop;
-};
-#endif
-
-/****************************************************************
- Injector thread main loop
-****************************************************************/
-
-static uchar *
-ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT *schema_object,
- size_t *length,
- my_bool not_used __attribute__((unused)))
-{
- *length= schema_object->key_length;
- return (uchar*) schema_object->key;
-}
-
-static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
- my_bool create_if_not_exists,
- my_bool have_lock)
-{
- NDB_SCHEMA_OBJECT *ndb_schema_object;
- uint length= (uint) strlen(key);
- DBUG_ENTER("ndb_get_schema_object");
- DBUG_PRINT("enter", ("key: '%s'", key));
-
- if (!have_lock)
- mysql_mutex_lock(&ndbcluster_mutex);
- while (!(ndb_schema_object=
- (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
- (uchar*) key,
- length)))
- {
- if (!create_if_not_exists)
- {
- DBUG_PRINT("info", ("does not exist"));
- break;
- }
- if (!(ndb_schema_object=
- (NDB_SCHEMA_OBJECT*) my_malloc(sizeof(*ndb_schema_object) + length + 1,
- MYF(MY_WME | MY_ZEROFILL))))
- {
- DBUG_PRINT("info", ("malloc error"));
- break;
- }
- ndb_schema_object->key= (char *)(ndb_schema_object+1);
- memcpy(ndb_schema_object->key, key, length + 1);
- ndb_schema_object->key_length= length;
- if (my_hash_insert(&ndb_schema_objects, (uchar*) ndb_schema_object))
- {
- my_free(ndb_schema_object);
- break;
- }
- mysql_mutex_init(key_ndb_schema_object_mutex, &ndb_schema_object->mutex, MY_MUTEX_INIT_FAST);
- my_bitmap_init(&ndb_schema_object->slock_bitmap, ndb_schema_object->slock,
- sizeof(ndb_schema_object->slock)*8, FALSE);
- bitmap_clear_all(&ndb_schema_object->slock_bitmap);
- break;
- }
- if (ndb_schema_object)
- {
- ndb_schema_object->use_count++;
- DBUG_PRINT("info", ("use_count: %d", ndb_schema_object->use_count));
- }
- if (!have_lock)
- mysql_mutex_unlock(&ndbcluster_mutex);
- DBUG_RETURN(ndb_schema_object);
-}
-
-
-static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
- bool have_lock)
-{
- DBUG_ENTER("ndb_free_schema_object");
- DBUG_PRINT("enter", ("key: '%s'", (*ndb_schema_object)->key));
- if (!have_lock)
- mysql_mutex_lock(&ndbcluster_mutex);
- if (!--(*ndb_schema_object)->use_count)
- {
- DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
- my_hash_delete(&ndb_schema_objects, (uchar*) *ndb_schema_object);
- mysql_mutex_destroy(&(*ndb_schema_object)->mutex);
- my_free(*ndb_schema_object);
- *ndb_schema_object= 0;
- }
- else
- {
- DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
- }
- if (!have_lock)
- mysql_mutex_unlock(&ndbcluster_mutex);
- DBUG_VOID_RETURN;
-}
-
-extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
-extern ulong opt_ndb_report_thresh_binlog_mem_usage;
-
-pthread_handler_t ndb_binlog_thread_func(void *arg)
-{
- THD *thd; /* needs to be first for thread_stack */
- Ndb *i_ndb= 0;
- Ndb *s_ndb= 0;
- Thd_ndb *thd_ndb=0;
- int ndb_update_ndb_binlog_index= 1;
- injector *inj= injector::instance();
- uint incident_id= 0;
-
-#ifdef RUN_NDB_BINLOG_TIMER
- Timer main_timer;
-#endif
-
- mysql_mutex_lock(&injector_mutex);
- /*
- Set up the Thread
- */
- my_thread_init();
- DBUG_ENTER("ndb_binlog_thread");
-
- thd= new THD; /* note that contructor of THD uses DBUG_ */
- THD_CHECK_SENTRY(thd);
- thd->set_current_stmt_binlog_format_row();
-
- /* We need to set thd->thread_id before thd->store_globals, or it will
- set an invalid value for thd->variables.pseudo_thread_id.
- */
- mysql_mutex_lock(&LOCK_thread_count);
- thd->thread_id= thread_id++;
- mysql_mutex_unlock(&LOCK_thread_count);
-
- mysql_thread_set_psi_id(thd->thread_id);
-
- thd->thread_stack= (char*) &thd; /* remember where our stack is */
- if (thd->store_globals())
- {
- thd->cleanup();
- delete thd;
- ndb_binlog_thread_running= -1;
- mysql_mutex_unlock(&injector_mutex);
- mysql_cond_signal(&injector_cond);
-
- DBUG_LEAVE; // Must match DBUG_ENTER()
- my_thread_end();
- pthread_exit(0);
- return NULL; // Avoid compiler warnings
- }
-
- thd->init_for_queries();
- thd->command= COM_DAEMON;
- thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
- thd->main_security_ctx.host_or_ip= "";
- thd->client_capabilities= 0;
- my_net_init(&thd->net, 0, MYF(MY_THREAD_SPECIFIC));
- thd->main_security_ctx.master_access= ~0;
- thd->main_security_ctx.priv_user[0]= 0;
- /* Do not use user-supplied timeout value for system threads. */
- thd->variables.lock_wait_timeout= LONG_TIMEOUT;
-
- /*
- Set up ndb binlog
- */
- sql_print_information("Starting MySQL Cluster Binlog Thread");
-
- pthread_detach_this_thread();
- thd->real_id= pthread_self();
- mysql_mutex_lock(&LOCK_thread_count);
- threads.append(thd);
- mysql_mutex_unlock(&LOCK_thread_count);
- thd->lex->start_transaction_opt= 0;
-
- if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
- s_ndb->init())
- {
- sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
- ndb_binlog_thread_running= -1;
- mysql_mutex_unlock(&injector_mutex);
- mysql_cond_signal(&injector_cond);
- goto err;
- }
-
- // empty database
- if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
- i_ndb->init())
- {
- sql_print_error("NDB Binlog: Getting Ndb object failed");
- ndb_binlog_thread_running= -1;
- mysql_mutex_unlock(&injector_mutex);
- mysql_cond_signal(&injector_cond);
- goto err;
- }
-
- /* init hash for schema object distribution */
- (void) my_hash_init(&ndb_schema_objects, system_charset_info, 32, 0, 0,
- (my_hash_get_key)ndb_schema_objects_get_key, 0, 0);
-
- /*
- Expose global reference to our ndb object.
-
- Used by both sql client thread and binlog thread to interact
- with the storage
- mysql_mutex_lock(&injector_mutex);
- */
- injector_thd= thd;
- injector_ndb= i_ndb;
- p_latest_trans_gci=
- injector_ndb->get_ndb_cluster_connection().get_latest_trans_gci();
- schema_ndb= s_ndb;
-
- if (opt_bin_log)
- {
- ndb_binlog_running= TRUE;
- }
-
- /* Thread start up completed */
- ndb_binlog_thread_running= 1;
- mysql_mutex_unlock(&injector_mutex);
- mysql_cond_signal(&injector_cond);
-
- /*
- wait for mysql server to start (so that the binlog is started
- and thus can receive the first GAP event)
- */
- mysql_mutex_lock(&LOCK_server_started);
- while (!mysqld_server_started)
- {
- struct timespec abstime;
- set_timespec(abstime, 1);
- mysql_cond_timedwait(&COND_server_started, &LOCK_server_started,
- &abstime);
- if (ndbcluster_terminating)
- {
- mysql_mutex_unlock(&LOCK_server_started);
- goto err;
- }
- }
- mysql_mutex_unlock(&LOCK_server_started);
-restart:
- /*
- Main NDB Injector loop
- */
- while (ndb_binlog_running)
- {
- /*
- check if it is the first log, if so we do not insert a GAP event
- as there is really no log to have a GAP in
- */
- if (incident_id == 0)
- {
- LOG_INFO log_info;
- mysql_bin_log.get_current_log(&log_info);
- int len= strlen(log_info.log_file_name);
- uint no= 0;
- if ((sscanf(log_info.log_file_name + len - 6, "%u", &no) == 1) &&
- no == 1)
- {
- /* this is the fist log, so skip GAP event */
- break;
- }
- }
-
- /*
- Always insert a GAP event as we cannot know what has happened
- in the cluster while not being connected.
- */
- LEX_STRING const msg[2]=
- {
- { C_STRING_WITH_LEN("mysqld startup") },
- { C_STRING_WITH_LEN("cluster disconnect")}
- };
- int error __attribute__((unused))=
- inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg[incident_id]);
- DBUG_ASSERT(!error);
- break;
- }
- incident_id= 1;
- {
- thd->proc_info= "Waiting for ndbcluster to start";
-
- mysql_mutex_lock(&injector_mutex);
- while (!ndb_schema_share ||
- (ndb_binlog_running && !ndb_apply_status_share))
- {
- /* ndb not connected yet */
- struct timespec abstime;
- set_timespec(abstime, 1);
- mysql_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
- if (ndbcluster_binlog_terminating)
- {
- mysql_mutex_unlock(&injector_mutex);
- goto err;
- }
- }
- mysql_mutex_unlock(&injector_mutex);
-
- if (thd_ndb == NULL)
- {
- DBUG_ASSERT(ndbcluster_hton->slot != ~(uint)0);
- if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
- {
- sql_print_error("Could not allocate Thd_ndb object");
- goto err;
- }
- set_thd_ndb(thd, thd_ndb);
- thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
- thd->query_id= 0; // to keep valgrind quiet
- }
- }
-
- {
- // wait for the first event
- thd->proc_info= "Waiting for first event from ndbcluster";
- int schema_res, res;
- Uint64 schema_gci;
- do
- {
- DBUG_PRINT("info", ("Waiting for the first event"));
-
- if (ndbcluster_binlog_terminating)
- goto err;
-
- schema_res= s_ndb->pollEvents(100, &schema_gci);
- } while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci);
- if (ndb_binlog_running)
- {
- Uint64 gci= i_ndb->getLatestGCI();
- while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch)
- {
- if (ndbcluster_binlog_terminating)
- goto err;
- res= i_ndb->pollEvents(10, &gci);
- }
- if (gci > schema_gci)
- {
- schema_gci= gci;
- }
- }
- // now check that we have epochs consistant with what we had before the restart
- DBUG_PRINT("info", ("schema_res: %d schema_gci: %lu", schema_res,
- (long) schema_gci));
- {
- i_ndb->flushIncompleteEvents(schema_gci);
- s_ndb->flushIncompleteEvents(schema_gci);
- if (schema_gci < ndb_latest_handled_binlog_epoch)
- {
- sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
- "ndb_latest_handled_binlog_epoch: %u, while current epoch: %u. "
- "RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.",
- (unsigned) ndb_latest_handled_binlog_epoch, (unsigned) schema_gci);
- *p_latest_trans_gci= 0;
- ndb_latest_handled_binlog_epoch= 0;
- ndb_latest_applied_binlog_epoch= 0;
- ndb_latest_received_binlog_epoch= 0;
- }
- else if (ndb_latest_applied_binlog_epoch > 0)
- {
- sql_print_warning("NDB Binlog: cluster has reconnected. "
- "Changes to the database that occured while "
- "disconnected will not be in the binlog");
- }
- if (opt_ndb_extra_logging)
- {
- sql_print_information("NDB Binlog: starting log at epoch %u",
- (unsigned)schema_gci);
- }
- }
- }
- {
- static char db[]= "";
- thd->db= db;
- }
- do_ndbcluster_binlog_close_connection= BCCC_running;
- for ( ; !((ndbcluster_binlog_terminating ||
- do_ndbcluster_binlog_close_connection) &&
- ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci) &&
- do_ndbcluster_binlog_close_connection != BCCC_restart; )
- {
-#ifndef DBUG_OFF
- if (do_ndbcluster_binlog_close_connection)
- {
- DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection: %d, "
- "ndb_latest_handled_binlog_epoch: %lu, "
- "*p_latest_trans_gci: %lu",
- do_ndbcluster_binlog_close_connection,
- (ulong) ndb_latest_handled_binlog_epoch,
- (ulong) *p_latest_trans_gci));
- }
-#endif
-#ifdef RUN_NDB_BINLOG_TIMER
- main_timer.stop();
- sql_print_information("main_timer %ld ms", main_timer.elapsed_ms());
- main_timer.start();
-#endif
-
- /*
- now we don't want any events before next gci is complete
- */
- thd->proc_info= "Waiting for event from ndbcluster";
- thd->set_time();
-
- /* wait for event or 1000 ms */
- Uint64 gci= 0, schema_gci;
- int res= 0, tot_poll_wait= 1000;
- if (ndb_binlog_running)
- {
- res= i_ndb->pollEvents(tot_poll_wait, &gci);
- tot_poll_wait= 0;
- }
- else
- {
- /*
- Just consume any events, not used if no binlogging
- e.g. node failure events
- */
- Uint64 tmp_gci;
- if (i_ndb->pollEvents(0, &tmp_gci))
- while (i_ndb->nextEvent())
- ;
- }
- int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
- ndb_latest_received_binlog_epoch= gci;
-
- while (gci > schema_gci && schema_res >= 0)
- {
- static char buf[64];
- thd->proc_info= "Waiting for schema epoch";
- my_snprintf(buf, sizeof(buf), "%s %u(%u)", thd->proc_info, (unsigned) schema_gci, (unsigned) gci);
- thd->proc_info= buf;
- schema_res= s_ndb->pollEvents(10, &schema_gci);
- }
-
- if ((ndbcluster_binlog_terminating ||
- do_ndbcluster_binlog_close_connection) &&
- (ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci ||
- !ndb_binlog_running))
- break; /* Shutting down server */
-
- if (ndb_binlog_index && ndb_binlog_index->s->has_old_version())
- {
- if (ndb_binlog_index->s->has_old_version())
- {
- trans_commit_stmt(thd);
- close_thread_tables(thd);
- thd->mdl_context.release_transactional_locks();
- ndb_binlog_index= 0;
- }
- }
-
- MEM_ROOT **root_ptr=
- my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
- MEM_ROOT *old_root= *root_ptr;
- MEM_ROOT mem_root;
- init_sql_alloc(&mem_root, 4096, 0, MYF(0));
- List<Cluster_schema> post_epoch_log_list;
- List<Cluster_schema> post_epoch_unlock_list;
- *root_ptr= &mem_root;
-
- if (unlikely(schema_res > 0))
- {
- thd->proc_info= "Processing events from schema table";
- s_ndb->
- setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
- s_ndb->
- setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
- NdbEventOperation *pOp= s_ndb->nextEvent();
- while (pOp != NULL)
- {
- if (!pOp->hasError())
- {
- ndb_binlog_thread_handle_schema_event(thd, s_ndb, pOp,
- &post_epoch_log_list,
- &post_epoch_unlock_list,
- &mem_root);
- DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
- s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
- "<empty>"));
- DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
- i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
- "<empty>"));
- if (i_ndb->getEventOperation() == NULL &&
- s_ndb->getEventOperation() == NULL &&
- do_ndbcluster_binlog_close_connection == BCCC_running)
- {
- DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
- do_ndbcluster_binlog_close_connection= BCCC_restart;
- if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running)
- {
- sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
- "as latest received epoch is %lu",
- (ulong) *p_latest_trans_gci,
- (ulong) ndb_latest_received_binlog_epoch);
- }
- }
- }
- else
- sql_print_error("NDB: error %lu (%s) on handling "
- "binlog schema event",
- (ulong) pOp->getNdbError().code,
- pOp->getNdbError().message);
- pOp= s_ndb->nextEvent();
- }
- }
-
- if (res > 0)
- {
- DBUG_PRINT("info", ("pollEvents res: %d", res));
- thd->proc_info= "Processing events";
- NdbEventOperation *pOp= i_ndb->nextEvent();
- ndb_binlog_index_row row;
- while (pOp != NULL)
- {
-#ifdef RUN_NDB_BINLOG_TIMER
- Timer gci_timer, write_timer;
- int event_count= 0;
- gci_timer.start();
-#endif
- gci= pOp->getGCI();
- DBUG_PRINT("info", ("Handling gci: %d", (unsigned)gci));
- // sometimes get TE_ALTER with invalid table
- DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
- ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
- DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch);
-
- /* initialize some variables for this epoch */
- g_ndb_log_slave_updates= opt_log_slave_updates;
- i_ndb->
- setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
- i_ndb->setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
-
- bzero((char*) &row, sizeof(row));
- thd->variables.character_set_client= &my_charset_latin1;
- injector::transaction trans;
- // pass table map before epoch
- {
- Uint32 iter= 0;
- const NdbEventOperation *gci_op;
- Uint32 event_types;
- while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
- != NULL)
- {
- NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData();
- DBUG_PRINT("info", ("per gci_op: 0x%lx share: 0x%lx event_types: 0x%x",
- (long) gci_op, (long) share, event_types));
- // workaround for interface returning TE_STOP events
- // which are normally filtered out below in the nextEvent loop
- if ((event_types & ~NdbDictionary::Event::TE_STOP) == 0)
- {
- DBUG_PRINT("info", ("Skipped TE_STOP on table %s",
- gci_op->getEvent()->getTable()->getName()));
- continue;
- }
- // this should not happen
- if (share == NULL || share->table == NULL)
- {
- DBUG_PRINT("info", ("no share or table %s!",
- gci_op->getEvent()->getTable()->getName()));
- continue;
- }
- if (share == ndb_apply_status_share)
- {
- // skip this table, it is handled specially
- continue;
- }
- TABLE *table= share->table;
-#ifndef DBUG_OFF
- const LEX_STRING &name= table->s->table_name;
-#endif
- if ((event_types & (NdbDictionary::Event::TE_INSERT |
- NdbDictionary::Event::TE_UPDATE |
- NdbDictionary::Event::TE_DELETE)) == 0)
- {
- DBUG_PRINT("info", ("skipping non data event table: %.*s",
- (int) name.length, name.str));
- continue;
- }
- if (!trans.good())
- {
- DBUG_PRINT("info",
- ("Found new data event, initializing transaction"));
- inj->new_trans(thd, &trans);
- }
- DBUG_PRINT("info", ("use_table: %.*s",
- (int) name.length, name.str));
- injector::transaction::table tbl(table, TRUE);
- int ret __attribute__((unused))= trans.use_table(::server_id, tbl);
- DBUG_ASSERT(ret == 0);
- }
- }
- if (trans.good())
- {
- if (ndb_apply_status_share)
- {
- TABLE *table= ndb_apply_status_share->table;
-
-#ifndef DBUG_OFF
- const LEX_STRING& name= table->s->table_name;
- DBUG_PRINT("info", ("use_table: %.*s",
- (int) name.length, name.str));
-#endif
- injector::transaction::table tbl(table, TRUE);
- int ret __attribute__((unused))= trans.use_table(::server_id, tbl);
- DBUG_ASSERT(ret == 0);
-
- /*
- Intialize table->record[0]
- */
- empty_record(table);
-
- table->field[0]->store((longlong)::server_id);
- table->field[1]->store((longlong)gci);
- table->field[2]->store("", 0, &my_charset_bin);
- table->field[3]->store((longlong)0);
- table->field[4]->store((longlong)0);
- trans.write_row(::server_id,
- injector::transaction::table(table, TRUE),
- &table->s->all_set, table->s->fields,
- table->record[0]);
- }
- else
- {
- sql_print_error("NDB: Could not get apply status share");
- }
- }
-#ifdef RUN_NDB_BINLOG_TIMER
- write_timer.start();
-#endif
- do
- {
-#ifdef RUN_NDB_BINLOG_TIMER
- event_count++;
-#endif
- if (pOp->hasError() &&
- ndb_binlog_thread_handle_error(i_ndb, pOp, row) < 0)
- goto err;
-
-#ifndef DBUG_OFF
- {
- NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
- DBUG_PRINT("info",
- ("EVENT TYPE: %d GCI: %ld last applied: %ld "
- "share: 0x%lx (%s.%s)", pOp->getEventType(),
- (long) gci,
- (long) ndb_latest_applied_binlog_epoch,
- (long) share,
- share ? share->db : "'NULL'",
- share ? share->table_name : "'NULL'"));
- DBUG_ASSERT(share != 0);
- }
- // assert that there is consistancy between gci op list
- // and event list
- {
- Uint32 iter= 0;
- const NdbEventOperation *gci_op;
- Uint32 event_types;
- while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
- != NULL)
- {
- if (gci_op == pOp)
- break;
- }
- DBUG_ASSERT(gci_op == pOp);
- DBUG_ASSERT((event_types & pOp->getEventType()) != 0);
- }
-#endif
- if ((unsigned) pOp->getEventType() <
- (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
- ndb_binlog_thread_handle_data_event(i_ndb, pOp, row, trans);
- else
- {
- // set injector_ndb database/schema from table internal name
- int ret __attribute__((unused))=
- i_ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
- DBUG_ASSERT(ret == 0);
- ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, row);
- // reset to catch errors
- i_ndb->setDatabaseName("");
- DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
- s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
- "<empty>"));
- DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
- i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
- "<empty>"));
- if (i_ndb->getEventOperation() == NULL &&
- s_ndb->getEventOperation() == NULL &&
- do_ndbcluster_binlog_close_connection == BCCC_running)
- {
- DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
- do_ndbcluster_binlog_close_connection= BCCC_restart;
- if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running)
- {
- sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
- "as latest received epoch is %lu",
- (ulong) *p_latest_trans_gci,
- (ulong) ndb_latest_received_binlog_epoch);
- }
- }
- }
-
- pOp= i_ndb->nextEvent();
- } while (pOp && pOp->getGCI() == gci);
-
- /*
- note! pOp is not referring to an event in the next epoch
- or is == 0
- */
-#ifdef RUN_NDB_BINLOG_TIMER
- write_timer.stop();
-#endif
-
- if (trans.good())
- {
- //DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes);
- thd->proc_info= "Committing events to binlog";
- injector::transaction::binlog_pos start= trans.start_pos();
- if (int r= trans.commit())
- {
- sql_print_error("NDB Binlog: "
- "Error during COMMIT of GCI. Error: %d",
- r);
- /* TODO: Further handling? */
- }
- row.gci= gci;
- row.master_log_file= start.file_name();
- row.master_log_pos= start.file_pos();
-
- DBUG_PRINT("info", ("COMMIT gci: %lu", (ulong) gci));
- if (ndb_update_ndb_binlog_index)
- ndb_add_ndb_binlog_index(thd, &row);
- ndb_latest_applied_binlog_epoch= gci;
- }
- ndb_latest_handled_binlog_epoch= gci;
-#ifdef RUN_NDB_BINLOG_TIMER
- gci_timer.stop();
- sql_print_information("gci %ld event_count %d write time "
- "%ld(%d e/s), total time %ld(%d e/s)",
- (ulong)gci, event_count,
- write_timer.elapsed_ms(),
- (1000*event_count) / write_timer.elapsed_ms(),
- gci_timer.elapsed_ms(),
- (1000*event_count) / gci_timer.elapsed_ms());
-#endif
- }
- }
-
- ndb_binlog_thread_handle_schema_event_post_epoch(thd,
- &post_epoch_log_list,
- &post_epoch_unlock_list);
- free_root(&mem_root, MYF(0));
- *root_ptr= old_root;
- ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
- }
- if (do_ndbcluster_binlog_close_connection == BCCC_restart)
- {
- ndb_binlog_tables_inited= FALSE;
- trans_commit_stmt(thd);
- close_thread_tables(thd);
- thd->mdl_context.release_transactional_locks();
- ndb_binlog_index= 0;
- goto restart;
- }
-err:
- sql_print_information("Stopping Cluster Binlog");
- DBUG_PRINT("info",("Shutting down cluster binlog thread"));
- thd->proc_info= "Shutting down";
- thd->get_stmt_da()->set_overwrite_status(true);
- thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
- thd->get_stmt_da()->set_overwrite_status(false);
- close_thread_tables(thd);
- thd->mdl_context.release_transactional_locks();
- mysql_mutex_lock(&injector_mutex);
- /* don't mess with the injector_ndb anymore from other threads */
- injector_thd= 0;
- injector_ndb= 0;
- p_latest_trans_gci= 0;
- schema_ndb= 0;
- mysql_mutex_unlock(&injector_mutex);
- thd->db= 0; // as not to try to free memory
-
- if (ndb_apply_status_share)
- {
- /* ndb_share reference binlog extra free */
- DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
- ndb_apply_status_share->key,
- ndb_apply_status_share->use_count));
- free_share(&ndb_apply_status_share);
- ndb_apply_status_share= 0;
- }
- if (ndb_schema_share)
- {
- /* begin protect ndb_schema_share */
- mysql_mutex_lock(&ndb_schema_share_mutex);
- /* ndb_share reference binlog extra free */
- DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
- ndb_schema_share->key,
- ndb_schema_share->use_count));
- free_share(&ndb_schema_share);
- ndb_schema_share= 0;
- ndb_binlog_tables_inited= 0;
- mysql_mutex_unlock(&ndb_schema_share_mutex);
- /* end protect ndb_schema_share */
- }
-
- /* remove all event operations */
- if (s_ndb)
- {
- NdbEventOperation *op;
- DBUG_PRINT("info",("removing all event operations"));
- while ((op= s_ndb->getEventOperation()))
- {
- DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
- DBUG_PRINT("info",("removing event operation on %s",
- op->getEvent()->getName()));
- NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
- DBUG_ASSERT(share != 0);
- DBUG_ASSERT(share->op == op ||
- share->op_old == op);
- share->op= share->op_old= 0;
- /* ndb_share reference binlog free */
- DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
- share->key, share->use_count));
- free_share(&share);
- s_ndb->dropEventOperation(op);
- }
- delete s_ndb;
- s_ndb= 0;
- }
- if (i_ndb)
- {
- NdbEventOperation *op;
- DBUG_PRINT("info",("removing all event operations"));
- while ((op= i_ndb->getEventOperation()))
- {
- DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
- DBUG_PRINT("info",("removing event operation on %s",
- op->getEvent()->getName()));
- NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
- DBUG_ASSERT(share != 0);
- DBUG_ASSERT(share->op == op ||
- share->op_old == op);
- share->op= share->op_old= 0;
- /* ndb_share reference binlog free */
- DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
- share->key, share->use_count));
- free_share(&share);
- i_ndb->dropEventOperation(op);
- }
- delete i_ndb;
- i_ndb= 0;
- }
-
- my_hash_free(&ndb_schema_objects);
-
- delete thd;
-
- ndb_binlog_thread_running= -1;
- ndb_binlog_running= FALSE;
- mysql_cond_signal(&injector_cond);
-
- DBUG_PRINT("exit", ("ndb_binlog_thread"));
-
- DBUG_LEAVE; // Must match DBUG_ENTER()
- my_thread_end();
- pthread_exit(0);
- return NULL; // Avoid compiler warnings
-}
-
-bool
-ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
- enum ha_stat_type stat_type)
-{
- char buf[IO_SIZE];
- uint buflen;
- ulonglong ndb_latest_epoch= 0;
- DBUG_ENTER("ndbcluster_show_status_binlog");
-
- mysql_mutex_lock(&injector_mutex);
- if (injector_ndb)
- {
- char buff1[22],buff2[22],buff3[22],buff4[22],buff5[22];
- ndb_latest_epoch= injector_ndb->getLatestGCI();
- mysql_mutex_unlock(&injector_mutex);
-
- buflen=
- snprintf(buf, sizeof(buf),
- "latest_epoch=%s, "
- "latest_trans_epoch=%s, "
- "latest_received_binlog_epoch=%s, "
- "latest_handled_binlog_epoch=%s, "
- "latest_applied_binlog_epoch=%s",
- llstr(ndb_latest_epoch, buff1),
- llstr(*p_latest_trans_gci, buff2),
- llstr(ndb_latest_received_binlog_epoch, buff3),
- llstr(ndb_latest_handled_binlog_epoch, buff4),
- llstr(ndb_latest_applied_binlog_epoch, buff5));
- if (stat_print(thd, ndbcluster_hton_name, ndbcluster_hton_name_length,
- "binlog", strlen("binlog"),
- buf, buflen))
- DBUG_RETURN(TRUE);
- }
- else
- mysql_mutex_unlock(&injector_mutex);
- DBUG_RETURN(FALSE);
-}
-
-#endif /* HAVE_NDB_BINLOG */
-#endif