summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <pekka@mysql.com>2006-01-25 22:22:50 +0100
committerunknown <pekka@mysql.com>2006-01-25 22:22:50 +0100
commit465960c2e3a5c9bf69e4ac25aec56081ec4edc4a (patch)
tree0ef14e598480fb350ac12445849e4273462e64c1 /sql
parent97c6ff7bf2d1d85c3e835ff924aec58a77029287 (diff)
downloadmariadb-git-465960c2e3a5c9bf69e4ac25aec56081ec4edc4a.tar.gz
ndb - wl#2972 rbr blobs: write blob data to binlog
mysql-test/t/disabled.def: rbr blobs: write data + dict cache workarounds sql/ha_ndbcluster.cc: rbr blobs: write data + dict cache workarounds sql/ha_ndbcluster.h: rbr blobs: write data + dict cache workarounds sql/ha_ndbcluster_binlog.cc: rbr blobs: write data + dict cache workarounds storage/ndb/include/ndbapi/NdbDictionary.hpp: rbr blobs: write data + dict cache workarounds storage/ndb/src/ndbapi/NdbBlob.cpp: rbr blobs: write data + dict cache workarounds storage/ndb/src/ndbapi/NdbDictionary.cpp: rbr blobs: write data + dict cache workarounds storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp: rbr blobs: write data + dict cache workarounds storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp: rbr blobs: write data + dict cache workarounds
Diffstat (limited to 'sql')
-rw-r--r--sql/ha_ndbcluster.cc103
-rw-r--r--sql/ha_ndbcluster.h8
-rw-r--r--sql/ha_ndbcluster_binlog.cc142
3 files changed, 188 insertions, 65 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 3d44c731b80..16b37ede164 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -35,6 +35,11 @@
#include "ha_ndbcluster_binlog.h"
+#ifdef ndb_dynamite
+#undef assert
+#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
+#endif
+
// options from from mysqld.cc
extern my_bool opt_ndb_optimized_node_selection;
extern const char *opt_ndbcluster_connectstring;
@@ -791,10 +796,20 @@ int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
if (ndb_blob->blobsNextBlob() != NULL)
DBUG_RETURN(0);
ha_ndbcluster *ha= (ha_ndbcluster *)arg;
- DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob));
+ int ret= get_ndb_blobs_value(ha->table, ha->m_value,
+ ha->m_blobs_buffer, ha->m_blobs_buffer_size,
+ 0);
+ DBUG_RETURN(ret);
}
-int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
+/*
+ This routine is shared by injector. There is no common blobs buffer
+ so the buffer and length are passed by reference. Injector also
+ passes a record pointer diff.
+ */
+int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
+ byte*& buffer, uint& buffer_size,
+ my_ptrdiff_t ptrdiff)
{
DBUG_ENTER("get_ndb_blobs_value");
@@ -803,44 +818,51 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
for (int loop= 0; loop <= 1; loop++)
{
uint32 offset= 0;
- for (uint i= 0; i < table_share->fields; i++)
+ for (uint i= 0; i < table->s->fields; i++)
{
Field *field= table->field[i];
- NdbValue value= m_value[i];
+ NdbValue value= value_array[i];
if (value.ptr != NULL && (field->flags & BLOB_FLAG))
{
Field_blob *field_blob= (Field_blob *)field;
NdbBlob *ndb_blob= value.blob;
- Uint64 blob_len= 0;
- if (ndb_blob->getLength(blob_len) != 0)
- DBUG_RETURN(-1);
- // Align to Uint64
- uint32 blob_size= blob_len;
- if (blob_size % 8 != 0)
- blob_size+= 8 - blob_size % 8;
- if (loop == 1)
- {
- char *buf= m_blobs_buffer + offset;
- uint32 len= 0xffffffff; // Max uint32
- DBUG_PRINT("value", ("read blob ptr=%lx len=%u",
- buf, (uint) blob_len));
- if (ndb_blob->readData(buf, len) != 0)
+ int isNull;
+ ndb_blob->getDefined(isNull);
+ if (isNull == 0) { // XXX -1 should be allowed only for events
+ Uint64 blob_len= 0;
+ if (ndb_blob->getLength(blob_len) != 0)
DBUG_RETURN(-1);
- DBUG_ASSERT(len == blob_len);
- field_blob->set_ptr(len, buf);
+ // Align to Uint64
+ uint32 blob_size= blob_len;
+ if (blob_size % 8 != 0)
+ blob_size+= 8 - blob_size % 8;
+ if (loop == 1)
+ {
+ char *buf= buffer + offset;
+ uint32 len= 0xffffffff; // Max uint32
+ DBUG_PRINT("info", ("read blob ptr=%p len=%u",
+ buf, (uint) blob_len));
+ if (ndb_blob->readData(buf, len) != 0)
+ DBUG_RETURN(-1);
+ DBUG_ASSERT(len == blob_len);
+ // Ugly hack assumes only ptr needs to be changed
+ field_blob->ptr += ptrdiff;
+ field_blob->set_ptr(len, buf);
+ field_blob->ptr -= ptrdiff;
+ }
+ offset+= blob_size;
}
- offset+= blob_size;
}
}
- if (loop == 0 && offset > m_blobs_buffer_size)
+ if (loop == 0 && offset > buffer_size)
{
- my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
- m_blobs_buffer_size= 0;
- DBUG_PRINT("value", ("allocate blobs buffer size %u", offset));
- m_blobs_buffer= my_malloc(offset, MYF(MY_WME));
- if (m_blobs_buffer == NULL)
+ my_free(buffer, MYF(MY_ALLOW_ZERO_PTR));
+ buffer_size= 0;
+ DBUG_PRINT("info", ("allocate blobs buffer size %u", offset));
+ buffer= my_malloc(offset, MYF(MY_WME));
+ if (buffer == NULL)
DBUG_RETURN(-1);
- m_blobs_buffer_size= offset;
+ buffer_size= offset;
}
}
DBUG_RETURN(0);
@@ -2713,14 +2735,22 @@ void ndb_unpack_record(TABLE *table, NdbValue *value,
else
{
NdbBlob *ndb_blob= (*value).blob;
- bool isNull= TRUE;
-#ifndef DBUG_OFF
- int ret=
-#endif
- ndb_blob->getNull(isNull);
- DBUG_ASSERT(ret == 0);
- if (isNull)
- field->set_null(row_offset);
+ int isNull;
+ ndb_blob->getDefined(isNull);
+ if (isNull != 0)
+ {
+ uint col_no = ndb_blob->getColumn()->getColumnNo();
+ if (isNull == 1)
+ {
+ DBUG_PRINT("info",("[%u] NULL", col_no))
+ field->set_null(row_offset);
+ }
+ else
+ {
+ DBUG_PRINT("info",("[%u] UNDEFINED", col_no));
+ bitmap_clear_bit(defined, col_no);
+ }
+ }
}
}
}
@@ -4713,6 +4743,7 @@ int ha_ndbcluster::alter_table_name(const char *to)
NDBDICT *dict= ndb->getDictionary();
const NDBTAB *orig_tab= (const NDBTAB *) m_table;
DBUG_ENTER("alter_table_name");
+ DBUG_PRINT("info", ("from: %s to: %s", orig_tab->getName(), to));
NdbDictionary::Table new_tab= *orig_tab;
new_tab.setName(to);
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index 73b1b27ede2..a62356d41ab 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -25,6 +25,9 @@
#pragma interface /* gcc class implementation */
#endif
+/* Blob tables and events are internal to NDB and must never be accessed */
+#define IS_NDB_BLOB_PREFIX(A) is_prefix(A, "NDB$BLOB")
+
#include <NdbApi.hpp>
#include <ndbapi_limits.h>
@@ -78,6 +81,10 @@ typedef struct ndb_index_data {
typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
+int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
+ byte*& buffer, uint& buffer_size,
+ my_ptrdiff_t ptrdiff);
+
typedef enum {
NSS_INITIAL= 0,
NSS_DROPPED,
@@ -114,6 +121,7 @@ typedef struct st_ndbcluster_share {
#ifdef HAVE_NDB_BINLOG
/* NDB_SHARE.flags */
#define NSF_HIDDEN_PK 1 /* table has hidden primary key */
+#define NSF_BLOB_FLAG 2 /* table has blob attributes */
#define NSF_NO_BINLOG 4 /* table should not be binlogged */
#endif
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
index f1da21a3ad5..1b3833fe443 100644
--- a/sql/ha_ndbcluster_binlog.cc
+++ b/sql/ha_ndbcluster_binlog.cc
@@ -23,6 +23,11 @@
#include "slave.h"
#include "ha_ndbcluster_binlog.h"
+#ifdef ndb_dynamite
+#undef assert
+#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
+#endif
+
/*
defines for cluster replication table names
*/
@@ -237,6 +242,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
DBUG_ASSERT(_table != 0);
if (_table->s->primary_key == MAX_KEY)
share->flags|= NSF_HIDDEN_PK;
+ if (_table->s->blob_fields != 0)
+ share->flags|= NSF_BLOB_FLAG;
return;
}
while (1)
@@ -316,6 +323,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
}
if (table->s->primary_key == MAX_KEY)
share->flags|= NSF_HIDDEN_PK;
+ if (table->s->blob_fields != 0)
+ share->flags|= NSF_BLOB_FLAG;
break;
}
}
@@ -1622,6 +1631,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
NDB_SHARE *share)
{
DBUG_ENTER("ndbcluster_create_binlog_setup");
+ DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
pthread_mutex_lock(&ndbcluster_mutex);
@@ -1713,6 +1723,10 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
const char *event_name, NDB_SHARE *share)
{
DBUG_ENTER("ndbcluster_create_event");
+ DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
+ ndbtab->getName(), ndbtab->getObjectVersion(),
+ event_name, share ? share->key : "(nil)"));
+ DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
if (!share)
{
DBUG_PRINT("info", ("share == NULL"));
@@ -1730,7 +1744,14 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
my_event.addTableEvent(NDBEVENT::TE_ALL);
if (share->flags & NSF_HIDDEN_PK)
{
- /* No primary key, susbscribe for all attributes */
+ if (share->flags & NSF_BLOB_FLAG)
+ {
+ sql_print_error("NDB Binlog: logging of table %s "
+ "with no PK and blob attributes is not supported",
+ share->key);
+ DBUG_RETURN(-1);
+ }
+ /* No primary key, subscribe for all attributes */
my_event.setReport(NDBEVENT::ER_ALL);
DBUG_PRINT("info", ("subscription all"));
}
@@ -1749,6 +1770,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
DBUG_PRINT("info", ("subscription all and subscribe"));
}
}
+ if (share->flags & NSF_BLOB_FLAG)
+ my_event.mergeEvents(true);
/* add all columns to the event */
int n_cols= ndbtab->getNoOfColumns();
@@ -1837,6 +1860,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
*/
DBUG_ENTER("ndbcluster_create_event_ops");
+ DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
DBUG_ASSERT(share != 0);
@@ -1857,22 +1881,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
}
TABLE *table= share->table;
- if (table)
- {
- /*
- Logging of blob tables is not yet implemented, it would require:
- 1. setup of events also on the blob attribute tables
- 2. collect the pieces of the blob into one from an epoch to
- provide a full blob to binlog
- */
- if (table->s->blob_fields)
- {
- sql_print_error("NDB Binlog: logging of blob table %s "
- "is not supported", share->key);
- share->flags|= NSF_NO_BINLOG;
- DBUG_RETURN(0);
- }
- }
int do_schema_share= 0, do_apply_status_share= 0;
int retries= 100;
@@ -1910,37 +1918,64 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
DBUG_RETURN(-1);
}
+ if (share->flags & NSF_BLOB_FLAG)
+ op->mergeEvents(true); // currently not inherited from event
+
+ if (share->flags & NSF_BLOB_FLAG)
+ {
+ /*
+ * Given servers S1 S2, following results in out-of-date
+ * event->m_tableImpl and column->m_blobTable.
+ *
+ * S1: create table t1(a int primary key);
+ * S2: drop table t1;
+ * S1: create table t2(a int primary key, b blob);
+ * S1: alter table t2 add x int;
+ * S1: alter table t2 drop x;
+ *
+ * TODO fix at right place before we get here
+ */
+ ndb->getDictionary()->fix_blob_events(ndbtab, event_name);
+ }
+
int n_columns= ndbtab->getNoOfColumns();
- int n_fields= table ? table->s->fields : 0;
+ int n_fields= table ? table->s->fields : 0; // XXX ???
for (int j= 0; j < n_columns; j++)
{
const char *col_name= ndbtab->getColumn(j)->getName();
- NdbRecAttr *attr0, *attr1;
+ NdbValue attr0, attr1;
if (j < n_fields)
{
Field *f= share->table->field[j];
if (is_ndb_compatible_type(f))
{
DBUG_PRINT("info", ("%s compatible", col_name));
- attr0= op->getValue(col_name, f->ptr);
- attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) +
+ attr0.rec= op->getValue(col_name, f->ptr);
+ attr1.rec= op->getPreValue(col_name,
+ (f->ptr - share->table->record[0]) +
share->table->record[1]);
}
- else
+ else if (! (f->flags & BLOB_FLAG))
{
DBUG_PRINT("info", ("%s non compatible", col_name));
- attr0= op->getValue(col_name);
- attr1= op->getPreValue(col_name);
+ attr0.rec= op->getValue(col_name);
+ attr1.rec= op->getPreValue(col_name);
+ }
+ else
+ {
+ DBUG_PRINT("info", ("%s blob", col_name));
+ attr0.blob= op->getBlobHandle(col_name);
+ attr1.blob= op->getPreBlobHandle(col_name);
}
}
else
{
DBUG_PRINT("info", ("%s hidden key", col_name));
- attr0= op->getValue(col_name);
- attr1= op->getPreValue(col_name);
+ attr0.rec= op->getValue(col_name);
+ attr1.rec= op->getPreValue(col_name);
}
- share->ndb_value[0][j].rec= attr0;
- share->ndb_value[1][j].rec= attr1;
+ share->ndb_value[0][j].ptr= attr0.ptr;
+ share->ndb_value[1][j].ptr= attr1.ptr;
}
op->setCustomData((void *) share); // set before execute
share->op= op; // assign op in NDB_SHARE
@@ -2229,12 +2264,27 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
(saves moving data about many times)
*/
+ /*
+ for now malloc/free blobs buffer each time
+ TODO if possible share single permanent buffer with handlers
+ */
+ byte* blobs_buffer[2] = { 0, 0 };
+ uint blobs_buffer_size[2] = { 0, 0 };
+
switch(pOp->getEventType())
{
case NDBEVENT::TE_INSERT:
row.n_inserts++;
DBUG_PRINT("info", ("INSERT INTO %s", share->key));
{
+ if (share->flags & NSF_BLOB_FLAG)
+ {
+ my_ptrdiff_t ptrdiff= 0;
+ int ret= get_ndb_blobs_value(table, share->ndb_value[0],
+ blobs_buffer[0], blobs_buffer_size[0],
+ ptrdiff);
+ DBUG_ASSERT(ret == 0);
+ }
ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
trans.write_row(::server_id, injector::transaction::table(table, true),
&b, n_fields, table->record[0]);
@@ -2261,6 +2311,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
key
*/
+ if (share->flags & NSF_BLOB_FLAG)
+ {
+ my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
+ int ret= get_ndb_blobs_value(table, share->ndb_value[n],
+ blobs_buffer[n], blobs_buffer_size[n],
+ ptrdiff);
+ DBUG_ASSERT(ret == 0);
+ }
ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
print_records(table, table->record[n]);
trans.delete_row(::server_id, injector::transaction::table(table, true),
@@ -2271,13 +2329,21 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
row.n_updates++;
DBUG_PRINT("info", ("UPDATE %s", share->key));
{
+ if (share->flags & NSF_BLOB_FLAG)
+ {
+ my_ptrdiff_t ptrdiff= 0;
+ int ret= get_ndb_blobs_value(table, share->ndb_value[0],
+ blobs_buffer[0], blobs_buffer_size[0],
+ ptrdiff);
+ DBUG_ASSERT(ret == 0);
+ }
ndb_unpack_record(table, share->ndb_value[0],
&b, table->record[0]);
print_records(table, table->record[0]);
if (table->s->primary_key != MAX_KEY)
{
/*
- since table has a primary key, we can to a write
+ since table has a primary key, we can do a write
using only after values
*/
trans.write_row(::server_id, injector::transaction::table(table, true),
@@ -2289,6 +2355,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
mysql server cannot handle the ndb hidden key and
therefore needs the before image as well
*/
+ if (share->flags & NSF_BLOB_FLAG)
+ {
+ my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
+ int ret= get_ndb_blobs_value(table, share->ndb_value[1],
+ blobs_buffer[1], blobs_buffer_size[1],
+ ptrdiff);
+ DBUG_ASSERT(ret == 0);
+ }
ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
print_records(table, table->record[1]);
trans.update_row(::server_id,
@@ -2305,6 +2379,12 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
break;
}
+ if (share->flags & NSF_BLOB_FLAG)
+ {
+ my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
+ my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
+ }
+
return 0;
}
@@ -2544,6 +2624,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
Binlog_index_row row;
while (pOp != NULL)
{
+ // sometimes get TE_ALTER with invalid table
+ DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
+ ! IS_NDB_BLOB_PREFIX(pOp->getTable()->getName()));
ndb->
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
@@ -2684,6 +2767,7 @@ err:
DBUG_PRINT("info",("removing all event operations"));
while ((op= ndb->getEventOperation()))
{
+ DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getTable()->getName()));
DBUG_PRINT("info",("removing event operation on %s",
op->getEvent()->getName()));
NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();