path: root/sql/ha_ndbcluster.h
Diffstat (limited to 'sql/ha_ndbcluster.h')
-rw-r--r--  sql/ha_ndbcluster.h  278
1 file changed, 229 insertions(+), 49 deletions(-)
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index d0f7c020184..4f0db20d0b0 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -25,6 +25,10 @@
#pragma interface /* gcc class implementation */
#endif
+/* Blob tables and events are internal to NDB and must never be accessed */
+#define IS_NDB_BLOB_PREFIX(A) is_prefix(A, "NDB$BLOB")
+
+#include <NdbApi.hpp>
#include <ndbapi_limits.h>
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
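The IS_NDB_BLOB_PREFIX() macro added in this hunk is what keeps NDB's internal blob-part tables (names starting with "NDB$BLOB") hidden from users. A minimal, standalone sketch of the same prefix test, purely as a reading aid (is_prefix() itself comes from the server's string library; strncmp() is used here only for illustration):

#include <string.h>

/* Non-zero when a table name looks like an internal NDB blob-part table,
   e.g. "NDB$BLOB_12_3"; such tables must never be exposed to users. */
static int looks_like_ndb_blob_table(const char *name)
{
  return strncmp(name, "NDB$BLOB", 8) == 0;
}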
@@ -37,10 +41,16 @@ class NdbScanOperation;
class NdbScanFilter;
class NdbIndexScanOperation;
class NdbBlob;
+class NdbIndexStat;
+class NdbEventOperation;
// connectstring to cluster if given by mysqld
extern const char *ndbcluster_connectstring;
extern ulong ndb_cache_check_time;
+#ifdef HAVE_NDB_BINLOG
+extern ulong ndb_report_thresh_binlog_epoch_slip;
+extern ulong ndb_report_thresh_binlog_mem_usage;
+#endif
typedef enum ndb_index_type {
UNDEFINED_INDEX = 0,
@@ -51,22 +61,105 @@ typedef enum ndb_index_type {
ORDERED_INDEX = 5
} NDB_INDEX_TYPE;
+typedef enum ndb_index_status {
+ UNDEFINED = 0,
+ ACTIVE = 1,
+ TO_BE_DROPPED = 2
+} NDB_INDEX_STATUS;
+
typedef struct ndb_index_data {
NDB_INDEX_TYPE type;
- void *index;
- void *unique_index;
+ NDB_INDEX_STATUS status;
+ const NdbDictionary::Index *index;
+ const NdbDictionary::Index *unique_index;
unsigned char *unique_index_attrid_map;
+ // In this version stats are not shared between threads
+ NdbIndexStat* index_stat;
+ uint index_stat_cache_entries;
+ // Simple counter mechanism to decide when to connect to db
+ uint index_stat_update_freq;
+ uint index_stat_query_count;
} NDB_INDEX_DATA;
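A hedged sketch of the "simple counter mechanism" noted in the struct above: index statistics are refreshed from the data nodes only on every index_stat_update_freq-th query, counted per index. The actual decision logic lives in ha_ndbcluster.cc; the helper name below is illustrative, not part of this patch.

static inline bool ndb_index_stat_need_db_query(NDB_INDEX_DATA *data)
{
  if (data->index_stat == NULL || data->index_stat_update_freq == 0)
    return false;                                  /* stats not in use          */
  bool hit= (data->index_stat_query_count % data->index_stat_update_freq) == 0;
  data->index_stat_query_count++;                  /* count this estimate       */
  return hit;                                      /* true => query data nodes  */
}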
+typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
+
+int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
+ byte*& buffer, uint& buffer_size,
+ my_ptrdiff_t ptrdiff);
+
+typedef enum {
+ NSS_INITIAL= 0,
+ NSS_DROPPED,
+ NSS_ALTERED
+} NDB_SHARE_STATE;
+
typedef struct st_ndbcluster_share {
+ NDB_SHARE_STATE state;
+ MEM_ROOT mem_root;
THR_LOCK lock;
pthread_mutex_t mutex;
- char *table_name;
- uint table_name_length,use_count;
+ char *key;
+ uint key_length;
+ THD *util_lock;
+ uint use_count;
uint commit_count_lock;
ulonglong commit_count;
+ char *db;
+ char *table_name;
+ Ndb::TupleIdRange tuple_id_range;
+#ifdef HAVE_NDB_BINLOG
+ uint32 flags;
+ NdbEventOperation *op;
+ NdbEventOperation *op_old; // for rename table
+ char *old_names; // for rename table
+ TABLE_SHARE *table_share;
+ TABLE *table;
+ byte *record[2]; // pointer to allocated records for receiving data
+ NdbValue *ndb_value[2];
+ MY_BITMAP *subscriber_bitmap;
+#endif
} NDB_SHARE;
+inline
+NDB_SHARE_STATE
+get_ndb_share_state(NDB_SHARE *share)
+{
+ NDB_SHARE_STATE state;
+ pthread_mutex_lock(&share->mutex);
+ state= share->state;
+ pthread_mutex_unlock(&share->mutex);
+ return state;
+}
+
+inline
+void
+set_ndb_share_state(NDB_SHARE *share, NDB_SHARE_STATE state)
+{
+ pthread_mutex_lock(&share->mutex);
+ share->state= state;
+ pthread_mutex_unlock(&share->mutex);
+}
+
+struct Ndb_tuple_id_range_guard {
+ Ndb_tuple_id_range_guard(NDB_SHARE* _share) :
+ share(_share),
+ range(share->tuple_id_range) {
+ pthread_mutex_lock(&share->mutex);
+ }
+ ~Ndb_tuple_id_range_guard() {
+ pthread_mutex_unlock(&share->mutex);
+ }
+ NDB_SHARE* share;
+ Ndb::TupleIdRange& range;
+};
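A hedged usage sketch of the guard just defined: it holds share->mutex for exactly the scope in which the cached per-table tuple-id range is consulted or refilled. The range-aware Ndb::getAutoIncrementValue() overload is assumed here; check NdbApi.hpp in this tree for the exact signature.

static int example_next_auto_value(Ndb *ndb, const NdbDictionary::Table *tab,
                                   NDB_SHARE *share, Uint64 &value)
{
  Ndb_tuple_id_range_guard g(share);   /* locks share->mutex                      */
  /* Reserve a batch of 32 ids from the range cached in the share.
     (Overload and argument order are assumptions, not taken from this patch.)    */
  return ndb->getAutoIncrementValue(tab, g.range, value, 32);
}                                      /* mutex released when g goes out of scope */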
+
+#ifdef HAVE_NDB_BINLOG
+/* NDB_SHARE.flags */
+#define NSF_HIDDEN_PK 1 /* table has hidden primary key */
+#define NSF_BLOB_FLAG 2 /* table has blob attributes */
+#define NSF_NO_BINLOG 4 /* table should not be binlogged */
+#endif
+
typedef enum ndb_item_type {
NDB_VALUE = 0, // Qualified more with Item::Type
NDB_FIELD = 1, // Qualified from table definition
@@ -115,6 +208,7 @@ struct negated_function_mapping
NDB_FUNC_TYPE neg_fun;
};
+
/*
Define what functions can be negated in condition pushdown.
Note, these HAVE to be in the same order as in definition enum
@@ -256,7 +350,12 @@ class Ndb_item {
const Item *item= value.item;
if (item && field)
- ((Item *)item)->save_in_field(field, false);
+ {
+ my_bitmap_map *old_map=
+ dbug_tmp_use_all_columns(field->table, field->table->write_set);
+ ((Item *)item)->save_in_field(field, FALSE);
+ dbug_tmp_restore_column_map(field->table->write_set, old_map);
+ }
};
static NDB_FUNC_TYPE item_func_to_ndb_func(Item_func::Functype fun)
@@ -478,6 +577,7 @@ class Ndb_cond_traverse_context
Ndb_rewrite_context *rewrite_stack;
};
+
typedef enum ndb_query_state_bits {
NDB_QUERY_NORMAL = 0,
NDB_QUERY_MULTI_READ_RANGE = 1
@@ -487,34 +587,57 @@ typedef enum ndb_query_state_bits {
Place holder for ha_ndbcluster thread specific data
*/
+enum THD_NDB_OPTIONS
+{
+ TNO_NO_LOG_SCHEMA_OP= 1 << 0
+};
+
+struct Ndb_local_table_statistics {
+ int no_uncommitted_rows_count;
+ ulong last_count;
+ ha_rows records;
+};
+
+typedef struct st_thd_ndb_share {
+ const void *key;
+ struct Ndb_local_table_statistics stat;
+} THD_NDB_SHARE;
+
class Thd_ndb
{
public:
Thd_ndb();
~Thd_ndb();
+
+ void init_open_tables();
+ THD_NDB_SHARE *get_open_table(THD *thd, const void *key);
+
Ndb *ndb;
ulong count;
uint lock_count;
NdbTransaction *all;
NdbTransaction *stmt;
int error;
+ uint32 options;
List<NDB_SHARE> changed_tables;
uint query_state;
+ HASH open_tables;
};
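Illustrative only, not part of the patch: how the new options bitmask and TNO_NO_LOG_SCHEMA_OP are meant to interact, assuming the convention that a set bit suppresses schema-operation binlogging for this Thd_ndb.

static inline void example_set_schema_op_logging(Thd_ndb *thd_ndb, bool log_it)
{
  if (log_it)
    thd_ndb->options&= ~TNO_NO_LOG_SCHEMA_OP;  /* allow schema-op binlogging    */
  else
    thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;   /* suppress it for this thread   */
}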
class ha_ndbcluster: public handler
{
public:
- ha_ndbcluster(TABLE *table);
+ ha_ndbcluster(handlerton *hton, TABLE_SHARE *table);
~ha_ndbcluster();
+ int ha_initialise();
int open(const char *name, int mode, uint test_if_locked);
int close(void);
int write_row(byte *buf);
int update_row(const byte *old_data, byte *new_data);
int delete_row(const byte *buf);
- int index_init(uint index);
+ int index_init(uint index, bool sorted);
int index_end();
int index_read(byte *buf, const byte *key, uint key_len,
enum ha_rkey_function find_flag);
@@ -538,6 +661,7 @@ class ha_ndbcluster: public handler
bool eq_range, bool sorted,
byte* buf);
int read_range_next();
+ int alter_tablespace(st_alter_tablespace *info);
/**
* Multi range stuff
@@ -548,15 +672,24 @@ class ha_ndbcluster: public handler
int read_multi_range_next(KEY_MULTI_RANGE **found_range_p);
bool get_error_message(int error, String *buf);
+ ha_rows records();
int info(uint);
+ void get_dynamic_partition_info(PARTITION_INFO *stat_info, uint part_id);
int extra(enum ha_extra_function operation);
int extra_opt(enum ha_extra_function operation, ulong cache_size);
+ int reset();
int external_lock(THD *thd, int lock_type);
void unlock_row();
int start_stmt(THD *thd, thr_lock_type lock_type);
+ void print_error(int error, myf errflag);
const char * table_type() const;
const char ** bas_ext() const;
- ulong table_flags(void) const;
+ ulonglong table_flags(void) const;
+ void prepare_for_alter();
+ int add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys);
+ int prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys);
+ int final_drop_index(TABLE *table_arg);
+ void set_part_info(partition_info *part_info);
ulong index_flags(uint idx, uint part, bool all_parts) const;
uint max_supported_record_length() const;
uint max_supported_keys() const;
@@ -567,12 +700,25 @@ class ha_ndbcluster: public handler
int rename_table(const char *from, const char *to);
int delete_table(const char *name);
int create(const char *name, TABLE *form, HA_CREATE_INFO *info);
+ int create_handler_files(const char *file, const char *old_name,
+ int action_flag, HA_CREATE_INFO *info);
+ int get_default_no_partitions(HA_CREATE_INFO *info);
+ bool get_no_parts(const char *name, uint *no_parts);
+ void set_auto_partitions(partition_info *part_info);
+ virtual bool is_fatal_error(int error, uint flags)
+ {
+ if (!handler::is_fatal_error(error, flags) ||
+ error == HA_ERR_NO_PARTITION_FOUND)
+ return FALSE;
+ return TRUE;
+ }
+
THR_LOCK_DATA **store_lock(THD *thd,
THR_LOCK_DATA **to,
enum thr_lock_type lock_type);
bool low_byte_first() const;
- bool has_transactions();
+
const char* index_type(uint key_number);
double scan_time();
@@ -607,7 +753,7 @@ static void set_tabname(const char *pathname, char *tabname);
AND ... AND pushed_condN)
or less restrictive condition, depending on handler's capabilities.
- handler->extra(HA_EXTRA_RESET) call empties the condition stack.
+ handler->reset() call empties the condition stack.
Calls to rnd_init/rnd_end, index_init/index_end etc do not affect the
condition stack.
The current implementation supports arbitrary AND/OR nested conditions
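A hedged illustration of the stack semantics described in this comment, using handler API names from this version (error handling omitted; a non-NULL return from cond_push() is the part of the condition the engine could not take).

static void example_cond_push_usage(handler *file,
                                    const COND *cond1, const COND *cond2)
{
  const COND *rest1= file->cond_push(cond1);   /* stack: cond1                  */
  const COND *rest2= file->cond_push(cond2);   /* stack: cond1 AND cond2        */
  (void)rest1; (void)rest2;                    /* non-NULL => not pushed down   */
  /* rnd_init()/rnd_end() and index_init()/index_end() leave the stack as-is.   */
  file->reset();                               /* stack emptied again           */
}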
@@ -636,23 +782,55 @@ static void set_tabname(const char *pathname, char *tabname);
uint key_length,
qc_engine_callback *engine_callback,
ulonglong *engine_data);
+
+ bool check_if_incompatible_data(HA_CREATE_INFO *info,
+ uint table_changes);
+
private:
- int alter_table_name(const char *to);
- int drop_table();
- int create_index(const char *name, KEY *key_info, bool unique);
+ friend int ndbcluster_drop_database_impl(const char *path);
+ friend int ndb_handle_schema_change(THD *thd,
+ Ndb *ndb, NdbEventOperation *pOp,
+ NDB_SHARE *share);
+
+ static int delete_table(ha_ndbcluster *h, Ndb *ndb,
+ const char *path,
+ const char *db,
+ const char *table_name);
+ int create_ndb_index(const char *name, KEY *key_info, bool unique);
int create_ordered_index(const char *name, KEY *key_info);
int create_unique_index(const char *name, KEY *key_info);
- int initialize_autoincrement(const void *table);
- enum ILBP {ILBP_CREATE = 0, ILBP_OPEN = 1}; // Index List Build Phase
- int build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase);
+ int create_index(const char *name, KEY *key_info,
+ NDB_INDEX_TYPE idx_type, uint idx_no);
+// Index list management
+ int create_indexes(Ndb *ndb, TABLE *tab);
+ int open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error);
+ void renumber_indexes(Ndb *ndb, TABLE *tab);
+ int drop_indexes(Ndb *ndb, TABLE *tab);
+ int add_index_handle(THD *thd, NdbDictionary::Dictionary *dict,
+ KEY *key_info, const char *index_name, uint index_no);
int get_metadata(const char* path);
- void release_metadata();
+ void release_metadata(THD *thd, Ndb *ndb);
NDB_INDEX_TYPE get_index_type(uint idx_no) const;
NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
- int check_index_fields_not_null(uint index_no);
+ NDB_INDEX_TYPE get_index_type_from_key(uint index_no, KEY *key_info,
+ bool primary) const;
+ int check_index_fields_not_null(KEY *key_info);
+
+ uint set_up_partition_info(partition_info *part_info,
+ TABLE *table,
+ void *tab);
+ char* get_tablespace_name(THD *thd);
+ int set_range_data(void *tab, partition_info* part_info);
+ int set_list_data(void *tab, partition_info* part_info);
+ int complemented_read(const byte *old_data, byte *new_data,
+ uint32 old_part_id);
+ int pk_read(const byte *key, uint key_len, byte *buf, uint32 part_id);
+ int ordered_index_scan(const key_range *start_key,
+ const key_range *end_key,
+ bool sorted, bool descending, byte* buf,
+ part_id_range *part_spec);
+ int full_table_scan(byte * buf);
- int pk_read(const byte *key, uint key_len, byte *buf);
- int complemented_pk_read(const byte *old_data, byte *new_data);
bool check_all_operations_for_error(NdbTransaction *trans,
const NdbOperation *first,
const NdbOperation *last,
@@ -660,10 +838,6 @@ private:
int peek_indexed_rows(const byte *record);
int unique_index_read(const byte *key, uint key_len,
byte *buf);
- int ordered_index_scan(const key_range *start_key,
- const key_range *end_key,
- bool sorted, bool descending, byte* buf);
- int full_table_scan(byte * buf);
int fetch_next(NdbScanOperation* op);
int next_result(byte *buf);
int define_read_attrs(byte* buf, NdbOperation* op);
@@ -681,27 +855,30 @@ private:
uint fieldnr, const byte* field_ptr);
int set_ndb_key(NdbOperation*, Field *field,
uint fieldnr, const byte* field_ptr);
- int set_ndb_value(NdbOperation*, Field *field, uint fieldnr, bool *set_blob_value= 0);
+ int set_ndb_value(NdbOperation*, Field *field, uint fieldnr,
+ int row_offset= 0, bool *set_blob_value= 0);
int get_ndb_value(NdbOperation*, Field *field, uint fieldnr, byte*);
+ int get_ndb_partition_id(NdbOperation *);
friend int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg);
- int get_ndb_blobs_value(NdbBlob *last_ndb_blob, my_ptrdiff_t ptrdiff);
int set_primary_key(NdbOperation *op, const byte *key);
int set_primary_key_from_record(NdbOperation *op, const byte *record);
int set_index_key_from_record(NdbOperation *op, const byte *record,
uint keyno);
- int set_bounds(NdbIndexScanOperation*, const key_range *keys[2], uint= 0);
+ int set_bounds(NdbIndexScanOperation*, uint inx, bool rir,
+ const key_range *keys[2], uint= 0);
int key_cmp(uint keynr, const byte * old_row, const byte * new_row);
int set_index_key(NdbOperation *, const KEY *key_info, const byte *key_ptr);
void print_results();
- ulonglong get_auto_increment();
- void invalidate_dictionary_cache(bool global);
-
-bool uses_blob_value(bool all_fields);
+ virtual void get_auto_increment(ulonglong offset, ulonglong increment,
+ ulonglong nb_desired_values,
+ ulonglong *first_value,
+ ulonglong *nb_reserved_values);
+ bool uses_blob_value();
char *update_table_comment(const char * comment);
- int write_ndb_file();
+ int write_ndb_file(const char *name);
int check_ndb_connection(THD* thd= current_thd);
@@ -709,9 +886,10 @@ bool uses_blob_value(bool all_fields);
int records_update();
void no_uncommitted_rows_execute_failure();
void no_uncommitted_rows_update(int);
- void no_uncommitted_rows_init(THD *);
void no_uncommitted_rows_reset(THD *);
+ void release_completed_operations(NdbTransaction*, bool);
+
/*
Condition pushdown
*/
@@ -727,33 +905,39 @@ bool uses_blob_value(bool all_fields);
NdbScanOperation* op);
friend int execute_commit(ha_ndbcluster*, NdbTransaction*);
+ friend int execute_no_commit_ignore_no_key(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool);
friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool);
NdbTransaction *m_active_trans;
NdbScanOperation *m_active_cursor;
- void *m_table;
- int m_table_version;
- void *m_table_info;
+ const NdbDictionary::Table *m_table;
+ struct Ndb_local_table_statistics *m_table_info;
char m_dbname[FN_HEADLEN];
//char m_schemaname[FN_HEADLEN];
char m_tabname[FN_HEADLEN];
- ulong m_table_flags;
+ ulonglong m_table_flags;
THR_LOCK_DATA m_lock;
bool m_lock_tuple;
NDB_SHARE *m_share;
NDB_INDEX_DATA m_index[MAX_KEY];
+ THD_NDB_SHARE *m_thd_ndb_share;
// NdbRecAttr has no reference to blob
- typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
byte m_ref[NDB_HIDDEN_PRIMARY_KEY_LENGTH];
+ partition_info *m_part_info;
+ uint32 m_part_id;
+ byte *m_rec0;
+ Field **m_part_field_array;
+ bool m_use_partition_function;
+ bool m_sorted;
bool m_use_write;
bool m_ignore_dup_key;
bool m_has_unique_index;
bool m_primary_key_update;
- bool m_retrieve_all_fields;
- bool m_retrieve_primary_key;
- ha_rows m_rows_to_insert;
+ bool m_write_op;
+ bool m_ignore_no_key;
+ ha_rows m_rows_to_insert; // TODO: merge it with handler::estimation_rows_to_insert?
ha_rows m_rows_inserted;
ha_rows m_bulk_insert_rows;
ha_rows m_rows_changed;
@@ -771,7 +955,6 @@ bool uses_blob_value(bool all_fields);
bool m_force_send;
ha_rows m_autoincrement_prefetch;
bool m_transaction_on;
- void release_completed_operations(NdbTransaction*, bool);
Ndb_cond_stack *m_cond_stack;
bool m_disable_multi_read;
@@ -785,10 +968,7 @@ bool uses_blob_value(bool all_fields);
Ndb *get_ndb();
};
-extern struct show_var_st ndb_status_variables[];
-
-bool ndbcluster_init(void);
-bool ndbcluster_end(void);
+extern SHOW_VAR ndb_status_variables[];
int ndbcluster_discover(THD* thd, const char* dbname, const char* name,
const void** frmblob, uint* frmlen);
@@ -796,8 +976,8 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
const char *wild, bool dir, List<char> *files);
int ndbcluster_table_exists_in_engine(THD* thd,
const char *db, const char *name);
-int ndbcluster_drop_database(const char* path);
-
void ndbcluster_print_error(int error, const NdbOperation *error_op);
-int ndbcluster_show_status(THD*);
+static const char ndbcluster_hton_name[]= "ndbcluster";
+static const int ndbcluster_hton_name_length=sizeof(ndbcluster_hton_name)-1;
+