path: root/sql/ha_ndbcluster.cc
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--  sql/ha_ndbcluster.cc  2943
1 file changed, 2943 insertions, 0 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
new file mode 100644
index 00000000000..3bc322878d1
--- /dev/null
+++ b/sql/ha_ndbcluster.cc
@@ -0,0 +1,2943 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+/*
+ This file defines the NDB Cluster handler: the interface between MySQL and
+ NDB Cluster
+*/
+
+/*
+ TODO
+  After CREATE DATABASE, run discover on all tables in that database
+
+*/
+
+
+#ifdef __GNUC__
+#pragma implementation // gcc: Class implementation
+#endif
+
+#include "mysql_priv.h"
+
+#ifdef HAVE_NDBCLUSTER_DB
+#include <my_dir.h>
+#include "ha_ndbcluster.h"
+#include <ndbapi/NdbApi.hpp>
+#include <ndbapi/NdbScanFilter.hpp>
+
+#define USE_DISCOVER_ON_STARTUP
+//#define USE_NDB_POOL
+
+// Default value for parallelism
+static const int parallelism= 240;
+
+#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
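+// The hidden primary key is the 64 bit auto-increment value generated
+// for tables without an explicit primary key (see the "$PK" shadow
+// column in ha_ndbcluster::create), hence the 8 byte length.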
+
+#define ERR_PRINT(err) \
+ DBUG_PRINT("error", ("Error: %d message: %s", err.code, err.message))
+
+#define ERR_RETURN(err) \
+{ \
+ ERR_PRINT(err); \
+ DBUG_RETURN(ndb_to_mysql_error(&err)); \
+}
+
+// Typedefs for long names
+typedef NdbDictionary::Column NDBCOL;
+typedef NdbDictionary::Table NDBTAB;
+typedef NdbDictionary::Index NDBINDEX;
+typedef NdbDictionary::Dictionary NDBDICT;
+
+bool ndbcluster_inited= false;
+
+// Handler synchronization
+pthread_mutex_t ndbcluster_mutex;
+
+// Table lock handling
+static HASH ndbcluster_open_tables;
+
+static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
+ my_bool not_used __attribute__((unused)));
+static NDB_SHARE *get_share(const char *table_name);
+static void free_share(NDB_SHARE *share);
+
+static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
+static int unpackfrm(const void **data, uint *len,
+ const void* pack_data);
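+
+// packfrm/unpackfrm pack and unpack the .frm file image that is stored
+// in the NDB dictionary; get_metadata compares the packed image with
+// the .frm file on local disk to detect stale table definitions.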
+
+/*
+ Error handling functions
+*/
+
+struct err_code_mapping
+{
+ int ndb_err;
+ int my_err;
+};
+
+static const err_code_mapping err_map[]=
+{
+ { 626, HA_ERR_KEY_NOT_FOUND },
+ { 630, HA_ERR_FOUND_DUPP_KEY },
+ { 893, HA_ERR_FOUND_DUPP_UNIQUE },
+ { 721, HA_ERR_TABLE_EXIST },
+ { 241, HA_ERR_OLD_METADATA },
+ { -1, -1 }
+};
+
+
+static int ndb_to_mysql_error(const NdbError *err)
+{
+ uint i;
+ for (i=0 ; err_map[i].ndb_err != err->code ; i++)
+ {
+ if (err_map[i].my_err == -1)
+ return err->code;
+ }
+ return err_map[i].my_err;
+}
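+
+/*
+  Example: NDB error 626 ("tuple did not exist") maps to
+  HA_ERR_KEY_NOT_FOUND; NDB error codes without an entry in err_map
+  are returned unchanged.
+*/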
+
+
+/*
+  Take care of the error that occurred in NDB
+
+ RETURN
+ 0 No error
+ # The mapped error code
+*/
+
+int ha_ndbcluster::ndb_err(NdbConnection *trans)
+{
+ const NdbError err= trans->getNdbError();
+ if (!err.code)
+ return 0; // Don't log things to DBUG log if no error
+ DBUG_ENTER("ndb_err");
+
+ ERR_PRINT(err);
+ switch (err.classification) {
+ case NdbError::SchemaError:
+ {
+ NDBDICT *dict= m_ndb->getDictionary();
+ DBUG_PRINT("info", ("invalidateTable %s", m_tabname));
+ dict->invalidateTable(m_tabname);
+ break;
+ }
+ default:
+ break;
+ }
+ DBUG_RETURN(ndb_to_mysql_error(&err));
+}
+
+
+/*
+ Instruct NDB to set the value of the hidden primary key
+*/
+
+bool ha_ndbcluster::set_hidden_key(NdbOperation *ndb_op,
+ uint fieldnr, const byte *field_ptr)
+{
+ DBUG_ENTER("set_hidden_key");
+ DBUG_RETURN(ndb_op->equal(fieldnr, (char*)field_ptr,
+ NDB_HIDDEN_PRIMARY_KEY_LENGTH) != 0);
+}
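+
+// NOTE: For tables without an explicit primary key the hidden key value
+// is generated with getAutoIncrementValue in write_row and read back
+// via m_value[table->fields] for use by update_row and delete_row.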
+
+
+/*
+ Instruct NDB to set the value of one primary key attribute
+*/
+
+int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
+ uint fieldnr, const byte *field_ptr)
+{
+ uint32 pack_len= field->pack_length();
+ DBUG_ENTER("set_ndb_key");
+ DBUG_PRINT("enter", ("%d: %s, ndb_type: %u, len=%d",
+ fieldnr, field->field_name, field->type(),
+ pack_len));
+ DBUG_DUMP("key", (char*)field_ptr, pack_len);
+
+ switch (field->type()) {
+ case MYSQL_TYPE_DECIMAL:
+ case MYSQL_TYPE_TINY:
+ case MYSQL_TYPE_SHORT:
+ case MYSQL_TYPE_LONG:
+ case MYSQL_TYPE_FLOAT:
+ case MYSQL_TYPE_DOUBLE:
+ case MYSQL_TYPE_TIMESTAMP:
+ case MYSQL_TYPE_LONGLONG:
+ case MYSQL_TYPE_INT24:
+ case MYSQL_TYPE_DATE:
+ case MYSQL_TYPE_TIME:
+ case MYSQL_TYPE_DATETIME:
+ case MYSQL_TYPE_YEAR:
+ case MYSQL_TYPE_NEWDATE:
+ case MYSQL_TYPE_ENUM:
+ case MYSQL_TYPE_SET:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_STRING:
+ // Common implementation for most field types
+ DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
+
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_NULL:
+ case MYSQL_TYPE_GEOMETRY:
+ default:
+ // Unhandled field types
+ DBUG_PRINT("error", ("Field type %d not supported", field->type()));
+ DBUG_RETURN(2);
+ }
+ DBUG_RETURN(3);
+}
+
+
+/*
+ Instruct NDB to set the value of one attribute
+*/
+
+int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
+ uint fieldnr)
+{
+ const byte* field_ptr= field->ptr;
+ uint32 pack_len= field->pack_length();
+ DBUG_ENTER("set_ndb_value");
+ DBUG_PRINT("enter", ("%d: %s, type: %u, len=%d, is_null=%s",
+ fieldnr, field->field_name, field->type(),
+ pack_len, field->is_null()?"Y":"N"));
+ DBUG_DUMP("value", (char*) field_ptr, pack_len);
+
+ if (field->is_null())
+ {
+ // Set value to NULL
+ DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
+ }
+
+ switch (field->type()) {
+ case MYSQL_TYPE_DECIMAL:
+ case MYSQL_TYPE_TINY:
+ case MYSQL_TYPE_SHORT:
+ case MYSQL_TYPE_LONG:
+ case MYSQL_TYPE_FLOAT:
+ case MYSQL_TYPE_DOUBLE:
+ case MYSQL_TYPE_TIMESTAMP:
+ case MYSQL_TYPE_LONGLONG:
+ case MYSQL_TYPE_INT24:
+ case MYSQL_TYPE_DATE:
+ case MYSQL_TYPE_TIME:
+ case MYSQL_TYPE_DATETIME:
+ case MYSQL_TYPE_YEAR:
+ case MYSQL_TYPE_NEWDATE:
+ case MYSQL_TYPE_ENUM:
+ case MYSQL_TYPE_SET:
+ case MYSQL_TYPE_VAR_STRING:
+ case MYSQL_TYPE_STRING:
+ // Common implementation for most field types
+ DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
+
+ case MYSQL_TYPE_TINY_BLOB:
+ case MYSQL_TYPE_MEDIUM_BLOB:
+ case MYSQL_TYPE_LONG_BLOB:
+ case MYSQL_TYPE_BLOB:
+ case MYSQL_TYPE_NULL:
+ case MYSQL_TYPE_GEOMETRY:
+ default:
+ // Unhandled field types
+ DBUG_PRINT("error", ("Field type %d not supported", field->type()));
+ DBUG_RETURN(2);
+ }
+ DBUG_RETURN(3);
+}
+
+
+/*
+ Instruct NDB to fetch one field
+ - data is read directly into buffer provided by field_ptr
+ if it's NULL, data is read into memory provided by NDBAPI
+*/
+
+int ha_ndbcluster::get_ndb_value(NdbOperation *op,
+ uint field_no, byte *field_ptr)
+{
+ DBUG_ENTER("get_ndb_value");
+ DBUG_PRINT("enter", ("field_no: %d", field_no));
+ m_value[field_no]= op->getValue(field_no, field_ptr);
+  DBUG_RETURN(m_value[field_no] == NULL);
+}
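+
+// The NdbRecAttr returned by getValue is saved in m_value so that
+// unpack_record later can see which fields were actually read and
+// whether they are NULL.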
+
+
+/*
+ Get metadata for this table from NDB
+
+ IMPLEMENTATION
+ - save the NdbDictionary::Table for easy access
+ - check that frm-file on disk is equal to frm-file
+ of table accessed in NDB
+ - build a list of the indexes for the table
+*/
+
+int ha_ndbcluster::get_metadata(const char *path)
+{
+ NDBDICT *dict= m_ndb->getDictionary();
+ const NDBTAB *tab;
+ const void *data, *pack_data;
+ uint ndb_columns, mysql_columns, length, pack_length, i;
+ int error;
+ DBUG_ENTER("get_metadata");
+ DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));
+
+ if (!(tab= dict->getTable(m_tabname)))
+ ERR_RETURN(dict->getNdbError());
+ DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
+
+ /*
+ This is the place to check that the table we got from NDB
+ is equal to the one on local disk
+ */
+ ndb_columns= (uint) tab->getNoOfColumns();
+ mysql_columns= table->fields;
+ if (table->primary_key == MAX_KEY)
+ ndb_columns--;
+ if (ndb_columns != mysql_columns)
+ {
+ DBUG_PRINT("error",
+ ("Wrong number of columns, ndb: %d mysql: %d",
+ ndb_columns, mysql_columns));
+ DBUG_RETURN(HA_ERR_OLD_METADATA);
+ }
+
+ /*
+ Compare FrmData in NDB with frm file from disk.
+ */
+ error= 0;
+ if (readfrm(path, &data, &length) ||
+ packfrm(data, length, &pack_data, &pack_length))
+ {
+ my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
+ my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
+ DBUG_RETURN(1);
+ }
+
+ if ((pack_length != tab->getFrmLength()) ||
+ (memcmp(pack_data, tab->getFrmData(), pack_length)))
+ {
+ DBUG_PRINT("error",
+ ("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
+ pack_length, tab->getFrmLength(),
+ memcmp(pack_data, tab->getFrmData(), pack_length)));
+ DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
+ DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
+ error= HA_ERR_OLD_METADATA;
+ }
+ my_free((char*)data, MYF(0));
+ my_free((char*)pack_data, MYF(0));
+ if (error)
+ DBUG_RETURN(error);
+
+  // All checks OK, let's use the table
+ m_table= (void*)tab;
+
+ for (i= 0; i < MAX_KEY; i++)
+ m_indextype[i]= UNDEFINED_INDEX;
+
+ // Save information about all known indexes
+ for (i= 0; i < table->keys; i++)
+ m_indextype[i] = get_index_type_from_table(i);
+
+ DBUG_RETURN(0);
+}
+
+/*
+ Decode the type of an index from information
+ provided in table object
+*/
+NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint index_no) const
+{
+ if (index_no == table->primary_key)
+ return PRIMARY_KEY_INDEX;
+ else
+ return ((table->key_info[index_no].flags & HA_NOSAME) ?
+ UNIQUE_INDEX :
+ ORDERED_INDEX);
+}
+
+
+void ha_ndbcluster::release_metadata()
+{
+ DBUG_ENTER("release_metadata");
+ DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
+
+ m_table= NULL;
+
+ DBUG_VOID_RETURN;
+}
+
+static const ulong index_type_flags[]=
+{
+ /* UNDEFINED_INDEX */
+ 0,
+
+ /* PRIMARY_KEY_INDEX */
+ HA_ONLY_WHOLE_INDEX |
+ HA_WRONG_ASCII_ORDER |
+ HA_NOT_READ_PREFIX_LAST,
+
+ /* UNIQUE_INDEX */
+ HA_ONLY_WHOLE_INDEX |
+ HA_WRONG_ASCII_ORDER |
+ HA_NOT_READ_PREFIX_LAST,
+
+ /* ORDERED_INDEX */
+ HA_READ_NEXT |
+ HA_READ_PREV |
+ HA_NOT_READ_AFTER_KEY
+};
+
+static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong);
+
+inline const char* ha_ndbcluster::get_index_name(uint idx_no) const
+{
+ return table->keynames.type_names[idx_no];
+}
+
+inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
+{
+ DBUG_ASSERT(idx_no < MAX_KEY);
+ return m_indextype[idx_no];
+}
+
+
+/*
+ Get the flags for an index
+
+ RETURN
+ flags depending on the type of the index.
+*/
+
+inline ulong ha_ndbcluster::index_flags(uint idx_no) const
+{
+ DBUG_ENTER("index_flags");
+ DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size);
+ DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]);
+}
+
+
+int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key)
+{
+ KEY* key_info= table->key_info + table->primary_key;
+ KEY_PART_INFO* key_part= key_info->key_part;
+ KEY_PART_INFO* end= key_part+key_info->key_parts;
+ DBUG_ENTER("set_primary_key");
+
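+  // The supplied key buffer is assumed to hold the fixed-size key part
+  // images concatenated in key order; step forward by each part's
+  // length after it has been used.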
+ for (; key_part != end; key_part++)
+ {
+ Field* field= key_part->field;
+ if (set_ndb_key(op, field,
+ key_part->fieldnr-1, key))
+ ERR_RETURN(op->getNdbError());
+ key += key_part->length;
+ }
+ DBUG_RETURN(0);
+}
+
+
+int ha_ndbcluster::set_primary_key(NdbOperation *op)
+{
+ DBUG_ENTER("set_primary_key");
+ KEY* key_info= table->key_info + table->primary_key;
+ KEY_PART_INFO* key_part= key_info->key_part;
+ KEY_PART_INFO* end= key_part+key_info->key_parts;
+
+ for (; key_part != end; key_part++)
+ {
+ Field* field= key_part->field;
+ if (set_ndb_key(op, field,
+ key_part->fieldnr-1, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Read one record from NDB using primary key
+*/
+
+int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
+{
+ uint no_fields= table->fields, i;
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ THD *thd= current_thd;
+ DBUG_ENTER("pk_read");
+ DBUG_PRINT("enter", ("key_len: %u", key_len));
+ DBUG_DUMP("key", (char*)key, key_len);
+
+ if (!(op= trans->getNdbOperation(m_tabname)) || op->readTuple() != 0)
+ goto err;
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // This table has no primary key, use "hidden" primary key
+ DBUG_PRINT("info", ("Using hidden key"));
+ DBUG_DUMP("key", (char*)key, 8);
+ if (set_hidden_key(op, no_fields, key))
+ goto err;
+ // Read key at the same time, for future reference
+ if (get_ndb_value(op, no_fields, NULL))
+ goto err;
+ }
+ else
+ {
+ int res;
+ if ((res= set_primary_key(op, key)))
+      DBUG_RETURN(res);
+ }
+
+ // Read non-key field(s)
+ for (i= 0; i < no_fields; i++)
+ {
+ Field *field= table->field[i];
+ if (thd->query_id == field->query_id)
+ {
+ if (get_ndb_value(op, i, field->ptr))
+ goto err;
+ }
+ else
+ {
+ // Attribute was not to be read
+ m_value[i]= NULL;
+ }
+ }
+
+ if (trans->execute(NoCommit, IgnoreError) != 0)
+ {
+ table->status= STATUS_NOT_FOUND;
+ DBUG_RETURN(ndb_err(trans));
+ }
+
+  // The values have now been fetched from NDB
+ unpack_record(buf);
+ table->status= 0;
+ DBUG_RETURN(0);
+
+ err:
+ ERR_RETURN(trans->getNdbError());
+}
+
+
+/*
+ Read one record from NDB using unique secondary index
+*/
+
+int ha_ndbcluster::unique_index_read(const byte *key,
+ uint key_len, byte *buf)
+{
+ NdbConnection *trans= m_active_trans;
+ const char *index_name;
+ NdbIndexOperation *op;
+ THD *thd= current_thd;
+ byte *key_ptr;
+ KEY* key_info;
+ KEY_PART_INFO *key_part, *end;
+ uint i;
+ DBUG_ENTER("unique_index_read");
+ DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
+ DBUG_DUMP("key", (char*)key, key_len);
+
+ index_name= get_index_name(active_index);
+ if (!(op= trans->getNdbIndexOperation(index_name, m_tabname)) ||
+ op->readTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ // Set secondary index key(s)
+ key_ptr= (byte *) key;
+ key_info= table->key_info + active_index;
+ DBUG_ASSERT(key_info->key_length == key_len);
+ end= (key_part= key_info->key_part) + key_info->key_parts;
+
+ for (i= 0; key_part != end; key_part++, i++)
+ {
+ if (set_ndb_key(op, key_part->field, i, key_ptr))
+ ERR_RETURN(trans->getNdbError());
+ key_ptr+= key_part->length;
+ }
+
+ // Get non-index attribute(s)
+ for (i= 0; i < table->fields; i++)
+ {
+ Field *field= table->field[i];
+ if ((thd->query_id == field->query_id) ||
+ (field->flags & PRI_KEY_FLAG))
+ {
+ if (get_ndb_value(op, i, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ // Attribute was not to be read
+ m_value[i]= NULL;
+ }
+ }
+
+ if (trans->execute(NoCommit, IgnoreError) != 0)
+ {
+ table->status= STATUS_NOT_FOUND;
+ DBUG_RETURN(ndb_err(trans));
+ }
+  // The values have now been fetched from NDB
+ unpack_record(buf);
+ table->status= 0;
+ DBUG_RETURN(0);
+}
+
+/*
+ Get the next record of a started scan
+*/
+
+inline int ha_ndbcluster::next_result(byte *buf)
+{
+ NdbConnection *trans= m_active_trans;
+ NdbResultSet *cursor= m_active_cursor;
+ DBUG_ENTER("next_result");
+
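+  /*
+    nextResult() returns 0 when one more row is available; any other
+    return value means that the scan has either ended or failed, which
+    the ndb_err check below tells apart.
+  */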
+ if (cursor->nextResult() == 0)
+ {
+ // One more record found
+ unpack_record(buf);
+ table->status= 0;
+ DBUG_RETURN(0);
+ }
+ table->status= STATUS_NOT_FOUND;
+ if (ndb_err(trans))
+ ERR_RETURN(trans->getNdbError());
+
+ // No more records
+ DBUG_PRINT("info", ("No more records"));
+ DBUG_RETURN(HA_ERR_END_OF_FILE);
+}
+
+
+/*
+ Read record(s) from NDB using ordered index scan
+*/
+
+int ha_ndbcluster::ordered_index_scan(const byte *key, uint key_len,
+ byte *buf,
+ enum ha_rkey_function find_flag)
+{
+ uint no_fields= table->fields;
+ uint tot_len, i;
+ NdbConnection *trans= m_active_trans;
+ NdbResultSet *cursor= m_active_cursor;
+ NdbScanOperation *op;
+ const char *bound_str= NULL;
+ const char *index_name;
+ NdbOperation::BoundType bound_type = NdbOperation::BoundEQ;
+ bool can_be_handled_by_ndb= FALSE;
+ byte *key_ptr;
+ KEY *key_info;
+ THD* thd = current_thd;
+ DBUG_ENTER("ordered_index_scan");
+ DBUG_PRINT("enter", ("index: %u", active_index));
+ DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
+
+ index_name= get_index_name(active_index);
+ if (!(op= trans->getNdbScanOperation(index_name, m_tabname)))
+ ERR_RETURN(trans->getNdbError());
+ if (!(cursor= op->readTuples(parallelism)))
+ ERR_RETURN(trans->getNdbError());
+ m_active_cursor= cursor;
+
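+  /*
+    NOTE: The NDB bound types are named from the bound's point of view:
+    BoundLE means that the bound is less than or equal to the column
+    values returned, i.e. an inclusive lower bound, and BoundGE an
+    inclusive upper bound. This is why e.g. HA_READ_KEY_OR_NEXT maps
+    to BoundLE below.
+  */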
+ switch (find_flag) {
+ case HA_READ_KEY_EXACT: /* Find first record else error */
+ bound_str= "HA_READ_KEY_EXACT";
+ bound_type= NdbOperation::BoundEQ;
+ can_be_handled_by_ndb= TRUE;
+ break;
+ case HA_READ_KEY_OR_NEXT: /* Record or next record */
+ bound_str= "HA_READ_KEY_OR_NEXT";
+ bound_type= NdbOperation::BoundLE;
+ can_be_handled_by_ndb= TRUE;
+ break;
+ case HA_READ_KEY_OR_PREV: /* Record or previous */
+ bound_str= "HA_READ_KEY_OR_PREV";
+ bound_type= NdbOperation::BoundGE;
+ can_be_handled_by_ndb= TRUE;
+ break;
+ case HA_READ_AFTER_KEY: /* Find next rec. after key-record */
+ bound_str= "HA_READ_AFTER_KEY";
+ bound_type= NdbOperation::BoundLT;
+ can_be_handled_by_ndb= TRUE;
+ break;
+ case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */
+ bound_str= "HA_READ_BEFORE_KEY";
+ bound_type= NdbOperation::BoundGT;
+ can_be_handled_by_ndb= TRUE;
+ break;
+  case HA_READ_PREFIX: /* Key which has the same prefix */
+ bound_str= "HA_READ_PREFIX";
+ break;
+ case HA_READ_PREFIX_LAST: /* Last key with the same prefix */
+ bound_str= "HA_READ_PREFIX_LAST";
+ break;
+ case HA_READ_PREFIX_LAST_OR_PREV:
+ /* Last or prev key with the same prefix */
+ bound_str= "HA_READ_PREFIX_LAST_OR_PREV";
+ break;
+ default:
+ bound_str= "UNKNOWN";
+ break;
+ }
+ DBUG_PRINT("info", ("find_flag: %s, bound_type: %d,"
+ "can_be_handled_by_ndb: %d",
+ bound_str, bound_type, can_be_handled_by_ndb));
+ if (!can_be_handled_by_ndb)
+ DBUG_RETURN(1);
+
+ // Set bounds using key data
+ tot_len= 0;
+ key_ptr= (byte *) key;
+ key_info= table->key_info + active_index;
+ for (i= 0; i < key_info->key_parts; i++)
+ {
+ Field* field= key_info->key_part[i].field;
+ uint32 field_len= field->pack_length();
+ DBUG_PRINT("info", ("Set index bound on %s",
+ field->field_name));
+ DBUG_DUMP("key", (char*)key_ptr, field_len);
+
+ if (op->setBound(field->field_name,
+ bound_type,
+ key_ptr,
+ field_len) != 0)
+ ERR_RETURN(op->getNdbError());
+
+ key_ptr+= field_len;
+ tot_len+= field_len;
+ if (tot_len >= key_len)
+ break;
+ }
+
+ // Define attributes to read
+ for (i= 0; i < no_fields; i++)
+ {
+ Field *field= table->field[i];
+ if ((thd->query_id == field->query_id) ||
+ (field->flags & PRI_KEY_FLAG))
+ {
+ if (get_ndb_value(op, i, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ m_value[i]= NULL;
+ }
+ }
+
+ if (table->primary_key == MAX_KEY)
+ {
+ DBUG_PRINT("info", ("Getting hidden key"));
+ // Scanning table with no primary key
+ int hidden_no= no_fields;
+#ifndef DBUG_OFF
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ if (!tab->getColumn(hidden_no))
+ DBUG_RETURN(1);
+#endif
+ if (get_ndb_value(op, hidden_no, NULL))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_PRINT("exit", ("Scan started successfully"));
+ DBUG_RETURN(next_result(buf));
+}
+
+
+#if 0
+/*
+ Read record(s) from NDB using full table scan with filter
+ */
+
+int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
+ byte *buf,
+ enum ha_rkey_function find_flag)
+{
+ uint no_fields= table->fields;
+ NdbConnection *trans= m_active_trans;
+ NdbResultSet *cursor= m_active_cursor;
+
+ DBUG_ENTER("filtered_scan");
+ DBUG_PRINT("enter", ("key_len: %u, index: %u",
+ key_len, active_index));
+ DBUG_DUMP("key", (char*)key, key_len);
+ DBUG_PRINT("info", ("Starting a new filtered scan on %s",
+ m_tabname));
+ NdbScanOperation *op= trans->getNdbScanOperation(m_tabname);
+ if (!op)
+ ERR_RETURN(trans->getNdbError());
+
+ cursor= op->readTuples(parallelism);
+ if (!cursor)
+ ERR_RETURN(trans->getNdbError());
+ m_active_cursor= cursor;
+
+ {
+ // Start scan filter
+ NdbScanFilter sf(op);
+ sf.begin();
+
+ // Set filter using the supplied key data
+ byte *key_ptr= (byte *) key;
+ uint tot_len= 0;
+ KEY* key_info= table->key_info + active_index;
+ for (uint k= 0; k < key_info->key_parts; k++)
+ {
+ KEY_PART_INFO* key_part= key_info->key_part+k;
+ Field* field= key_part->field;
+ uint ndb_fieldnr= key_part->fieldnr-1;
+ DBUG_PRINT("key_part", ("fieldnr: %d", ndb_fieldnr));
+ // const NDBCOL *col= tab->getColumn(ndb_fieldnr);
+ uint32 field_len= field->pack_length();
+ DBUG_DUMP("key", (char*)key, field_len);
+
+ DBUG_PRINT("info", ("Column %s, type: %d, len: %d",
+ field->field_name, field->real_type(), field_len));
+
+ // Define scan filter
+ if (field->real_type() == MYSQL_TYPE_STRING)
+ sf.eq(ndb_fieldnr, key_ptr, field_len);
+ else
+ {
+ if (field_len == 8)
+ sf.eq(ndb_fieldnr, (Uint64)*key_ptr);
+ else if (field_len <= 4)
+ sf.eq(ndb_fieldnr, (Uint32)*key_ptr);
+ else
+ DBUG_RETURN(1);
+ }
+
+ key_ptr += field_len;
+ tot_len += field_len;
+
+ if (tot_len >= key_len)
+ break;
+ }
+ // End scan filter
+ sf.end();
+ }
+
+ // Define attributes to read
+ for (uint field_no= 0; field_no < no_fields; field_no++)
+ {
+ Field *field= table->field[field_no];
+
+ // Read attribute
+ DBUG_PRINT("get", ("%d: %s", field_no, field->field_name));
+ if (get_ndb_value(op, field_no, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (table->primary_key == MAX_KEY)
+ {
+ DBUG_PRINT("info", ("Getting hidden key"));
+ // Scanning table with no primary key
+ int hidden_no= no_fields;
+#ifndef DBUG_OFF
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ if (!tab->getColumn(hidden_no))
+ DBUG_RETURN(1);
+#endif
+ if (get_ndb_value(op, hidden_no, NULL))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_PRINT("exit", ("Scan started successfully"));
+ DBUG_RETURN(next_result(buf));
+}
+#endif
+
+
+/*
+ Read records from NDB using full table scan
+ */
+
+int ha_ndbcluster::full_table_scan(byte *buf)
+{
+ uint i;
+ THD *thd= current_thd;
+ NdbConnection *trans= m_active_trans;
+ NdbResultSet *cursor;
+ NdbScanOperation *op;
+
+ DBUG_ENTER("full_table_scan");
+ DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
+
+ if (!(op=trans->getNdbScanOperation(m_tabname)))
+ ERR_RETURN(trans->getNdbError());
+ if (!(cursor= op->readTuples(parallelism)))
+ ERR_RETURN(trans->getNdbError());
+ m_active_cursor= cursor;
+
+ // Define attributes to read
+ for (i= 0; i < table->fields; i++)
+ {
+ Field *field= table->field[i];
+ if ((thd->query_id == field->query_id) ||
+ (field->flags & PRI_KEY_FLAG))
+ {
+ if (get_ndb_value(op, i, field->ptr))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ m_value[i]= NULL;
+ }
+ }
+
+ if (table->primary_key == MAX_KEY)
+ {
+ DBUG_PRINT("info", ("Getting hidden key"));
+ // Scanning table with no primary key
+ int hidden_no= table->fields;
+#ifndef DBUG_OFF
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ if (!tab->getColumn(hidden_no))
+ DBUG_RETURN(1);
+#endif
+ if (get_ndb_value(op, hidden_no, NULL))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_PRINT("exit", ("Scan started successfully"));
+ DBUG_RETURN(next_result(buf));
+}
+
+
+/*
+ Insert one record into NDB
+*/
+
+int ha_ndbcluster::write_row(byte *record)
+{
+ uint i;
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ int res;
+ DBUG_ENTER("write_row");
+
+ statistic_increment(ha_write_count,&LOCK_status);
+ if (table->timestamp_default_now)
+ update_timestamp(record+table->timestamp_default_now-1);
+ if (table->next_number_field && record == table->record[0])
+ update_auto_increment();
+
+ if (!(op= trans->getNdbOperation(m_tabname)))
+ ERR_RETURN(trans->getNdbError());
+
+  res= (m_use_write) ? op->writeTuple() : op->insertTuple();
+ if (res != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // Table has hidden primary key
+ Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname);
+ if (set_hidden_key(op, table->fields, (const byte*)&auto_value))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ int res;
+ if ((res= set_primary_key(op)))
+      DBUG_RETURN(res);
+ }
+
+ // Set non-key attribute(s)
+ for (i= 0; i < table->fields; i++)
+ {
+ Field *field= table->field[i];
+ if (!(field->flags & PRI_KEY_FLAG) &&
+ set_ndb_value(op, field, i))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ /*
+ Execute write operation
+    NOTE: When inserting many rows with one INSERT statement
+    it should not be necessary to execute(NoCommit)
+    after each row.
+    Find out how such statements can be detected!
+ */
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_RETURN(0);
+}
+
+
+/* Compare if a key in a row has changed */
+
+int ha_ndbcluster::key_cmp(uint keynr, const byte * old_row,
+ const byte * new_row)
+{
+ KEY_PART_INFO *key_part=table->key_info[keynr].key_part;
+ KEY_PART_INFO *end=key_part+table->key_info[keynr].key_parts;
+
+ for (; key_part != end ; key_part++)
+ {
+ if (key_part->null_bit)
+ {
+ if ((old_row[key_part->null_offset] & key_part->null_bit) !=
+ (new_row[key_part->null_offset] & key_part->null_bit))
+ return 1;
+ }
+ if (key_part->key_part_flag & (HA_BLOB_PART | HA_VAR_LENGTH))
+ {
+
+ if (key_part->field->cmp_binary((char*) (old_row + key_part->offset),
+ (char*) (new_row + key_part->offset),
+ (ulong) key_part->length))
+ return 1;
+ }
+ else
+ {
+ if (memcmp(old_row+key_part->offset, new_row+key_part->offset,
+ key_part->length))
+ return 1;
+ }
+ }
+ return 0;
+}
+
+/*
+ Update one record in NDB using primary key
+*/
+
+int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
+{
+ THD *thd= current_thd;
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ uint i;
+ DBUG_ENTER("update_row");
+
+ statistic_increment(ha_update_count,&LOCK_status);
+ if (table->timestamp_on_update_now)
+ update_timestamp(new_data+table->timestamp_on_update_now-1);
+
+ if (!(op= trans->getNdbOperation(m_tabname)) ||
+ op->updateTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // This table has no primary key, use "hidden" primary key
+ DBUG_PRINT("info", ("Using hidden key"));
+
+ // Require that the PK for this record has previously been
+ // read into m_value
+ uint no_fields= table->fields;
+ NdbRecAttr* rec= m_value[no_fields];
+ DBUG_ASSERT(rec);
+ DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
+
+ if (set_hidden_key(op, no_fields, rec->aRef()))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ /* Check for update of primary key and return error */
+ if (key_cmp(table->primary_key, old_data, new_data))
+ DBUG_RETURN(HA_ERR_UNSUPPORTED);
+
+ int res;
+ if ((res= set_primary_key(op, old_data + table->null_bytes)))
+ DBUG_RETURN(res);
+ }
+
+ // Set non-key attribute(s)
+ for (i= 0; i < table->fields; i++)
+ {
+
+ Field *field= table->field[i];
+ if ((thd->query_id == field->query_id) &&
+ (!(field->flags & PRI_KEY_FLAG)) &&
+ set_ndb_value(op, field, i))
+ ERR_RETURN(op->getNdbError());
+ }
+
+ // Execute update operation
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Delete one record from NDB, using primary key
+*/
+
+int ha_ndbcluster::delete_row(const byte *record)
+{
+ NdbConnection *trans= m_active_trans;
+ NdbOperation *op;
+ DBUG_ENTER("delete_row");
+
+ statistic_increment(ha_delete_count,&LOCK_status);
+
+ if (!(op=trans->getNdbOperation(m_tabname)) ||
+ op->deleteTuple() != 0)
+ ERR_RETURN(trans->getNdbError());
+
+ if (table->primary_key == MAX_KEY)
+ {
+ // This table has no primary key, use "hidden" primary key
+ DBUG_PRINT("info", ("Using hidden key"));
+ uint no_fields= table->fields;
+ NdbRecAttr* rec= m_value[no_fields];
+ DBUG_ASSERT(rec != NULL);
+
+ if (set_hidden_key(op, no_fields, rec->aRef()))
+ ERR_RETURN(op->getNdbError());
+ }
+ else
+ {
+ int res;
+ if ((res= set_primary_key(op)))
+      DBUG_RETURN(res);
+ }
+
+ // Execute delete operation
+ if (trans->execute(NoCommit) != 0)
+ DBUG_RETURN(ndb_err(trans));
+ DBUG_RETURN(0);
+}
+
+/*
+ Unpack a record read from NDB
+
+ SYNOPSIS
+ unpack_record()
+ buf Buffer to store read row
+
+ NOTE
+ The data for each row is read directly into the
+ destination buffer. This function is primarily
+ called in order to check if any fields should be
+ set to null.
+*/
+
+void ha_ndbcluster::unpack_record(byte* buf)
+{
+ uint row_offset= (uint) (buf - table->record[0]);
+ Field **field, **end;
+ NdbRecAttr **value= m_value;
+ DBUG_ENTER("unpack_record");
+
+ // Set null flag(s)
+ bzero(buf, table->null_bytes);
+ for (field= table->field, end= field+table->fields;
+ field < end;
+ field++, value++)
+ {
+ if (*value && (*value)->isNULL())
+ (*field)->set_null(row_offset);
+ }
+
+#ifndef DBUG_OFF
+  // Read and print all values that were fetched
+ if (table->primary_key == MAX_KEY)
+ {
+ // Table with hidden primary key
+ int hidden_no= table->fields;
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ const NDBCOL *hidden_col= tab->getColumn(hidden_no);
+ NdbRecAttr* rec= m_value[hidden_no];
+ DBUG_ASSERT(rec);
+ DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no,
+ hidden_col->getName(), rec->u_64_value()));
+ }
+ print_results();
+#endif
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Utility function to print/dump the fetched field
+ */
+
+void ha_ndbcluster::print_results()
+{
+ const NDBTAB *tab= (NDBTAB*) m_table;
+ DBUG_ENTER("print_results");
+
+#ifndef DBUG_OFF
+ if (!_db_on_)
+ DBUG_VOID_RETURN;
+
+ for (uint f=0; f<table->fields;f++)
+ {
+ Field *field;
+ const NDBCOL *col;
+ NdbRecAttr *value;
+
+ if (!(value= m_value[f]))
+ {
+ fprintf(DBUG_FILE, "Field %d was not read\n", f);
+ continue;
+ }
+ field= table->field[f];
+ DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length());
+ col= tab->getColumn(f);
+ fprintf(DBUG_FILE, "%d: %s\t", f, col->getName());
+
+ if (value->isNULL())
+ {
+ fprintf(DBUG_FILE, "NULL\n");
+ continue;
+ }
+
+ switch (col->getType()) {
+ case NdbDictionary::Column::Blob:
+ case NdbDictionary::Column::Undefined:
+ fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
+ break;
+ case NdbDictionary::Column::Tinyint: {
+ char value= *field->ptr;
+ fprintf(DBUG_FILE, "Tinyint\t%d", value);
+ break;
+ }
+ case NdbDictionary::Column::Tinyunsigned: {
+ unsigned char value= *field->ptr;
+ fprintf(DBUG_FILE, "Tinyunsigned\t%u", value);
+ break;
+ }
+ case NdbDictionary::Column::Smallint: {
+      short value= *(short*) field->ptr;
+ fprintf(DBUG_FILE, "Smallint\t%d", value);
+ break;
+ }
+ case NdbDictionary::Column::Smallunsigned: {
+      unsigned short value= *(unsigned short*) field->ptr;
+ fprintf(DBUG_FILE, "Smallunsigned\t%u", value);
+ break;
+ }
+ case NdbDictionary::Column::Mediumint: {
+ byte value[3];
+ memcpy(value, field->ptr, 3);
+ fprintf(DBUG_FILE, "Mediumint\t%d,%d,%d", value[0], value[1], value[2]);
+ break;
+ }
+ case NdbDictionary::Column::Mediumunsigned: {
+ byte value[3];
+ memcpy(value, field->ptr, 3);
+ fprintf(DBUG_FILE, "Mediumunsigned\t%u,%u,%u", value[0], value[1], value[2]);
+ break;
+ }
+ case NdbDictionary::Column::Int: {
+ fprintf(DBUG_FILE, "Int\t%lld", field->val_int());
+ break;
+ }
+ case NdbDictionary::Column::Unsigned: {
+      Uint32 value= *(Uint32*) field->ptr;
+ fprintf(DBUG_FILE, "Unsigned\t%u", value);
+ break;
+ }
+ case NdbDictionary::Column::Bigint: {
+      Int64 value= *(Int64*) field->ptr;
+ fprintf(DBUG_FILE, "Bigint\t%lld", value);
+ break;
+ }
+ case NdbDictionary::Column::Bigunsigned: {
+      Uint64 value= *(Uint64*) field->ptr;
+ fprintf(DBUG_FILE, "Bigunsigned\t%llu", value);
+ break;
+ }
+ case NdbDictionary::Column::Float: {
+      float value= *(float*) field->ptr;
+ fprintf(DBUG_FILE, "Float\t%f", value);
+ break;
+ }
+ case NdbDictionary::Column::Double: {
+      double value= *(double*) field->ptr;
+ fprintf(DBUG_FILE, "Double\t%f", value);
+ break;
+ }
+ case NdbDictionary::Column::Decimal: {
+ char *value= field->ptr;
+
+ fprintf(DBUG_FILE, "Decimal\t'%-*s'", field->pack_length(), value);
+ break;
+ }
+ case NdbDictionary::Column::Char:{
+ char buf[field->pack_length()+1];
+ char *value= (char *) field->ptr;
+      snprintf(buf, field->pack_length()+1, "%.*s",
+               (int) field->pack_length(), value);
+ fprintf(DBUG_FILE, "Char\t'%s'", buf);
+ break;
+ }
+ case NdbDictionary::Column::Varchar:
+ case NdbDictionary::Column::Binary:
+ case NdbDictionary::Column::Varbinary: {
+ char *value= (char *) field->ptr;
+ fprintf(DBUG_FILE, "'%s'", value);
+ break;
+ }
+ case NdbDictionary::Column::Datetime: {
+      Uint64 value= *(Uint64*) field->ptr;
+ fprintf(DBUG_FILE, "Datetime\t%llu", value);
+ break;
+ }
+ case NdbDictionary::Column::Timespec: {
+      Uint64 value= *(Uint64*) field->ptr;
+ fprintf(DBUG_FILE, "Timespec\t%llu", value);
+ break;
+ }
+ }
+ fprintf(DBUG_FILE, "\n");
+
+ }
+#endif
+ DBUG_VOID_RETURN;
+}
+
+
+int ha_ndbcluster::index_init(uint index)
+{
+ DBUG_ENTER("index_init");
+ DBUG_PRINT("enter", ("index: %u", index));
+ DBUG_RETURN(handler::index_init(index));
+}
+
+
+int ha_ndbcluster::index_end()
+{
+ DBUG_ENTER("index_end");
+ DBUG_RETURN(rnd_end());
+}
+
+
+int ha_ndbcluster::index_read(byte *buf,
+ const byte *key, uint key_len,
+ enum ha_rkey_function find_flag)
+{
+ DBUG_ENTER("index_read");
+ DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d",
+ active_index, key_len, find_flag));
+
+ int error= 1;
+ statistic_increment(ha_read_key_count, &LOCK_status);
+
+ switch (get_index_type(active_index)){
+ case PRIMARY_KEY_INDEX:
+ error= pk_read(key, key_len, buf);
+ break;
+
+ case UNIQUE_INDEX:
+ error= unique_index_read(key, key_len, buf);
+ break;
+
+ case ORDERED_INDEX:
+ error= ordered_index_scan(key, key_len, buf, find_flag);
+ break;
+
+ default:
+ case UNDEFINED_INDEX:
+ break;
+ }
+ DBUG_RETURN(error);
+}
+
+
+int ha_ndbcluster::index_read_idx(byte *buf, uint index_no,
+ const byte *key, uint key_len,
+ enum ha_rkey_function find_flag)
+{
+ statistic_increment(ha_read_key_count,&LOCK_status);
+ DBUG_ENTER("index_read_idx");
+ DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len));
+ index_init(index_no);
+ DBUG_RETURN(index_read(buf, key, key_len, find_flag));
+}
+
+
+int ha_ndbcluster::index_next(byte *buf)
+{
+ DBUG_ENTER("index_next");
+
+ int error = 1;
+ statistic_increment(ha_read_next_count,&LOCK_status);
+ if (get_index_type(active_index) == PRIMARY_KEY_INDEX)
+ error= HA_ERR_END_OF_FILE;
+ else
+ error = next_result(buf);
+ DBUG_RETURN(error);
+}
+
+
+int ha_ndbcluster::index_prev(byte *buf)
+{
+ DBUG_ENTER("index_prev");
+ statistic_increment(ha_read_prev_count,&LOCK_status);
+ DBUG_RETURN(1);
+}
+
+
+int ha_ndbcluster::index_first(byte *buf)
+{
+ DBUG_ENTER("index_first");
+ statistic_increment(ha_read_first_count,&LOCK_status);
+ DBUG_RETURN(1);
+}
+
+
+int ha_ndbcluster::index_last(byte *buf)
+{
+ DBUG_ENTER("index_last");
+ statistic_increment(ha_read_last_count,&LOCK_status);
+ DBUG_RETURN(1);
+}
+
+
+int ha_ndbcluster::rnd_init(bool scan)
+{
+ NdbResultSet *cursor= m_active_cursor;
+ DBUG_ENTER("rnd_init");
+ DBUG_PRINT("enter", ("scan: %d", scan));
+ // Check that cursor is not defined
+ if (cursor)
+ DBUG_RETURN(1);
+ index_init(table->primary_key);
+ DBUG_RETURN(0);
+}
+
+
+int ha_ndbcluster::rnd_end()
+{
+ NdbResultSet *cursor= m_active_cursor;
+ DBUG_ENTER("rnd_end");
+
+ if (cursor)
+ {
+ DBUG_PRINT("info", ("Closing the cursor"));
+ cursor->close();
+ m_active_cursor= NULL;
+ }
+ DBUG_RETURN(0);
+}
+
+
+int ha_ndbcluster::rnd_next(byte *buf)
+{
+ DBUG_ENTER("rnd_next");
+ statistic_increment(ha_read_rnd_next_count, &LOCK_status);
+ int error = 1;
+ if (!m_active_cursor)
+ error = full_table_scan(buf);
+ else
+ error = next_result(buf);
+ DBUG_RETURN(error);
+}
+
+
+/*
+ An "interesting" record has been found and it's pk
+ retrieved by calling position
+ Now it's time to read the record from db once
+ again
+*/
+
+int ha_ndbcluster::rnd_pos(byte *buf, byte *pos)
+{
+ DBUG_ENTER("rnd_pos");
+ statistic_increment(ha_read_rnd_count,&LOCK_status);
+ // The primary key for the record is stored in pos
+ // Perform a pk_read using primary key "index"
+ DBUG_RETURN(pk_read(pos, ref_length, buf));
+}
+
+
+/*
+ Store the primary key of this record in ref
+ variable, so that the row can be retrieved again later
+ using "reference" in rnd_pos
+*/
+
+void ha_ndbcluster::position(const byte *record)
+{
+ KEY *key_info;
+ KEY_PART_INFO *key_part;
+ KEY_PART_INFO *end;
+ byte *buff;
+ DBUG_ENTER("position");
+
+ if (table->primary_key != MAX_KEY)
+ {
+ key_info= table->key_info + table->primary_key;
+ key_part= key_info->key_part;
+ end= key_part + key_info->key_parts;
+ buff= ref;
+
+ for (; key_part != end; key_part++)
+ {
+ if (key_part->null_bit) {
+ /* Store 0 if the key part is a NULL part */
+ if (record[key_part->null_offset]
+ & key_part->null_bit) {
+ *buff++= 1;
+ continue;
+ }
+ *buff++= 0;
+ }
+ memcpy(buff, record + key_part->offset, key_part->length);
+ buff += key_part->length;
+ }
+ }
+ else
+ {
+ // No primary key, get hidden key
+ DBUG_PRINT("info", ("Getting hidden key"));
+ int hidden_no= table->fields;
+ NdbRecAttr* rec= m_value[hidden_no];
+ const NDBTAB *tab= (NDBTAB *) m_table;
+ const NDBCOL *hidden_col= tab->getColumn(hidden_no);
+ DBUG_ASSERT(hidden_col->getPrimaryKey() &&
+ hidden_col->getAutoIncrement() &&
+ rec != NULL &&
+ ref_length == NDB_HIDDEN_PRIMARY_KEY_LENGTH);
+ memcpy(ref, (const void*)rec->aRef(), ref_length);
+ }
+
+ DBUG_DUMP("ref", (char*)ref, ref_length);
+ DBUG_VOID_RETURN;
+}
+
+
+void ha_ndbcluster::info(uint flag)
+{
+ DBUG_ENTER("info");
+ DBUG_PRINT("enter", ("flag: %d", flag));
+
+ if (flag & HA_STATUS_POS)
+ DBUG_PRINT("info", ("HA_STATUS_POS"));
+ if (flag & HA_STATUS_NO_LOCK)
+ DBUG_PRINT("info", ("HA_STATUS_NO_LOCK"));
+ if (flag & HA_STATUS_TIME)
+ DBUG_PRINT("info", ("HA_STATUS_TIME"));
+ if (flag & HA_STATUS_CONST)
+ DBUG_PRINT("info", ("HA_STATUS_CONST"));
+ if (flag & HA_STATUS_VARIABLE)
+ DBUG_PRINT("info", ("HA_STATUS_VARIABLE"));
+ if (flag & HA_STATUS_ERRKEY)
+ DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
+ if (flag & HA_STATUS_AUTO)
+ DBUG_PRINT("info", ("HA_STATUS_AUTO"));
+ DBUG_VOID_RETURN;
+}
+
+
+int ha_ndbcluster::extra(enum ha_extra_function operation)
+{
+ DBUG_ENTER("extra");
+ switch (operation) {
+ case HA_EXTRA_NORMAL: /* Optimize for space (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_NORMAL"));
+ break;
+ case HA_EXTRA_QUICK: /* Optimize for speed */
+ DBUG_PRINT("info", ("HA_EXTRA_QUICK"));
+ break;
+ case HA_EXTRA_RESET: /* Reset database to after open */
+ DBUG_PRINT("info", ("HA_EXTRA_RESET"));
+ break;
+  case HA_EXTRA_CACHE: /* Cache record in HA_rrnd() */
+ DBUG_PRINT("info", ("HA_EXTRA_CACHE"));
+ break;
+  case HA_EXTRA_NO_CACHE: /* End caching of records (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_CACHE"));
+ break;
+ case HA_EXTRA_NO_READCHECK: /* No readcheck on update */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_READCHECK"));
+ break;
+ case HA_EXTRA_READCHECK: /* Use readcheck (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_READCHECK"));
+ break;
+ case HA_EXTRA_KEYREAD: /* Read only key to database */
+ DBUG_PRINT("info", ("HA_EXTRA_KEYREAD"));
+ break;
+ case HA_EXTRA_NO_KEYREAD: /* Normal read of records (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_KEYREAD"));
+ break;
+ case HA_EXTRA_NO_USER_CHANGE: /* No user is allowed to write */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_USER_CHANGE"));
+ break;
+ case HA_EXTRA_KEY_CACHE:
+ DBUG_PRINT("info", ("HA_EXTRA_KEY_CACHE"));
+ break;
+ case HA_EXTRA_NO_KEY_CACHE:
+ DBUG_PRINT("info", ("HA_EXTRA_NO_KEY_CACHE"));
+ break;
+  case HA_EXTRA_WAIT_LOCK: /* Wait until file is available (def) */
+ DBUG_PRINT("info", ("HA_EXTRA_WAIT_LOCK"));
+ break;
+ case HA_EXTRA_NO_WAIT_LOCK: /* If file is locked, return quickly */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_WAIT_LOCK"));
+ break;
+ case HA_EXTRA_WRITE_CACHE: /* Use write cache in ha_write() */
+ DBUG_PRINT("info", ("HA_EXTRA_WRITE_CACHE"));
+ break;
+ case HA_EXTRA_FLUSH_CACHE: /* flush write_record_cache */
+ DBUG_PRINT("info", ("HA_EXTRA_FLUSH_CACHE"));
+ break;
+ case HA_EXTRA_NO_KEYS: /* Remove all update of keys */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_KEYS"));
+ break;
+ case HA_EXTRA_KEYREAD_CHANGE_POS: /* Keyread, but change pos */
+ DBUG_PRINT("info", ("HA_EXTRA_KEYREAD_CHANGE_POS")); /* xxxxchk -r must be used */
+ break;
+ case HA_EXTRA_REMEMBER_POS: /* Remember pos for next/prev */
+ DBUG_PRINT("info", ("HA_EXTRA_REMEMBER_POS"));
+ break;
+ case HA_EXTRA_RESTORE_POS:
+ DBUG_PRINT("info", ("HA_EXTRA_RESTORE_POS"));
+ break;
+ case HA_EXTRA_REINIT_CACHE: /* init cache from current record */
+ DBUG_PRINT("info", ("HA_EXTRA_REINIT_CACHE"));
+ break;
+ case HA_EXTRA_FORCE_REOPEN: /* Datafile have changed on disk */
+ DBUG_PRINT("info", ("HA_EXTRA_FORCE_REOPEN"));
+ break;
+ case HA_EXTRA_FLUSH: /* Flush tables to disk */
+ DBUG_PRINT("info", ("HA_EXTRA_FLUSH"));
+ break;
+ case HA_EXTRA_NO_ROWS: /* Don't write rows */
+ DBUG_PRINT("info", ("HA_EXTRA_NO_ROWS"));
+ break;
+ case HA_EXTRA_RESET_STATE: /* Reset positions */
+ DBUG_PRINT("info", ("HA_EXTRA_RESET_STATE"));
+ break;
+ case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/
+ DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY"));
+
+ DBUG_PRINT("info", ("Turning ON use of write instead of insert"));
+ m_use_write= TRUE;
+ break;
+ case HA_EXTRA_NO_IGNORE_DUP_KEY:
+ DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_DUP_KEY"));
+ DBUG_PRINT("info", ("Turning OFF use of write instead of insert"));
+ m_use_write= false;
+ break;
+ case HA_EXTRA_RETRIEVE_ALL_COLS: /* Retrieve all columns, not just those
+ where field->query_id is the same as
+ the current query id */
+ DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_ALL_COLS"));
+ break;
+ case HA_EXTRA_PREPARE_FOR_DELETE:
+ DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_DELETE"));
+ break;
+ case HA_EXTRA_PREPARE_FOR_UPDATE: /* Remove read cache if problems */
+ DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_UPDATE"));
+ break;
+ case HA_EXTRA_PRELOAD_BUFFER_SIZE:
+ DBUG_PRINT("info", ("HA_EXTRA_PRELOAD_BUFFER_SIZE"));
+ break;
+ case HA_EXTRA_RETRIEVE_PRIMARY_KEY:
+ DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_PRIMARY_KEY"));
+ break;
+ case HA_EXTRA_CHANGE_KEY_TO_UNIQUE:
+ DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_UNIQUE"));
+ break;
+ case HA_EXTRA_CHANGE_KEY_TO_DUP:
+ DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_DUP"));
+ break;
+
+ }
+
+ DBUG_RETURN(0);
+}
+
+
+int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
+{
+ DBUG_ENTER("extra_opt");
+ DBUG_PRINT("enter", ("cache_size: %d", cache_size));
+ DBUG_RETURN(extra(operation));
+}
+
+
+int ha_ndbcluster::reset()
+{
+ DBUG_ENTER("reset");
+ // Reset what?
+ DBUG_RETURN(1);
+}
+
+
+const char **ha_ndbcluster::bas_ext() const
+{ static const char *ext[1] = { NullS }; return ext; }
+
+
+/*
+ How many seeks it will take to read through the table
+ This is to be comparable to the number returned by records_in_range so
+ that we can decide if we should scan the table or use keys.
+*/
+
+double ha_ndbcluster::scan_time()
+{
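+  // Crude cost heuristic: estimate a full table scan at one third of
+  // the (estimated) number of records in the table.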
+ return rows2double(records/3);
+}
+
+
+THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
+ THR_LOCK_DATA **to,
+ enum thr_lock_type lock_type)
+{
+ DBUG_ENTER("store_lock");
+
+ if (lock_type != TL_IGNORE && m_lock.type == TL_UNLOCK)
+ {
+
+ /* If we are not doing a LOCK TABLE, then allow multiple
+ writers */
+
+ if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
+ lock_type <= TL_WRITE) && !thd->in_lock_tables)
+ lock_type= TL_WRITE_ALLOW_WRITE;
+
+ /* In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
+ MySQL would use the lock TL_READ_NO_INSERT on t2, and that
+ would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
+ to t2. Convert the lock to a normal read lock to allow
+ concurrent inserts to t2. */
+
+ if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
+ lock_type= TL_READ;
+
+ m_lock.type=lock_type;
+ }
+ *to++= &m_lock;
+
+ DBUG_RETURN(to);
+}
+
+#ifndef DBUG_OFF
+#define PRINT_OPTION_FLAGS(t) { \
+ if (t->options & OPTION_NOT_AUTOCOMMIT) \
+ DBUG_PRINT("thd->options", ("OPTION_NOT_AUTOCOMMIT")); \
+ if (t->options & OPTION_BEGIN) \
+ DBUG_PRINT("thd->options", ("OPTION_BEGIN")); \
+ if (t->options & OPTION_TABLE_LOCK) \
+ DBUG_PRINT("thd->options", ("OPTION_TABLE_LOCK")); \
+}
+#else
+#define PRINT_OPTION_FLAGS(t)
+#endif
+
+
+/*
+ As MySQL will execute an external lock for every new table it uses
+ we can use this to start the transactions.
+ If we are in auto_commit mode we just need to start a transaction
+ for the statement, this will be stored in transaction.stmt.
+  If not, we have to start a master transaction if one does not already
+  exist; this will be stored in transaction.all
+
+  When a table lock is held, one transaction will be started which holds
+  the table lock, and for each statement a "hupp" transaction will be started
+ */
+
+int ha_ndbcluster::external_lock(THD *thd, int lock_type)
+{
+ int error=0;
+ NdbConnection* trans= NULL;
+
+ DBUG_ENTER("external_lock");
+ DBUG_PRINT("enter", ("transaction.ndb_lock_count: %d",
+ thd->transaction.ndb_lock_count));
+
+ /*
+ Check that this handler instance has a connection
+ set up to the Ndb object of thd
+ */
+ if (check_ndb_connection())
+ DBUG_RETURN(1);
+
+ if (lock_type != F_UNLCK)
+ {
+ if (!thd->transaction.ndb_lock_count++)
+ {
+ PRINT_OPTION_FLAGS(thd);
+
+ if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN | OPTION_TABLE_LOCK)))
+ {
+ // Autocommit transaction
+ DBUG_ASSERT(!thd->transaction.stmt.ndb_tid);
+ DBUG_PRINT("trans",("Starting transaction stmt"));
+
+ trans= m_ndb->startTransaction();
+ if (trans == NULL)
+ {
+ thd->transaction.ndb_lock_count--; // We didn't get the lock
+ ERR_RETURN(m_ndb->getNdbError());
+ }
+ thd->transaction.stmt.ndb_tid= trans;
+ }
+ else
+ {
+ if (!thd->transaction.all.ndb_tid)
+ {
+ // Not autocommit transaction
+ // A "master" transaction ha not been started yet
+ DBUG_PRINT("trans",("starting transaction, all"));
+
+ trans= m_ndb->startTransaction();
+ if (trans == NULL)
+ {
+ thd->transaction.ndb_lock_count--; // We didn't get the lock
+ ERR_RETURN(m_ndb->getNdbError());
+ }
+
+ /*
+            If this is the start of a LOCK TABLES, a table lock
+            should be taken on the table in NDB
+
+ Check if it should be read or write lock
+ */
+ if (thd->options & (OPTION_TABLE_LOCK))
+ {
+ //lockThisTable();
+ DBUG_PRINT("info", ("Locking the table..." ));
+ }
+
+ thd->transaction.all.ndb_tid= trans;
+ }
+ }
+ }
+ /*
+ This is the place to make sure this handler instance
+ has a started transaction.
+
+ The transaction is started by the first handler on which
+ MySQL Server calls external lock
+
+ Other handlers in the same stmt or transaction should use
+ the same NDB transaction. This is done by setting up the m_active_trans
+ pointer to point to the NDB transaction.
+ */
+
+ m_active_trans= thd->transaction.all.ndb_tid ?
+ (NdbConnection*)thd->transaction.all.ndb_tid:
+ (NdbConnection*)thd->transaction.stmt.ndb_tid;
+ DBUG_ASSERT(m_active_trans);
+
+ }
+ else
+ {
+ if (!--thd->transaction.ndb_lock_count)
+ {
+ DBUG_PRINT("trans", ("Last external_lock"));
+ PRINT_OPTION_FLAGS(thd);
+
+ if (thd->transaction.stmt.ndb_tid)
+ {
+ /*
+ Unlock is done without a transaction commit / rollback.
+ This happens if the thread didn't update any rows
+ We must in this case close the transaction to release resources
+ */
+ DBUG_PRINT("trans",("ending non-updating transaction"));
+ m_ndb->closeTransaction(m_active_trans);
+ thd->transaction.stmt.ndb_tid= 0;
+ }
+ }
+ m_active_trans= NULL;
+ }
+ DBUG_RETURN(error);
+}
+
+/*
+  When using LOCK TABLES, external_lock is only called when the actual
+  TABLE LOCK is done.
+  Under LOCK TABLES, each used table will force a call to start_stmt.
+*/
+
+int ha_ndbcluster::start_stmt(THD *thd)
+{
+ int error=0;
+ DBUG_ENTER("start_stmt");
+ PRINT_OPTION_FLAGS(thd);
+
+ NdbConnection *trans= (NdbConnection*)thd->transaction.stmt.ndb_tid;
+ if (!trans){
+ DBUG_PRINT("trans",("Starting transaction stmt"));
+
+ NdbConnection *tablock_trans=
+ (NdbConnection*)thd->transaction.all.ndb_tid;
+ DBUG_PRINT("info", ("tablock_trans: %x", tablock_trans));
+    DBUG_ASSERT(tablock_trans);
+    trans= m_ndb->hupp(tablock_trans);
+ if (trans == NULL)
+ ERR_RETURN(m_ndb->getNdbError());
+ thd->transaction.stmt.ndb_tid= trans;
+ }
+ m_active_trans= trans;
+
+ DBUG_RETURN(error);
+}
+
+
+/*
+ Commit a transaction started in NDB
+ */
+
+int ndbcluster_commit(THD *thd, void *ndb_transaction)
+{
+ int res= 0;
+ Ndb *ndb= (Ndb*)thd->transaction.ndb;
+ NdbConnection *trans= (NdbConnection*)ndb_transaction;
+
+ DBUG_ENTER("ndbcluster_commit");
+ DBUG_PRINT("transaction",("%s",
+ trans == thd->transaction.stmt.ndb_tid ?
+ "stmt" : "all"));
+ DBUG_ASSERT(ndb && trans);
+
+ if (trans->execute(Commit) != 0)
+ {
+ const NdbError err= trans->getNdbError();
+ ERR_PRINT(err);
+ res= ndb_to_mysql_error(&err);
+ }
+ ndb->closeTransaction(trans);
+ DBUG_RETURN(res);
+}
+
+
+/*
+ Rollback a transaction started in NDB
+ */
+
+int ndbcluster_rollback(THD *thd, void *ndb_transaction)
+{
+ int res= 0;
+ Ndb *ndb= (Ndb*)thd->transaction.ndb;
+ NdbConnection *trans= (NdbConnection*)ndb_transaction;
+
+ DBUG_ENTER("ndbcluster_rollback");
+ DBUG_PRINT("transaction",("%s",
+ trans == thd->transaction.stmt.ndb_tid ?
+ "stmt" : "all"));
+ DBUG_ASSERT(ndb && trans);
+
+ if (trans->execute(Rollback) != 0)
+ {
+ const NdbError err= trans->getNdbError();
+ ERR_PRINT(err);
+ res= ndb_to_mysql_error(&err);
+ }
+ ndb->closeTransaction(trans);
+  DBUG_RETURN(res);
+}
+
+
+/*
+ Map MySQL type to the corresponding NDB type
+ */
+
+inline NdbDictionary::Column::Type
+mysql_to_ndb_type(enum enum_field_types mysql_type, bool unsigned_flg)
+{
+ switch(mysql_type) {
+ case MYSQL_TYPE_DECIMAL:
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_TINY:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Tinyunsigned :
+ NdbDictionary::Column::Tinyint;
+ case MYSQL_TYPE_SHORT:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Smallunsigned :
+ NdbDictionary::Column::Smallint;
+ case MYSQL_TYPE_LONG:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Unsigned :
+ NdbDictionary::Column::Int;
+ case MYSQL_TYPE_TIMESTAMP:
+ return NdbDictionary::Column::Unsigned;
+ case MYSQL_TYPE_LONGLONG:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Bigunsigned :
+ NdbDictionary::Column::Bigint;
+ case MYSQL_TYPE_INT24:
+ return (unsigned_flg) ?
+ NdbDictionary::Column::Mediumunsigned :
+ NdbDictionary::Column::Mediumint;
+ case MYSQL_TYPE_FLOAT:
+ return NdbDictionary::Column::Float;
+ case MYSQL_TYPE_DOUBLE:
+ return NdbDictionary::Column::Double;
+ case MYSQL_TYPE_DATETIME :
+ return NdbDictionary::Column::Datetime;
+ case MYSQL_TYPE_DATE :
+ case MYSQL_TYPE_NEWDATE :
+ case MYSQL_TYPE_TIME :
+ case MYSQL_TYPE_YEAR :
+ // Missing NDB data types, mapped to char
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_ENUM :
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_SET :
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_TINY_BLOB :
+ case MYSQL_TYPE_MEDIUM_BLOB :
+ case MYSQL_TYPE_LONG_BLOB :
+ case MYSQL_TYPE_BLOB :
+ return NdbDictionary::Column::Blob;
+ case MYSQL_TYPE_VAR_STRING :
+ return NdbDictionary::Column::Varchar;
+ case MYSQL_TYPE_STRING :
+ return NdbDictionary::Column::Char;
+ case MYSQL_TYPE_NULL :
+ case MYSQL_TYPE_GEOMETRY :
+ return NdbDictionary::Column::Undefined;
+ }
+ return NdbDictionary::Column::Undefined;
+}
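+
+/*
+  Example: an unsigned INT column maps to NdbDictionary::Column::Unsigned,
+  while MySQL types with no NDB counterpart (DATE, TIME, YEAR, ENUM, SET)
+  are stored as Char.
+*/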
+
+
+/*
+ Create a table in NDB Cluster
+ */
+
+int ha_ndbcluster::create(const char *name,
+ TABLE *form,
+ HA_CREATE_INFO *info)
+{
+ NDBTAB tab;
+ NdbDictionary::Column::Type ndb_type;
+ NDBCOL col;
+ uint pack_length, length, i;
+ int res;
+ const void *data, *pack_data;
+ const char **key_name= form->keynames.type_names;
+ char name2[FN_HEADLEN];
+
+ DBUG_ENTER("create");
+ DBUG_PRINT("enter", ("name: %s", name));
+ fn_format(name2, name, "", "",2); // Remove the .frm extension
+ set_dbname(name2);
+ set_tabname(name2);
+
+ DBUG_PRINT("table", ("name: %s", m_tabname));
+ tab.setName(m_tabname);
+ tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));
+
+ // Save frm data for this table
+ if (readfrm(name, &data, &length))
+ DBUG_RETURN(1);
+ if (packfrm(data, length, &pack_data, &pack_length))
+ DBUG_RETURN(2);
+
+ DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length));
+ tab.setFrm(pack_data, pack_length);
+ my_free((char*)data, MYF(0));
+ my_free((char*)pack_data, MYF(0));
+
+ for (i= 0; i < form->fields; i++)
+ {
+ Field *field= form->field[i];
+ ndb_type= mysql_to_ndb_type(field->real_type(),
+ field->flags & UNSIGNED_FLAG);
+ DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d",
+ field->field_name, field->real_type(),
+ field->pack_length()));
+ col.setName(field->field_name);
+ col.setType(ndb_type);
+ if ((ndb_type == NdbDictionary::Column::Char) ||
+ (ndb_type == NdbDictionary::Column::Varchar))
+ col.setLength(field->pack_length());
+ else
+ col.setLength(1);
+ col.setNullable(field->maybe_null());
+ col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
+ if (field->flags & AUTO_INCREMENT_FLAG)
+ {
+ DBUG_PRINT("info", ("Found auto_increment key"));
+ col.setAutoIncrement(TRUE);
+ ulonglong value = info->auto_increment_value ?
+ info->auto_increment_value -1 :
+ (ulonglong) 0;
+ DBUG_PRINT("info", ("initial value=%ld", value));
+// col.setInitialAutIncValue(value);
+ }
+ else
+ col.setAutoIncrement(false);
+
+ tab.addColumn(col);
+ }
+
+ // No primary key, create shadow key as 64 bit, auto increment
+ if (form->primary_key == MAX_KEY)
+ {
+ DBUG_PRINT("info", ("Generating shadow key"));
+ col.setName("$PK");
+ col.setType(NdbDictionary::Column::Bigunsigned);
+ col.setLength(1);
+ col.setNullable(false);
+ col.setPrimaryKey(TRUE);
+ col.setAutoIncrement(TRUE);
+ tab.addColumn(col);
+ }
+
+ my_errno= 0;
+ if (check_ndb_connection())
+ {
+ my_errno= HA_ERR_NO_CONNECTION;
+ DBUG_RETURN(my_errno);
+ }
+
+ // Create the table in NDB
+ NDBDICT *dict= m_ndb->getDictionary();
+ if (dict->createTable(tab))
+ {
+ const NdbError err= dict->getNdbError();
+ ERR_PRINT(err);
+ my_errno= ndb_to_mysql_error(&err);
+ DBUG_RETURN(my_errno);
+ }
+ DBUG_PRINT("info", ("Table %s/%s created successfully",
+ m_dbname, m_tabname));
+
+ // Fetch table from NDB, check that it exists
+ const NDBTAB *tab2= dict->getTable(m_tabname);
+ if (tab2 == NULL)
+ {
+ const NdbError err= dict->getNdbError();
+ ERR_PRINT(err);
+ my_errno= ndb_to_mysql_error(&err);
+ DBUG_RETURN(my_errno);
+ }
+
+ // Create secondary indexes
+ for (i= 0; i < form->keys; i++)
+ {
+ DBUG_PRINT("info", ("Found index %u: %s", i, key_name[i]));
+ if (i == form->primary_key)
+ {
+ DBUG_PRINT("info", ("Skipping it, PK already created"));
+ continue;
+ }
+
+ DBUG_PRINT("info", ("Creating index %u: %s", i, key_name[i]));
+ res= create_index(key_name[i],
+ form->key_info + i);
+ switch(res){
+ case 0:
+ // OK
+ break;
+ default:
+ DBUG_PRINT("error", ("Failed to create index %u", i));
+ drop_table();
+ my_errno= res;
+ goto err_end;
+ }
+ }
+
+err_end:
+ DBUG_RETURN(my_errno);
+}
+
+
+/*
+ Create an index in NDB Cluster
+ */
+
+int ha_ndbcluster::create_index(const char *name,
+ KEY *key_info){
+ NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
+ KEY_PART_INFO *key_part= key_info->key_part;
+ KEY_PART_INFO *end= key_part + key_info->key_parts;
+
+ DBUG_ENTER("create_index");
+ DBUG_PRINT("enter", ("name: %s ", name));
+
+  // Check that an index with the same name does not already exist
+ if (dict->getIndex(name, m_tabname))
+ ERR_RETURN(dict->getNdbError());
+
+ NdbDictionary::Index ndb_index(name);
+ if (key_info->flags & HA_NOSAME)
+ ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
+ else
+ {
+ ndb_index.setType(NdbDictionary::Index::OrderedIndex);
+ // TODO Only temporary ordered indexes supported
+ ndb_index.setLogging(false);
+ }
+ ndb_index.setTable(m_tabname);
+
+ for (; key_part != end; key_part++)
+ {
+ Field *field= key_part->field;
+ DBUG_PRINT("info", ("attr: %s", field->field_name));
+ ndb_index.addColumnName(field->field_name);
+ }
+
+ if (dict->createIndex(ndb_index))
+ ERR_RETURN(dict->getNdbError());
+
+ // Success
+ DBUG_PRINT("info", ("Created index %s", name));
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Rename a table in NDB Cluster
+*/
+
+int ha_ndbcluster::rename_table(const char *from, const char *to)
+{
+ char new_tabname[FN_HEADLEN];
+
+ DBUG_ENTER("ha_ndbcluster::rename_table");
+ set_dbname(from);
+ set_tabname(from);
+ set_tabname(to, new_tabname);
+
+ if (check_ndb_connection()) {
+ my_errno= HA_ERR_NO_CONNECTION;
+ DBUG_RETURN(my_errno);
+ }
+
+ int result= alter_table_name(m_tabname, new_tabname);
+ if (result == 0)
+ set_tabname(to);
+
+ DBUG_RETURN(result);
+}
+
+
+/*
+ Rename a table in NDB Cluster using alter table
+ */
+
+int ha_ndbcluster::alter_table_name(const char *from, const char *to)
+{
+ NDBDICT *dict= m_ndb->getDictionary();
+ const NDBTAB *orig_tab;
+  DBUG_ENTER("alter_table_name");
+ DBUG_PRINT("enter", ("Renaming %s to %s", from, to));
+
+ if (!(orig_tab= dict->getTable(from)))
+ ERR_RETURN(dict->getNdbError());
+
+ NdbDictionary::Table copy_tab= dict->getTableForAlteration(from);
+ copy_tab.setName(to);
+ if (dict->alterTable(copy_tab) != 0)
+ ERR_RETURN(dict->getNdbError());
+
+ m_table= NULL;
+
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Delete a table from NDB Cluster
+ */
+
+int ha_ndbcluster::delete_table(const char *name)
+{
+ DBUG_ENTER("delete_table");
+ DBUG_PRINT("enter", ("name: %s", name));
+ set_dbname(name);
+ set_tabname(name);
+
+ if (check_ndb_connection())
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+ DBUG_RETURN(drop_table());
+}
+
+
+/*
+ Drop a table in NDB Cluster
+ */
+
+int ha_ndbcluster::drop_table()
+{
+ NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
+
+ DBUG_ENTER("drop_table");
+ DBUG_PRINT("enter", ("Deleting %s", m_tabname));
+
+  if (dict->dropTable(m_tabname))
+  {
+    const NdbError err= dict->getNdbError();
+    if (err.code != 709) // 709: no such table existed, ignore
+      ERR_RETURN(err);
+  }
+ release_metadata();
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Drop a database in NDB Cluster
+ */
+
+int ndbcluster_drop_database(const char *path)
+{
+ DBUG_ENTER("ndbcluster_drop_database");
+ // TODO drop all tables for this database
+ DBUG_RETURN(1);
+}
+
+
+longlong ha_ndbcluster::get_auto_increment()
+{
+  // NOTE If the number of values to be inserted were known,
+  // the autoincrement cache could be used here to prefetch a range
+ Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname);
+ return (longlong)auto_value;
+}
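+
+/*
+  Sketch of the cached variant hinted at in the NOTE above
+  (illustrative only; assumes the NDB API overload
+  Ndb::getAutoIncrementValue(const char*, Uint32 cacheSize) is
+  available):
+
+    // Prefetch e.g. 100 values in one round trip; subsequent calls
+    // on this Ndb object would be served from the local cache
+    Uint64 auto_value= m_ndb->getAutoIncrementValue(m_tabname, 100);
+*/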
+
+
+/*
+ Constructor for the NDB Cluster table handler
+ */
+
+ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
+ handler(table_arg),
+ m_active_trans(NULL),
+ m_active_cursor(NULL),
+ m_ndb(NULL),
+ m_table(NULL),
+ m_table_flags(HA_REC_NOT_IN_SEQ |
+ HA_KEYPOS_TO_RNDPOS |
+ HA_NOT_EXACT_COUNT |
+ HA_NO_WRITE_DELAYED |
+ HA_NO_PREFIX_CHAR_KEYS |
+ HA_NO_BLOBS |
+ HA_DROP_BEFORE_CREATE |
+ HA_NOT_READ_AFTER_KEY),
+ m_use_write(false)
+{
+
+ DBUG_ENTER("ha_ndbcluster");
+
+ m_tabname[0]= '\0';
+ m_dbname[0]= '\0';
+
+ // TODO Adjust number of records and other parameters for proper
+ // selection of scan/pk access
+ records= 100;
+ block_size= 1024;
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Destructor for NDB Cluster table handler
+ */
+
+ha_ndbcluster::~ha_ndbcluster()
+{
+ DBUG_ENTER("~ha_ndbcluster");
+
+ release_metadata();
+
+ // Check for open cursor/transaction
+ DBUG_ASSERT(m_active_cursor == NULL);
+ DBUG_ASSERT(m_active_trans == NULL);
+
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Open a table for further use
+ - fetch metadata for this table from NDB
+ - check that table exists
+*/
+
+int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
+{
+ KEY *key;
+ DBUG_ENTER("open");
+ DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
+ name, mode, test_if_locked));
+
+  // Set up ref_length to make room for the whole
+  // primary key to be written in the ref variable
+
+ if (table->primary_key != MAX_KEY)
+ {
+ key= table->key_info+table->primary_key;
+ ref_length= key->key_length;
+ DBUG_PRINT("info", (" ref_length: %d", ref_length));
+ }
+ // Init table lock structure
+ if (!(m_share=get_share(name)))
+ DBUG_RETURN(1);
+ thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0);
+
+ set_dbname(name);
+ set_tabname(name);
+
+ if (check_ndb_connection())
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+ DBUG_RETURN(get_metadata(name));
+}
+
+
+/*
+ Close the table
+ - release resources setup by open()
+ */
+
+int ha_ndbcluster::close(void)
+{
+ DBUG_ENTER("close");
+ free_share(m_share);
+ release_metadata();
+ m_ndb= NULL;
+ DBUG_RETURN(0);
+}
+
+
+Ndb* ha_ndbcluster::seize_ndb()
+{
+ Ndb* ndb;
+ DBUG_ENTER("seize_ndb");
+
+#ifdef USE_NDB_POOL
+ // Seize from pool
+ ndb= Ndb::seize();
+#else
+ ndb= new Ndb("");
+#endif
+ if (ndb->init(NDB_MAX_TRANSACTIONS) != 0)
+ {
+ ERR_PRINT(ndb->getNdbError());
+    /*
+      TODO
+      Alt.1 If init fails because too many Ndb objects are allocated,
+      wait on a condition for an Ndb object to be released.
+      Alt.2 Seize/release from pool, wait until next release
+    */
+ delete ndb;
+ ndb= NULL;
+ }
+ DBUG_RETURN(ndb);
+}
+
+
+void ha_ndbcluster::release_ndb(Ndb* ndb)
+{
+ DBUG_ENTER("release_ndb");
+#ifdef USE_NDB_POOL
+ // Release to pool
+ Ndb::release(ndb);
+#else
+ delete ndb;
+#endif
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+  If this thread already has an Ndb object allocated in the current
+  THD, reuse it. Otherwise seize an Ndb object, assign it to the
+  current THD and use it.
+
+  Having an Ndb object also means that a connection to the NDB
+  cluster has been opened; the connection is checked with
+  waitUntilReady().
+*/
+
+int ha_ndbcluster::check_ndb_connection()
+{
+ THD* thd= current_thd;
+ Ndb* ndb;
+ DBUG_ENTER("check_ndb_connection");
+
+ if (!thd->transaction.ndb)
+ {
+ ndb= seize_ndb();
+ if (!ndb)
+ DBUG_RETURN(2);
+ thd->transaction.ndb= ndb;
+ }
+ m_ndb= (Ndb*)thd->transaction.ndb;
+ m_ndb->setDatabaseName(m_dbname);
+ if (m_ndb->waitUntilReady() != 0)
+ {
+ DBUG_PRINT("error", ("Ndb was not ready"));
+ DBUG_RETURN(3);
+ }
+ DBUG_RETURN(0);
+}
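+
+/*
+  Lifecycle sketch (illustrative): every handler entry point that
+  talks to the cluster starts with
+
+    if (check_ndb_connection())          // seizes and caches an Ndb
+      DBUG_RETURN(HA_ERR_NO_CONNECTION); // object in the THD on demand
+
+  and the cached Ndb object lives until the client disconnects, when
+  the server calls ndbcluster_close_connection() below.
+*/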
+
+void ndbcluster_close_connection(THD *thd)
+{
+ Ndb* ndb;
+ DBUG_ENTER("ndbcluster_close_connection");
+ ndb= (Ndb*)thd->transaction.ndb;
+ ha_ndbcluster::release_ndb(ndb);
+ thd->transaction.ndb= NULL;
+ DBUG_VOID_RETURN;
+}
+
+
+/*
+ Try to discover one table from NDB
+ */
+
+int ndbcluster_discover(const char *dbname, const char *name,
+ const void** frmblob, uint* frmlen)
+{
+ uint len;
+ const void* data;
+ const NDBTAB* tab;
+ DBUG_ENTER("ndbcluster_discover");
+ DBUG_PRINT("enter", ("db: %s, name: %s", dbname, name));
+
+ Ndb ndb(dbname);
+  if ((ndb.init() != 0) || (ndb.waitUntilReady() != 0))
+ ERR_RETURN(ndb.getNdbError());
+
+ if (!(tab= ndb.getDictionary()->getTable(name)))
+ {
+ DBUG_PRINT("info", ("Table %s not found", name));
+ DBUG_RETURN(1);
+ }
+
+ DBUG_PRINT("info", ("Found table %s", tab->getName()));
+
+ len= tab->getFrmLength();
+ if (len == 0 || tab->getFrmData() == NULL)
+ {
+    DBUG_PRINT("info", ("No frm data found for %s, "
+                        "table was probably created via NdbApi", name));
+ DBUG_RETURN(2);
+ }
+
+ if (unpackfrm(&data, &len, tab->getFrmData()))
+ DBUG_RETURN(3);
+
+ *frmlen= len;
+ *frmblob= data;
+
+ DBUG_RETURN(0);
+}
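+
+/*
+  Usage sketch for ndbcluster_discover() (illustrative only; "test"
+  and "t1" are example names):
+
+    const void *frmblob;
+    uint frmlen;
+    if (!ndbcluster_discover("test", "t1", &frmblob, &frmlen))
+    {
+      // frmblob now points to frmlen bytes of uncompressed .frm data,
+      // allocated with my_malloc(); presumably written to disk and
+      // then freed by the caller
+      my_free((char*)frmblob, MYF(0));
+    }
+*/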
+
+static Ndb* g_ndb= NULL;
+
+#ifdef USE_DISCOVER_ON_STARTUP
+/*
+  Discover tables from NDB Cluster
+  - fetch a list of tables from NDB
+  - store the frm file for each table on disk, provided that
+    - the table has an attached frm blob, and
+    - the database of the table exists
+*/
+
+int ndb_discover_tables()
+{
+ uint i;
+ NdbDictionary::Dictionary::List list;
+ NdbDictionary::Dictionary* dict;
+ char path[FN_REFLEN];
+ DBUG_ENTER("ndb_discover_tables");
+
+ /* List tables in NDB Cluster kernel */
+ dict= g_ndb->getDictionary();
+ if (dict->listObjects(list,
+ NdbDictionary::Object::UserTable) != 0)
+    ERR_RETURN(dict->getNdbError());
+
+ for (i= 0 ; i < list.count ; i++)
+ {
+ NdbDictionary::Dictionary::List::Element& t= list.elements[i];
+
+ DBUG_PRINT("discover", ("%d: %s/%s", t.id, t.database, t.name));
+ if (create_table_from_handler(t.database, t.name, true))
+ DBUG_PRINT("info", ("Could not discover %s/%s", t.database, t.name));
+ }
+ DBUG_RETURN(0);
+}
+#endif
+
+
+/*
+  Initialise all global variables before creating
+  an NDB Cluster table handler
+ */
+
+bool ndbcluster_init()
+{
+ DBUG_ENTER("ndbcluster_init");
+ // Create a Ndb object to open the connection to NDB
+ g_ndb= new Ndb("sys");
+ if (g_ndb->init() != 0)
+ {
+ ERR_PRINT (g_ndb->getNdbError());
+ DBUG_RETURN(TRUE);
+ }
+ if (g_ndb->waitUntilReady() != 0)
+ {
+ ERR_PRINT (g_ndb->getNdbError());
+ DBUG_RETURN(TRUE);
+ }
+ (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
+ (hash_get_key) ndbcluster_get_key,0,0);
+ pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+ ndbcluster_inited= 1;
+#ifdef USE_DISCOVER_ON_STARTUP
+ if (ndb_discover_tables() != 0)
+ DBUG_RETURN(TRUE);
+#endif
+ DBUG_RETURN(false);
+}
+
+
+/*
+  End use of the NDB Cluster table handler
+  - free all global variables allocated by
+    ndbcluster_init()
+*/
+
+bool ndbcluster_end()
+{
+ DBUG_ENTER("ndbcluster_end");
+ delete g_ndb;
+ g_ndb= NULL;
+ if (!ndbcluster_inited)
+ DBUG_RETURN(0);
+ hash_free(&ndbcluster_open_tables);
+#ifdef USE_NDB_POOL
+ ndb_pool_release();
+#endif
+ pthread_mutex_destroy(&ndbcluster_mutex);
+ ndbcluster_inited= 0;
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Set m_tabname from full pathname to table file
+ */
+
+void ha_ndbcluster::set_tabname(const char *path_name)
+{
+ char *end, *ptr;
+
+ /* Scan name from the end */
+ end= strend(path_name)-1;
+ ptr= end;
+ while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+ ptr--;
+ }
+  uint name_len= end - ptr;
+  memcpy(m_tabname, ptr + 1, name_len);
+ m_tabname[name_len]= '\0';
+#ifdef __WIN__
+ /* Put to lower case */
+ ptr= m_tabname;
+
+ while (*ptr != '\0') {
+ *ptr = tolower(*ptr);
+ ptr++;
+ }
+#endif
+}
+
+/*
+  Set the given tabname buffer from a full pathname to table file
+ */
+void
+ha_ndbcluster::set_tabname(const char *path_name, char * tabname)
+{
+ char *end, *ptr;
+
+ /* Scan name from the end */
+ end = strend(path_name)-1;
+ ptr = end;
+ while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+ ptr--;
+ }
+  uint name_len= end - ptr;
+  memcpy(tabname, ptr + 1, name_len);
+ tabname[name_len] = '\0';
+#ifdef __WIN__
+ /* Put to lower case */
+ ptr = tabname;
+
+ while (*ptr != '\0') {
+ *ptr= tolower(*ptr);
+ ptr++;
+ }
+#endif
+}
+
+
+/*
+  Set m_dbname from full pathname to table file
+ */
+
+void ha_ndbcluster::set_dbname(const char *path_name)
+{
+ char *end, *ptr;
+
+ /* Scan name from the end */
+ ptr= strend(path_name)-1;
+ while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+ ptr--;
+ }
+ ptr--;
+ end= ptr;
+ while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+ ptr--;
+ }
+ uint name_len= end - ptr;
+ memcpy(m_dbname, ptr + 1, name_len);
+ m_dbname[name_len]= '\0';
+#ifdef __WIN__
+ /* Put to lower case */
+
+ ptr= m_dbname;
+
+ while (*ptr != '\0') {
+ *ptr= tolower(*ptr);
+ ptr++;
+ }
+#endif
+}
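+
+/*
+  Example: for a table file path like "./test/t1", set_tabname()
+  yields m_tabname "t1" and set_dbname() yields m_dbname "test".
+*/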
+
+
+ha_rows
+ha_ndbcluster::records_in_range(int inx,
+ const byte *start_key,uint start_key_len,
+ enum ha_rkey_function start_search_flag,
+ const byte *end_key,uint end_key_len,
+ enum ha_rkey_function end_search_flag)
+{
+ ha_rows records= 10;
+ KEY* key_info= table->key_info + inx;
+ uint key_length= key_info->key_length;
+
+ DBUG_ENTER("records_in_range");
+ DBUG_PRINT("enter", ("inx: %d", inx));
+ DBUG_PRINT("enter", ("start_key: %x, start_key_len: %d", start_key, start_key_len));
+ DBUG_PRINT("enter", ("start_search_flag: %d", start_search_flag));
+ DBUG_PRINT("enter", ("end_key: %x, end_key_len: %d", end_key, end_key_len));
+ DBUG_PRINT("enter", ("end_search_flag: %d", end_search_flag));
+
+  /*
+    Hash indexes (unique and primary key) can only be read with the
+    full key; if start_key_len is shorter than the index key length,
+    return HA_POS_ERROR to prevent a partial scan/read
+  */
+ NDB_INDEX_TYPE idx_type= get_index_type(inx);
+ if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) &&
+ start_key_len < key_length)
+ {
+    DBUG_PRINT("warning", ("Tried to use index which requires "
+                           "full key length: %d, returning HA_POS_ERROR",
+                           key_length));
+ records= HA_POS_ERROR;
+ }
+ DBUG_RETURN(records);
+}
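+
+/*
+  Example: given a unique hash index on (a, b), a range such as
+  "WHERE a = 1" supplies only a prefix of the key, so start_key_len <
+  key_length and HA_POS_ERROR is returned, keeping the optimizer from
+  planning a partial read of the hash index.
+*/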
+
+
+/*
+ Handling the shared NDB_SHARE structure that is needed to
+ provide table locking.
+ It's also used for sharing data with other NDB handlers
+ in the same MySQL Server. There is currently not much
+ data we want to or can share.
+ */
+
+static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length,
+ my_bool not_used __attribute__((unused)))
+{
+ *length=share->table_name_length;
+ return (byte*) share->table_name;
+}
+
+static NDB_SHARE* get_share(const char *table_name)
+{
+ NDB_SHARE *share;
+ pthread_mutex_lock(&ndbcluster_mutex);
+ uint length=(uint) strlen(table_name);
+ if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+ (byte*) table_name,
+ length)))
+ {
+ if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1,
+ MYF(MY_WME | MY_ZEROFILL))))
+ {
+ share->table_name_length=length;
+ share->table_name=(char*) (share+1);
+ strmov(share->table_name,table_name);
+ if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
+ {
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ my_free((gptr) share,0);
+ return 0;
+ }
+ thr_lock_init(&share->lock);
+ pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
+ }
+ }
+  if (share)
+    share->use_count++;
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ return share;
+}
+
+
+static void free_share(NDB_SHARE *share)
+{
+ pthread_mutex_lock(&ndbcluster_mutex);
+ if (!--share->use_count)
+ {
+ hash_delete(&ndbcluster_open_tables, (byte*) share);
+ thr_lock_delete(&share->lock);
+ pthread_mutex_destroy(&share->mutex);
+ my_free((gptr) share, MYF(0));
+ }
+ pthread_mutex_unlock(&ndbcluster_mutex);
+}
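+
+/*
+  Usage (from this file): ha_ndbcluster::open() calls get_share() to
+  get a reference-counted NDB_SHARE for the table name, and close()
+  calls free_share(); when the last reference is released the share
+  is removed from ndbcluster_open_tables and freed.
+*/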
+
+
+
+/*
+ Internal representation of the frm blob
+
+*/
+
+struct frm_blob_struct
+{
+ struct frm_blob_header
+ {
+ uint ver; // Version of header
+    uint orglen;   // Original (uncompressed) length of data
+ uint complen; // Compressed length of data, 0=uncompressed
+ } head;
+ char data[1];
+};
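+
+/*
+  Layout example: packfrm() below produces a 12 byte header of three
+  int4store() words (stored low byte first, i.e. machine independent)
+  -- ver, orglen, complen -- immediately followed by the compressed
+  frm image in data[].
+*/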
+
+
+
+static int packfrm(const void *data, uint len,
+ const void **pack_data, uint *pack_len)
+{
+ int error;
+ ulong org_len, comp_len;
+ uint blob_len;
+ frm_blob_struct* blob;
+ DBUG_ENTER("packfrm");
+ DBUG_PRINT("enter", ("data: %x, len: %d", data, len));
+
+  error= 1;
+  org_len= len;
+  /*
+    Note: my_compress() compresses 'data' in place; on return org_len
+    holds the compressed length and comp_len the original length
+    (comp_len == 0 if the data was stored uncompressed)
+  */
+  if (my_compress((byte*)data, &org_len, &comp_len))
+    goto err;
+
+  DBUG_PRINT("info", ("org_len: %lu, comp_len: %lu", org_len, comp_len));
+  DBUG_DUMP("compressed", (char*)data, org_len);
+
+ error= 2;
+ blob_len= sizeof(frm_blob_struct::frm_blob_header)+org_len;
+ if (!(blob= (frm_blob_struct*) my_malloc(blob_len,MYF(MY_WME))))
+ goto err;
+
+ // Store compressed blob in machine independent format
+ int4store((char*)(&blob->head.ver), 1);
+ int4store((char*)(&blob->head.orglen), comp_len);
+ int4store((char*)(&blob->head.complen), org_len);
+
+ // Copy frm data into blob, already in machine independent format
+ memcpy(blob->data, data, org_len);
+
+ *pack_data = blob;
+ *pack_len = blob_len;
+ error = 0;
+
+ DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
+err:
+ DBUG_RETURN(error);
+
+}
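+
+/*
+  Roundtrip sketch (illustrative): packfrm() and unpackfrm() are
+  inverses, but note that packfrm() compresses its input buffer in
+  place via my_compress(), so the caller's frm image is clobbered:
+
+    const void *pack; uint pack_len;
+    if (!packfrm(frm_data, frm_len, &pack, &pack_len))
+    {
+      const void *frm2; uint len2;  // frm2/len2 get the original image
+      if (!unpackfrm(&frm2, &len2, pack))
+        my_free((char*)frm2, MYF(0));
+      my_free((char*)pack, MYF(0));
+    }
+*/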
+
+
+static int unpackfrm(const void **unpack_data, uint *unpack_len,
+ const void *pack_data)
+{
+ const frm_blob_struct *blob = (frm_blob_struct*)pack_data;
+ byte *data;
+ ulong complen, orglen, ver;
+ DBUG_ENTER("unpackfrm");
+ DBUG_PRINT("enter", ("pack_data: %x", pack_data));
+
+ complen= uint4korr((char*)&blob->head.complen);
+ orglen= uint4korr((char*)&blob->head.orglen);
+ ver= uint4korr((char*)&blob->head.ver);
+
+  DBUG_PRINT("blob", ("ver: %lu complen: %lu orglen: %lu",
+                      ver, complen, orglen));
+ DBUG_DUMP("blob->data", (char*) blob->data, complen);
+
+ if (ver != 1)
+ DBUG_RETURN(1);
+ if (!(data = my_malloc(max(orglen, complen), MYF(MY_WME))))
+ DBUG_RETURN(2);
+ memcpy(data, blob->data, complen);
+
+ if (my_uncompress(data, &complen, &orglen))
+ {
+ my_free((char*)data, MYF(0));
+ DBUG_RETURN(3);
+ }
+
+ *unpack_data = data;
+ *unpack_len = complen;
+
+ DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));
+
+ DBUG_RETURN(0);
+}
+#endif /* HAVE_NDBCLUSTER_DB */