diff options
author | unknown <pekka@mysql.com> | 2006-01-25 22:22:50 +0100 |
---|---|---|
committer | unknown <pekka@mysql.com> | 2006-01-25 22:22:50 +0100 |
commit | 465960c2e3a5c9bf69e4ac25aec56081ec4edc4a (patch) | |
tree | 0ef14e598480fb350ac12445849e4273462e64c1 /sql | |
parent | 97c6ff7bf2d1d85c3e835ff924aec58a77029287 (diff) | |
download | mariadb-git-465960c2e3a5c9bf69e4ac25aec56081ec4edc4a.tar.gz |
ndb - wl#2972 rbr blobs: write blob data to binlog
mysql-test/t/disabled.def:
rbr blobs: write data + dict cache workarounds
sql/ha_ndbcluster.cc:
rbr blobs: write data + dict cache workarounds
sql/ha_ndbcluster.h:
rbr blobs: write data + dict cache workarounds
sql/ha_ndbcluster_binlog.cc:
rbr blobs: write data + dict cache workarounds
storage/ndb/include/ndbapi/NdbDictionary.hpp:
rbr blobs: write data + dict cache workarounds
storage/ndb/src/ndbapi/NdbBlob.cpp:
rbr blobs: write data + dict cache workarounds
storage/ndb/src/ndbapi/NdbDictionary.cpp:
rbr blobs: write data + dict cache workarounds
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp:
rbr blobs: write data + dict cache workarounds
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp:
rbr blobs: write data + dict cache workarounds
Diffstat (limited to 'sql')
-rw-r--r-- | sql/ha_ndbcluster.cc | 103 | ||||
-rw-r--r-- | sql/ha_ndbcluster.h | 8 | ||||
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 142 |
3 files changed, 188 insertions, 65 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 3d44c731b80..16b37ede164 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -35,6 +35,11 @@ #include "ha_ndbcluster_binlog.h" +#ifdef ndb_dynamite +#undef assert +#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0) +#endif + // options from from mysqld.cc extern my_bool opt_ndb_optimized_node_selection; extern const char *opt_ndbcluster_connectstring; @@ -791,10 +796,20 @@ int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg) if (ndb_blob->blobsNextBlob() != NULL) DBUG_RETURN(0); ha_ndbcluster *ha= (ha_ndbcluster *)arg; - DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob)); + int ret= get_ndb_blobs_value(ha->table, ha->m_value, + ha->m_blobs_buffer, ha->m_blobs_buffer_size, + 0); + DBUG_RETURN(ret); } -int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) +/* + This routine is shared by injector. There is no common blobs buffer + so the buffer and length are passed by reference. Injector also + passes a record pointer diff. + */ +int get_ndb_blobs_value(TABLE* table, NdbValue* value_array, + byte*& buffer, uint& buffer_size, + my_ptrdiff_t ptrdiff) { DBUG_ENTER("get_ndb_blobs_value"); @@ -803,44 +818,51 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) for (int loop= 0; loop <= 1; loop++) { uint32 offset= 0; - for (uint i= 0; i < table_share->fields; i++) + for (uint i= 0; i < table->s->fields; i++) { Field *field= table->field[i]; - NdbValue value= m_value[i]; + NdbValue value= value_array[i]; if (value.ptr != NULL && (field->flags & BLOB_FLAG)) { Field_blob *field_blob= (Field_blob *)field; NdbBlob *ndb_blob= value.blob; - Uint64 blob_len= 0; - if (ndb_blob->getLength(blob_len) != 0) - DBUG_RETURN(-1); - // Align to Uint64 - uint32 blob_size= blob_len; - if (blob_size % 8 != 0) - blob_size+= 8 - blob_size % 8; - if (loop == 1) - { - char *buf= m_blobs_buffer + offset; - uint32 len= 0xffffffff; // Max uint32 - DBUG_PRINT("value", ("read blob ptr=%lx len=%u", - buf, (uint) blob_len)); - if (ndb_blob->readData(buf, len) != 0) + int isNull; + ndb_blob->getDefined(isNull); + if (isNull == 0) { // XXX -1 should be allowed only for events + Uint64 blob_len= 0; + if (ndb_blob->getLength(blob_len) != 0) DBUG_RETURN(-1); - DBUG_ASSERT(len == blob_len); - field_blob->set_ptr(len, buf); + // Align to Uint64 + uint32 blob_size= blob_len; + if (blob_size % 8 != 0) + blob_size+= 8 - blob_size % 8; + if (loop == 1) + { + char *buf= buffer + offset; + uint32 len= 0xffffffff; // Max uint32 + DBUG_PRINT("info", ("read blob ptr=%p len=%u", + buf, (uint) blob_len)); + if (ndb_blob->readData(buf, len) != 0) + DBUG_RETURN(-1); + DBUG_ASSERT(len == blob_len); + // Ugly hack assumes only ptr needs to be changed + field_blob->ptr += ptrdiff; + field_blob->set_ptr(len, buf); + field_blob->ptr -= ptrdiff; + } + offset+= blob_size; } - offset+= blob_size; } } - if (loop == 0 && offset > m_blobs_buffer_size) + if (loop == 0 && offset > buffer_size) { - my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); - m_blobs_buffer_size= 0; - DBUG_PRINT("value", ("allocate blobs buffer size %u", offset)); - m_blobs_buffer= my_malloc(offset, MYF(MY_WME)); - if (m_blobs_buffer == NULL) + my_free(buffer, MYF(MY_ALLOW_ZERO_PTR)); + buffer_size= 0; + DBUG_PRINT("info", ("allocate blobs buffer size %u", offset)); + buffer= my_malloc(offset, MYF(MY_WME)); + if (buffer == NULL) DBUG_RETURN(-1); - m_blobs_buffer_size= offset; + buffer_size= offset; } } DBUG_RETURN(0); @@ -2713,14 +2735,22 @@ void ndb_unpack_record(TABLE *table, NdbValue *value, else { NdbBlob *ndb_blob= (*value).blob; - bool isNull= TRUE; -#ifndef DBUG_OFF - int ret= -#endif - ndb_blob->getNull(isNull); - DBUG_ASSERT(ret == 0); - if (isNull) - field->set_null(row_offset); + int isNull; + ndb_blob->getDefined(isNull); + if (isNull != 0) + { + uint col_no = ndb_blob->getColumn()->getColumnNo(); + if (isNull == 1) + { + DBUG_PRINT("info",("[%u] NULL", col_no)) + field->set_null(row_offset); + } + else + { + DBUG_PRINT("info",("[%u] UNDEFINED", col_no)); + bitmap_clear_bit(defined, col_no); + } + } } } } @@ -4713,6 +4743,7 @@ int ha_ndbcluster::alter_table_name(const char *to) NDBDICT *dict= ndb->getDictionary(); const NDBTAB *orig_tab= (const NDBTAB *) m_table; DBUG_ENTER("alter_table_name"); + DBUG_PRINT("info", ("from: %s to: %s", orig_tab->getName(), to)); NdbDictionary::Table new_tab= *orig_tab; new_tab.setName(to); diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h index 73b1b27ede2..a62356d41ab 100644 --- a/sql/ha_ndbcluster.h +++ b/sql/ha_ndbcluster.h @@ -25,6 +25,9 @@ #pragma interface /* gcc class implementation */ #endif +/* Blob tables and events are internal to NDB and must never be accessed */ +#define IS_NDB_BLOB_PREFIX(A) is_prefix(A, "NDB$BLOB") + #include <NdbApi.hpp> #include <ndbapi_limits.h> @@ -78,6 +81,10 @@ typedef struct ndb_index_data { typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue; +int get_ndb_blobs_value(TABLE* table, NdbValue* value_array, + byte*& buffer, uint& buffer_size, + my_ptrdiff_t ptrdiff); + typedef enum { NSS_INITIAL= 0, NSS_DROPPED, @@ -114,6 +121,7 @@ typedef struct st_ndbcluster_share { #ifdef HAVE_NDB_BINLOG /* NDB_SHARE.flags */ #define NSF_HIDDEN_PK 1 /* table has hidden primary key */ +#define NSF_BLOB_FLAG 2 /* table has blob attributes */ #define NSF_NO_BINLOG 4 /* table should not be binlogged */ #endif diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index f1da21a3ad5..1b3833fe443 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -23,6 +23,11 @@ #include "slave.h" #include "ha_ndbcluster_binlog.h" +#ifdef ndb_dynamite +#undef assert +#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0) +#endif + /* defines for cluster replication table names */ @@ -237,6 +242,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) DBUG_ASSERT(_table != 0); if (_table->s->primary_key == MAX_KEY) share->flags|= NSF_HIDDEN_PK; + if (_table->s->blob_fields != 0) + share->flags|= NSF_BLOB_FLAG; return; } while (1) @@ -316,6 +323,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) } if (table->s->primary_key == MAX_KEY) share->flags|= NSF_HIDDEN_PK; + if (table->s->blob_fields != 0) + share->flags|= NSF_BLOB_FLAG; break; } } @@ -1622,6 +1631,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, NDB_SHARE *share) { DBUG_ENTER("ndbcluster_create_binlog_setup"); + DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name)); pthread_mutex_lock(&ndbcluster_mutex); @@ -1713,6 +1723,10 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, const char *event_name, NDB_SHARE *share) { DBUG_ENTER("ndbcluster_create_event"); + DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s", + ndbtab->getName(), ndbtab->getObjectVersion(), + event_name, share ? share->key : "(nil)")); + DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName())); if (!share) { DBUG_PRINT("info", ("share == NULL")); @@ -1730,7 +1744,14 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, my_event.addTableEvent(NDBEVENT::TE_ALL); if (share->flags & NSF_HIDDEN_PK) { - /* No primary key, susbscribe for all attributes */ + if (share->flags & NSF_BLOB_FLAG) + { + sql_print_error("NDB Binlog: logging of table %s " + "with no PK and blob attributes is not supported", + share->key); + DBUG_RETURN(-1); + } + /* No primary key, subscribe for all attributes */ my_event.setReport(NDBEVENT::ER_ALL); DBUG_PRINT("info", ("subscription all")); } @@ -1749,6 +1770,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, DBUG_PRINT("info", ("subscription all and subscribe")); } } + if (share->flags & NSF_BLOB_FLAG) + my_event.mergeEvents(true); /* add all columns to the event */ int n_cols= ndbtab->getNoOfColumns(); @@ -1837,6 +1860,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, */ DBUG_ENTER("ndbcluster_create_event_ops"); + DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName())); DBUG_ASSERT(share != 0); @@ -1857,22 +1881,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, } TABLE *table= share->table; - if (table) - { - /* - Logging of blob tables is not yet implemented, it would require: - 1. setup of events also on the blob attribute tables - 2. collect the pieces of the blob into one from an epoch to - provide a full blob to binlog - */ - if (table->s->blob_fields) - { - sql_print_error("NDB Binlog: logging of blob table %s " - "is not supported", share->key); - share->flags|= NSF_NO_BINLOG; - DBUG_RETURN(0); - } - } int do_schema_share= 0, do_apply_status_share= 0; int retries= 100; @@ -1910,37 +1918,64 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, DBUG_RETURN(-1); } + if (share->flags & NSF_BLOB_FLAG) + op->mergeEvents(true); // currently not inherited from event + + if (share->flags & NSF_BLOB_FLAG) + { + /* + * Given servers S1 S2, following results in out-of-date + * event->m_tableImpl and column->m_blobTable. + * + * S1: create table t1(a int primary key); + * S2: drop table t1; + * S1: create table t2(a int primary key, b blob); + * S1: alter table t2 add x int; + * S1: alter table t2 drop x; + * + * TODO fix at right place before we get here + */ + ndb->getDictionary()->fix_blob_events(ndbtab, event_name); + } + int n_columns= ndbtab->getNoOfColumns(); - int n_fields= table ? table->s->fields : 0; + int n_fields= table ? table->s->fields : 0; // XXX ??? for (int j= 0; j < n_columns; j++) { const char *col_name= ndbtab->getColumn(j)->getName(); - NdbRecAttr *attr0, *attr1; + NdbValue attr0, attr1; if (j < n_fields) { Field *f= share->table->field[j]; if (is_ndb_compatible_type(f)) { DBUG_PRINT("info", ("%s compatible", col_name)); - attr0= op->getValue(col_name, f->ptr); - attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) + + attr0.rec= op->getValue(col_name, f->ptr); + attr1.rec= op->getPreValue(col_name, + (f->ptr - share->table->record[0]) + share->table->record[1]); } - else + else if (! (f->flags & BLOB_FLAG)) { DBUG_PRINT("info", ("%s non compatible", col_name)); - attr0= op->getValue(col_name); - attr1= op->getPreValue(col_name); + attr0.rec= op->getValue(col_name); + attr1.rec= op->getPreValue(col_name); + } + else + { + DBUG_PRINT("info", ("%s blob", col_name)); + attr0.blob= op->getBlobHandle(col_name); + attr1.blob= op->getPreBlobHandle(col_name); } } else { DBUG_PRINT("info", ("%s hidden key", col_name)); - attr0= op->getValue(col_name); - attr1= op->getPreValue(col_name); + attr0.rec= op->getValue(col_name); + attr1.rec= op->getPreValue(col_name); } - share->ndb_value[0][j].rec= attr0; - share->ndb_value[1][j].rec= attr1; + share->ndb_value[0][j].ptr= attr0.ptr; + share->ndb_value[1][j].ptr= attr1.ptr; } op->setCustomData((void *) share); // set before execute share->op= op; // assign op in NDB_SHARE @@ -2229,12 +2264,27 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, (saves moving data about many times) */ + /* + for now malloc/free blobs buffer each time + TODO if possible share single permanent buffer with handlers + */ + byte* blobs_buffer[2] = { 0, 0 }; + uint blobs_buffer_size[2] = { 0, 0 }; + switch(pOp->getEventType()) { case NDBEVENT::TE_INSERT: row.n_inserts++; DBUG_PRINT("info", ("INSERT INTO %s", share->key)); { + if (share->flags & NSF_BLOB_FLAG) + { + my_ptrdiff_t ptrdiff= 0; + int ret= get_ndb_blobs_value(table, share->ndb_value[0], + blobs_buffer[0], blobs_buffer_size[0], + ptrdiff); + DBUG_ASSERT(ret == 0); + } ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]); trans.write_row(::server_id, injector::transaction::table(table, true), &b, n_fields, table->record[0]); @@ -2261,6 +2311,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, key */ + if (share->flags & NSF_BLOB_FLAG) + { + my_ptrdiff_t ptrdiff= table->record[n] - table->record[0]; + int ret= get_ndb_blobs_value(table, share->ndb_value[n], + blobs_buffer[n], blobs_buffer_size[n], + ptrdiff); + DBUG_ASSERT(ret == 0); + } ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]); print_records(table, table->record[n]); trans.delete_row(::server_id, injector::transaction::table(table, true), @@ -2271,13 +2329,21 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, row.n_updates++; DBUG_PRINT("info", ("UPDATE %s", share->key)); { + if (share->flags & NSF_BLOB_FLAG) + { + my_ptrdiff_t ptrdiff= 0; + int ret= get_ndb_blobs_value(table, share->ndb_value[0], + blobs_buffer[0], blobs_buffer_size[0], + ptrdiff); + DBUG_ASSERT(ret == 0); + } ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]); print_records(table, table->record[0]); if (table->s->primary_key != MAX_KEY) { /* - since table has a primary key, we can to a write + since table has a primary key, we can do a write using only after values */ trans.write_row(::server_id, injector::transaction::table(table, true), @@ -2289,6 +2355,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, mysql server cannot handle the ndb hidden key and therefore needs the before image as well */ + if (share->flags & NSF_BLOB_FLAG) + { + my_ptrdiff_t ptrdiff= table->record[1] - table->record[0]; + int ret= get_ndb_blobs_value(table, share->ndb_value[1], + blobs_buffer[1], blobs_buffer_size[1], + ptrdiff); + DBUG_ASSERT(ret == 0); + } ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]); print_records(table, table->record[1]); trans.update_row(::server_id, @@ -2305,6 +2379,12 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, break; } + if (share->flags & NSF_BLOB_FLAG) + { + my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR)); + my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR)); + } + return 0; } @@ -2544,6 +2624,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Binlog_index_row row; while (pOp != NULL) { + // sometimes get TE_ALTER with invalid table + DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER || + ! IS_NDB_BLOB_PREFIX(pOp->getTable()->getName())); ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); @@ -2684,6 +2767,7 @@ err: DBUG_PRINT("info",("removing all event operations")); while ((op= ndb->getEventOperation())) { + DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getTable()->getName())); DBUG_PRINT("info",("removing event operation on %s", op->getEvent()->getName())); NDB_SHARE *share= (NDB_SHARE*) op->getCustomData(); |