summaryrefslogtreecommitdiff
path: root/sql/ha_ndbcluster.h
diff options
context:
space:
mode:
Diffstat (limited to 'sql/ha_ndbcluster.h')
-rw-r--r--sql/ha_ndbcluster.h382
1 files changed, 285 insertions, 97 deletions
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index 324969ad374..fd337303853 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -24,23 +24,32 @@
#pragma interface /* gcc class implementation */
#endif
+/* Blob tables and events are internal to NDB and must never be accessed */
+#define IS_NDB_BLOB_PREFIX(A) is_prefix(A, "NDB$BLOB")
+
+#include <NdbApi.hpp>
#include <ndbapi_limits.h>
#define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
-/* Forward declarations */
-class Ndb;
-class NdbOperation;
-class NdbTransaction;
-class NdbRecAttr;
+
+class Ndb; // Forward declaration
+class NdbOperation; // Forward declaration
+class NdbTransaction; // Forward declaration
+class NdbRecAttr; // Forward declaration
class NdbScanOperation;
class NdbIndexScanOperation;
class NdbBlob;
+class NdbIndexStat;
+class NdbEventOperation;
class ha_ndbcluster_cond;
// connectstring to cluster if given by mysqld
extern const char *ndbcluster_connectstring;
extern ulong ndb_cache_check_time;
-extern char opt_ndb_constrbuf[];
+#ifdef HAVE_NDB_BINLOG
+extern ulong ndb_report_thresh_binlog_epoch_slip;
+extern ulong ndb_report_thresh_binlog_mem_usage;
+#endif
typedef enum ndb_index_type {
UNDEFINED_INDEX = 0,
@@ -51,12 +60,25 @@ typedef enum ndb_index_type {
ORDERED_INDEX = 5
} NDB_INDEX_TYPE;
+typedef enum ndb_index_status {
+ UNDEFINED = 0,
+ ACTIVE = 1,
+ TO_BE_DROPPED = 2
+} NDB_INDEX_STATUS;
+
typedef struct ndb_index_data {
NDB_INDEX_TYPE type;
- void *index;
- void *unique_index;
+ NDB_INDEX_STATUS status;
+ const NdbDictionary::Index *index;
+ const NdbDictionary::Index *unique_index;
unsigned char *unique_index_attrid_map;
bool null_in_unique_index;
+ // In this version stats are not shared between threads
+ NdbIndexStat* index_stat;
+ uint index_stat_cache_entries;
+ // Simple counter mechanism to decide when to connect to db
+ uint index_stat_update_freq;
+ uint index_stat_query_count;
} NDB_INDEX_DATA;
typedef enum ndb_write_op {
@@ -65,15 +87,86 @@ typedef enum ndb_write_op {
NDB_PK_UPDATE = 2
} NDB_WRITE_OP;
+typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
+
+int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
+ uchar*& buffer, uint& buffer_size,
+ my_ptrdiff_t ptrdiff);
+
+typedef enum {
+ NSS_INITIAL= 0,
+ NSS_DROPPED,
+ NSS_ALTERED
+} NDB_SHARE_STATE;
+
typedef struct st_ndbcluster_share {
+ NDB_SHARE_STATE state;
+ MEM_ROOT mem_root;
THR_LOCK lock;
pthread_mutex_t mutex;
- char *table_name;
- uint table_name_length,use_count;
+ char *key;
+ uint key_length;
+ THD *util_lock;
+ uint use_count;
uint commit_count_lock;
ulonglong commit_count;
+ char *db;
+ char *table_name;
+ Ndb::TupleIdRange tuple_id_range;
+#ifdef HAVE_NDB_BINLOG
+ uint32 connect_count;
+ uint32 flags;
+ NdbEventOperation *op;
+ NdbEventOperation *op_old; // for rename table
+ char *old_names; // for rename table
+ TABLE_SHARE *table_share;
+ TABLE *table;
+ uchar *record[2]; // pointer to allocated records for receiving data
+ NdbValue *ndb_value[2];
+ MY_BITMAP *subscriber_bitmap;
+#endif
} NDB_SHARE;
+inline
+NDB_SHARE_STATE
+get_ndb_share_state(NDB_SHARE *share)
+{
+ NDB_SHARE_STATE state;
+ pthread_mutex_lock(&share->mutex);
+ state= share->state;
+ pthread_mutex_unlock(&share->mutex);
+ return state;
+}
+
+inline
+void
+set_ndb_share_state(NDB_SHARE *share, NDB_SHARE_STATE state)
+{
+ pthread_mutex_lock(&share->mutex);
+ share->state= state;
+ pthread_mutex_unlock(&share->mutex);
+}
+
+struct Ndb_tuple_id_range_guard {
+ Ndb_tuple_id_range_guard(NDB_SHARE* _share) :
+ share(_share),
+ range(share->tuple_id_range) {
+ pthread_mutex_lock(&share->mutex);
+ }
+ ~Ndb_tuple_id_range_guard() {
+ pthread_mutex_unlock(&share->mutex);
+ }
+ NDB_SHARE* share;
+ Ndb::TupleIdRange& range;
+};
+
+#ifdef HAVE_NDB_BINLOG
+/* NDB_SHARE.flags */
+#define NSF_HIDDEN_PK 1 /* table has hidden primary key */
+#define NSF_BLOB_FLAG 2 /* table has blob attributes */
+#define NSF_NO_BINLOG 4 /* table should not be binlogged */
+#endif
+
typedef enum ndb_query_state_bits {
NDB_QUERY_NORMAL = 0,
NDB_QUERY_MULTI_READ_RANGE = 1
@@ -83,57 +176,89 @@ typedef enum ndb_query_state_bits {
Place holder for ha_ndbcluster thread specific data
*/
+enum THD_NDB_OPTIONS
+{
+ TNO_NO_LOG_SCHEMA_OP= 1 << 0
+};
+
+enum THD_NDB_TRANS_OPTIONS
+{
+ TNTO_INJECTED_APPLY_STATUS= 1 << 0
+ ,TNTO_NO_LOGGING= 1 << 1
+};
+
+struct Ndb_local_table_statistics {
+ int no_uncommitted_rows_count;
+ ulong last_count;
+ ha_rows records;
+};
+
+typedef struct st_thd_ndb_share {
+ const void *key;
+ struct Ndb_local_table_statistics stat;
+} THD_NDB_SHARE;
+
class Thd_ndb
{
public:
Thd_ndb();
~Thd_ndb();
+
+ void init_open_tables();
+ THD_NDB_SHARE *get_open_table(THD *thd, const void *key);
+
Ndb *ndb;
ulong count;
uint lock_count;
- NdbTransaction *all;
- NdbTransaction *stmt;
- int error;
+ uint start_stmt_count;
+ NdbTransaction *trans;
+ bool m_error;
+ bool m_slow_path;
+ int m_error_code;
+ uint32 m_query_id; /* query id whn m_error_code was set */
+ uint32 options;
+ uint32 trans_options;
List<NDB_SHARE> changed_tables;
uint query_state;
+ HASH open_tables;
};
class ha_ndbcluster: public handler
{
public:
- ha_ndbcluster(TABLE *table);
+ ha_ndbcluster(handlerton *hton, TABLE_SHARE *table);
~ha_ndbcluster();
+ int ha_initialise();
int open(const char *name, int mode, uint test_if_locked);
int close(void);
- int write_row(byte *buf);
- int update_row(const byte *old_data, byte *new_data);
- int delete_row(const byte *buf);
- int index_init(uint index);
+ int write_row(uchar *buf);
+ int update_row(const uchar *old_data, uchar *new_data);
+ int delete_row(const uchar *buf);
+ int index_init(uint index, bool sorted);
int index_end();
- int index_read(byte *buf, const byte *key, uint key_len,
+ int index_read(uchar *buf, const uchar *key, uint key_len,
enum ha_rkey_function find_flag);
- int index_read_idx(byte *buf, uint index, const byte *key, uint key_len,
- enum ha_rkey_function find_flag);
- int index_next(byte *buf);
- int index_prev(byte *buf);
- int index_first(byte *buf);
- int index_last(byte *buf);
- int index_read_last(byte * buf, const byte * key, uint key_len);
+ int index_next(uchar *buf);
+ int index_prev(uchar *buf);
+ int index_first(uchar *buf);
+ int index_last(uchar *buf);
+ int index_read_last(uchar * buf, const uchar * key, uint key_len);
int rnd_init(bool scan);
int rnd_end();
- int rnd_next(byte *buf);
- int rnd_pos(byte *buf, byte *pos);
- void position(const byte *record);
+ int rnd_next(uchar *buf);
+ int rnd_pos(uchar *buf, uchar *pos);
+ void position(const uchar *record);
int read_range_first(const key_range *start_key,
const key_range *end_key,
bool eq_range, bool sorted);
int read_range_first_to_buf(const key_range *start_key,
const key_range *end_key,
bool eq_range, bool sorted,
- byte* buf);
+ uchar* buf);
int read_range_next();
+ int alter_tablespace(st_alter_tablespace *info);
/**
* Multi range stuff
@@ -145,17 +270,28 @@ class ha_ndbcluster: public handler
bool null_value_index_search(KEY_MULTI_RANGE *ranges,
KEY_MULTI_RANGE *end_range,
HANDLER_BUFFER *buffer);
+
bool get_error_message(int error, String *buf);
+ ha_rows records();
+ ha_rows estimate_rows_upper_bound()
+ { return HA_POS_ERROR; }
int info(uint);
+ void get_dynamic_partition_info(PARTITION_INFO *stat_info, uint part_id);
int extra(enum ha_extra_function operation);
int extra_opt(enum ha_extra_function operation, ulong cache_size);
int reset();
int external_lock(THD *thd, int lock_type);
void unlock_row();
int start_stmt(THD *thd, thr_lock_type lock_type);
+ void print_error(int error, myf errflag);
const char * table_type() const;
const char ** bas_ext() const;
- ulong table_flags(void) const;
+ ulonglong table_flags(void) const;
+ void prepare_for_alter();
+ int add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys);
+ int prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys);
+ int final_drop_index(TABLE *table_arg);
+ void set_part_info(partition_info *part_info);
ulong index_flags(uint idx, uint part, bool all_parts) const;
uint max_supported_record_length() const;
uint max_supported_keys() const;
@@ -166,12 +302,25 @@ class ha_ndbcluster: public handler
int rename_table(const char *from, const char *to);
int delete_table(const char *name);
int create(const char *name, TABLE *form, HA_CREATE_INFO *info);
+ int create_handler_files(const char *file, const char *old_name,
+ int action_flag, HA_CREATE_INFO *info);
+ int get_default_no_partitions(HA_CREATE_INFO *info);
+ bool get_no_parts(const char *name, uint *no_parts);
+ void set_auto_partitions(partition_info *part_info);
+ virtual bool is_fatal_error(int error, uint flags)
+ {
+ if (!handler::is_fatal_error(error, flags) ||
+ error == HA_ERR_NO_PARTITION_FOUND)
+ return FALSE;
+ return TRUE;
+ }
+
THR_LOCK_DATA **store_lock(THD *thd,
THR_LOCK_DATA **to,
enum thr_lock_type lock_type);
bool low_byte_first() const;
- bool has_transactions();
+
const char* index_type(uint key_number);
double scan_time();
@@ -206,7 +355,7 @@ static void set_tabname(const char *pathname, char *tabname);
AND ... AND pushed_condN)
or less restrictive condition, depending on handler's capabilities.
- handler->extra(HA_EXTRA_RESET) call empties the condition stack.
+ handler->reset() call empties the condition stack.
Calls to rnd_init/rnd_end, index_init/index_end etc do not affect the
condition stack.
The current implementation supports arbitrary AND/OR nested conditions
@@ -235,79 +384,109 @@ static void set_tabname(const char *pathname, char *tabname);
uint key_length,
qc_engine_callback *engine_callback,
ulonglong *engine_data);
+
+ bool check_if_incompatible_data(HA_CREATE_INFO *info,
+ uint table_changes);
+
private:
- int alter_table_name(const char *to);
- int drop_table();
- int create_index(const char *name, KEY *key_info, bool unique);
+ friend int ndbcluster_drop_database_impl(const char *path);
+ friend int ndb_handle_schema_change(THD *thd,
+ Ndb *ndb, NdbEventOperation *pOp,
+ NDB_SHARE *share);
+
+ static int delete_table(ha_ndbcluster *h, Ndb *ndb,
+ const char *path,
+ const char *db,
+ const char *table_name);
+ int create_ndb_index(const char *name, KEY *key_info, bool unique);
int create_ordered_index(const char *name, KEY *key_info);
int create_unique_index(const char *name, KEY *key_info);
- int initialize_autoincrement(const void *table);
- enum ILBP {ILBP_CREATE = 0, ILBP_OPEN = 1}; // Index List Build Phase
- int build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase);
+ int create_index(const char *name, KEY *key_info,
+ NDB_INDEX_TYPE idx_type, uint idx_no);
+// Index list management
+ int create_indexes(Ndb *ndb, TABLE *tab);
+ int open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error);
+ void renumber_indexes(Ndb *ndb, TABLE *tab);
+ int drop_indexes(Ndb *ndb, TABLE *tab);
+ int add_index_handle(THD *thd, NdbDictionary::Dictionary *dict,
+ KEY *key_info, const char *index_name, uint index_no);
int get_metadata(const char* path);
- void release_metadata();
+ void release_metadata(THD *thd, Ndb *ndb);
NDB_INDEX_TYPE get_index_type(uint idx_no) const;
NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
+ NDB_INDEX_TYPE get_index_type_from_key(uint index_no, KEY *key_info,
+ bool primary) const;
bool has_null_in_unique_index(uint idx_no) const;
- bool check_index_fields_not_null(uint index_no);
-
- int pk_read(const byte *key, uint key_len, byte *buf);
- int complemented_pk_read(const byte *old_data, byte *new_data);
- bool check_all_operations_for_error(NdbTransaction *trans,
- const NdbOperation *first,
- const NdbOperation *last,
- uint errcode);
- int peek_indexed_rows(const byte *record, NDB_WRITE_OP write_op);
- int unique_index_read(const byte *key, uint key_len,
- byte *buf);
+ bool check_index_fields_not_null(KEY *key_info);
+
+ uint set_up_partition_info(partition_info *part_info,
+ TABLE *table,
+ void *tab);
+ char* get_tablespace_name(THD *thd, char *name, uint name_len);
+ int set_range_data(void *tab, partition_info* part_info);
+ int set_list_data(void *tab, partition_info* part_info);
+ int complemented_read(const uchar *old_data, uchar *new_data,
+ uint32 old_part_id);
+ int pk_read(const uchar *key, uint key_len, uchar *buf, uint32 part_id);
int ordered_index_scan(const key_range *start_key,
const key_range *end_key,
- bool sorted, bool descending, byte* buf);
+ bool sorted, bool descending, uchar* buf,
+ part_id_range *part_spec);
+ int unique_index_read(const uchar *key, uint key_len,
+ uchar *buf);
int unique_index_scan(const KEY* key_info,
- const byte *key,
+ const uchar *key,
uint key_len,
- byte *buf);
+ uchar *buf);
+ int full_table_scan(uchar * buf);
- int full_table_scan(byte * buf);
+ bool check_all_operations_for_error(NdbTransaction *trans,
+ const NdbOperation *first,
+ const NdbOperation *last,
+ uint errcode);
+ int peek_indexed_rows(const uchar *record, NDB_WRITE_OP write_op);
int fetch_next(NdbScanOperation* op);
- int next_result(byte *buf);
- int define_read_attrs(byte* buf, NdbOperation* op);
- int filtered_scan(const byte *key, uint key_len,
- byte *buf,
+ int next_result(uchar *buf);
+ int define_read_attrs(uchar* buf, NdbOperation* op);
+ int filtered_scan(const uchar *key, uint key_len,
+ uchar *buf,
enum ha_rkey_function find_flag);
int close_scan();
- void unpack_record(byte *buf);
+ void unpack_record(uchar *buf);
int get_ndb_lock_type(enum thr_lock_type type);
void set_dbname(const char *pathname);
void set_tabname(const char *pathname);
bool set_hidden_key(NdbOperation*,
- uint fieldnr, const byte* field_ptr);
+ uint fieldnr, const uchar* field_ptr);
int set_ndb_key(NdbOperation*, Field *field,
- uint fieldnr, const byte* field_ptr);
- int set_ndb_value(NdbOperation*, Field *field, uint fieldnr, bool *set_blob_value= 0);
- int get_ndb_value(NdbOperation*, Field *field, uint fieldnr, byte*);
+ uint fieldnr, const uchar* field_ptr);
+ int set_ndb_value(NdbOperation*, Field *field, uint fieldnr,
+ int row_offset= 0, bool *set_blob_value= 0);
+ int get_ndb_value(NdbOperation*, Field *field, uint fieldnr, uchar*);
+ int get_ndb_partition_id(NdbOperation *);
friend int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg);
- int get_ndb_blobs_value(NdbBlob *last_ndb_blob, my_ptrdiff_t ptrdiff);
- int set_primary_key(NdbOperation *op, const byte *key);
- int set_primary_key_from_record(NdbOperation *op, const byte *record);
+ int set_primary_key(NdbOperation *op, const uchar *key);
+ int set_primary_key_from_record(NdbOperation *op, const uchar *record);
bool check_index_fields_in_write_set(uint keyno);
- int set_index_key_from_record(NdbOperation *op, const byte *record,
+ int set_index_key_from_record(NdbOperation *op, const uchar *record,
uint keyno);
- int set_bounds(NdbIndexScanOperation*, const key_range *keys[2], uint= 0);
- int key_cmp(uint keynr, const byte * old_row, const byte * new_row);
- int set_index_key(NdbOperation *, const KEY *key_info, const byte *key_ptr);
+ int set_bounds(NdbIndexScanOperation*, uint inx, bool rir,
+ const key_range *keys[2], uint= 0);
+ int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row);
+ int set_index_key(NdbOperation *, const KEY *key_info, const uchar *key_ptr);
void print_results();
- ulonglong get_auto_increment();
- void invalidate_dictionary_cache(bool global);
-
-bool uses_blob_value(bool all_fields);
+ virtual void get_auto_increment(ulonglong offset, ulonglong increment,
+ ulonglong nb_desired_values,
+ ulonglong *first_value,
+ ulonglong *nb_reserved_values);
+ bool uses_blob_value();
char *update_table_comment(const char * comment);
- int write_ndb_file();
+ int write_ndb_file(const char *name);
int check_ndb_connection(THD* thd= current_thd);
@@ -315,39 +494,48 @@ bool uses_blob_value(bool all_fields);
int records_update();
void no_uncommitted_rows_execute_failure();
void no_uncommitted_rows_update(int);
- void no_uncommitted_rows_init(THD *);
void no_uncommitted_rows_reset(THD *);
void release_completed_operations(NdbTransaction*, bool);
friend int execute_commit(ha_ndbcluster*, NdbTransaction*);
+ friend int execute_no_commit_ignore_no_key(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool);
friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool);
+ void transaction_checks(THD *thd);
+ int start_statement(THD *thd, Thd_ndb *thd_ndb, Ndb* ndb);
+ int init_handler_for_statement(THD *thd, Thd_ndb *thd_ndb);
+
NdbTransaction *m_active_trans;
NdbScanOperation *m_active_cursor;
- void *m_table;
- int m_table_version;
- void *m_table_info;
+ const NdbDictionary::Table *m_table;
+ struct Ndb_local_table_statistics *m_table_info;
char m_dbname[FN_HEADLEN];
//char m_schemaname[FN_HEADLEN];
char m_tabname[FN_HEADLEN];
- ulong m_table_flags;
+ ulonglong m_table_flags;
THR_LOCK_DATA m_lock;
bool m_lock_tuple;
NDB_SHARE *m_share;
NDB_INDEX_DATA m_index[MAX_KEY];
+ THD_NDB_SHARE *m_thd_ndb_share;
// NdbRecAttr has no reference to blob
- typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
- byte m_ref[NDB_HIDDEN_PRIMARY_KEY_LENGTH];
+ uchar m_ref[NDB_HIDDEN_PRIMARY_KEY_LENGTH];
+ partition_info *m_part_info;
+ uint32 m_part_id;
+ uchar *m_rec0;
+ Field **m_part_field_array;
+ bool m_use_partition_function;
+ bool m_sorted;
bool m_use_write;
bool m_ignore_dup_key;
bool m_has_unique_index;
bool m_primary_key_update;
- bool m_retrieve_all_fields;
- bool m_retrieve_primary_key;
- ha_rows m_rows_to_insert;
+ bool m_write_op;
+ bool m_ignore_no_key;
+ ha_rows m_rows_to_insert; // TODO: merge it with handler::estimation_rows_to_insert?
ha_rows m_rows_inserted;
ha_rows m_bulk_insert_rows;
ha_rows m_rows_changed;
@@ -357,9 +545,10 @@ bool uses_blob_value(bool all_fields);
ha_rows m_ops_pending;
bool m_skip_auto_increment;
bool m_blobs_pending;
+ bool m_slow_path;
my_ptrdiff_t m_blobs_offset;
// memory for blobs in one tuple
- char *m_blobs_buffer;
+ uchar *m_blobs_buffer;
uint32 m_blobs_buffer_size;
uint m_dupkey;
// set from thread variables at external lock
@@ -370,29 +559,28 @@ bool uses_blob_value(bool all_fields);
ha_ndbcluster_cond *m_cond;
bool m_disable_multi_read;
- byte *m_multi_range_result_ptr;
+ uchar *m_multi_range_result_ptr;
KEY_MULTI_RANGE *m_multi_ranges;
KEY_MULTI_RANGE *m_multi_range_defined;
const NdbOperation *m_current_multi_operation;
NdbIndexScanOperation *m_multi_cursor;
- byte *m_multi_range_cursor_result_ptr;
+ uchar *m_multi_range_cursor_result_ptr;
int setup_recattr(const NdbRecAttr*);
Ndb *get_ndb();
};
-extern struct show_var_st ndb_status_variables[];
-
-bool ndbcluster_init(void);
-bool ndbcluster_end(void);
+extern SHOW_VAR ndb_status_variables[];
int ndbcluster_discover(THD* thd, const char* dbname, const char* name,
const void** frmblob, uint* frmlen);
int ndbcluster_find_files(THD *thd,const char *db,const char *path,
- const char *wild, bool dir, List<char> *files);
+ const char *wild, bool dir, List<LEX_STRING> *files);
int ndbcluster_table_exists_in_engine(THD* thd,
const char *db, const char *name);
-int ndbcluster_drop_database(const char* path);
-
void ndbcluster_print_error(int error, const NdbOperation *error_op);
-int ndbcluster_show_status(THD*);
+static const char ndbcluster_hton_name[]= "ndbcluster";
+static const int ndbcluster_hton_name_length=sizeof(ndbcluster_hton_name)-1;
+extern int ndbcluster_terminating;
+extern int ndb_util_thread_running;
+extern pthread_cond_t COND_ndb_util_ready;