diff options
Diffstat (limited to 'sql/ha_ndbcluster.h')
-rw-r--r-- | sql/ha_ndbcluster.h | 278 |
1 files changed, 229 insertions, 49 deletions
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h index d0f7c020184..4f0db20d0b0 100644 --- a/sql/ha_ndbcluster.h +++ b/sql/ha_ndbcluster.h @@ -25,6 +25,10 @@ #pragma interface /* gcc class implementation */ #endif +/* Blob tables and events are internal to NDB and must never be accessed */ +#define IS_NDB_BLOB_PREFIX(A) is_prefix(A, "NDB$BLOB") + +#include <NdbApi.hpp> #include <ndbapi_limits.h> #define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8 @@ -37,10 +41,16 @@ class NdbScanOperation; class NdbScanFilter; class NdbIndexScanOperation; class NdbBlob; +class NdbIndexStat; +class NdbEventOperation; // connectstring to cluster if given by mysqld extern const char *ndbcluster_connectstring; extern ulong ndb_cache_check_time; +#ifdef HAVE_NDB_BINLOG +extern ulong ndb_report_thresh_binlog_epoch_slip; +extern ulong ndb_report_thresh_binlog_mem_usage; +#endif typedef enum ndb_index_type { UNDEFINED_INDEX = 0, @@ -51,22 +61,105 @@ typedef enum ndb_index_type { ORDERED_INDEX = 5 } NDB_INDEX_TYPE; +typedef enum ndb_index_status { + UNDEFINED = 0, + ACTIVE = 1, + TO_BE_DROPPED = 2 +} NDB_INDEX_STATUS; + typedef struct ndb_index_data { NDB_INDEX_TYPE type; - void *index; - void *unique_index; + NDB_INDEX_STATUS status; + const NdbDictionary::Index *index; + const NdbDictionary::Index *unique_index; unsigned char *unique_index_attrid_map; + // In this version stats are not shared between threads + NdbIndexStat* index_stat; + uint index_stat_cache_entries; + // Simple counter mechanism to decide when to connect to db + uint index_stat_update_freq; + uint index_stat_query_count; } NDB_INDEX_DATA; +typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue; + +int get_ndb_blobs_value(TABLE* table, NdbValue* value_array, + byte*& buffer, uint& buffer_size, + my_ptrdiff_t ptrdiff); + +typedef enum { + NSS_INITIAL= 0, + NSS_DROPPED, + NSS_ALTERED +} NDB_SHARE_STATE; + typedef struct st_ndbcluster_share { + NDB_SHARE_STATE state; + MEM_ROOT mem_root; THR_LOCK lock; pthread_mutex_t mutex; - char *table_name; - uint table_name_length,use_count; + char *key; + uint key_length; + THD *util_lock; + uint use_count; uint commit_count_lock; ulonglong commit_count; + char *db; + char *table_name; + Ndb::TupleIdRange tuple_id_range; +#ifdef HAVE_NDB_BINLOG + uint32 flags; + NdbEventOperation *op; + NdbEventOperation *op_old; // for rename table + char *old_names; // for rename table + TABLE_SHARE *table_share; + TABLE *table; + byte *record[2]; // pointer to allocated records for receiving data + NdbValue *ndb_value[2]; + MY_BITMAP *subscriber_bitmap; +#endif } NDB_SHARE; +inline +NDB_SHARE_STATE +get_ndb_share_state(NDB_SHARE *share) +{ + NDB_SHARE_STATE state; + pthread_mutex_lock(&share->mutex); + state= share->state; + pthread_mutex_unlock(&share->mutex); + return state; +} + +inline +void +set_ndb_share_state(NDB_SHARE *share, NDB_SHARE_STATE state) +{ + pthread_mutex_lock(&share->mutex); + share->state= state; + pthread_mutex_unlock(&share->mutex); +} + +struct Ndb_tuple_id_range_guard { + Ndb_tuple_id_range_guard(NDB_SHARE* _share) : + share(_share), + range(share->tuple_id_range) { + pthread_mutex_lock(&share->mutex); + } + ~Ndb_tuple_id_range_guard() { + pthread_mutex_unlock(&share->mutex); + } + NDB_SHARE* share; + Ndb::TupleIdRange& range; +}; + +#ifdef HAVE_NDB_BINLOG +/* NDB_SHARE.flags */ +#define NSF_HIDDEN_PK 1 /* table has hidden primary key */ +#define NSF_BLOB_FLAG 2 /* table has blob attributes */ +#define NSF_NO_BINLOG 4 /* table should not be binlogged */ +#endif + typedef enum ndb_item_type { NDB_VALUE = 0, // Qualified more with Item::Type NDB_FIELD = 1, // Qualified from table definition @@ -115,6 +208,7 @@ struct negated_function_mapping NDB_FUNC_TYPE neg_fun; }; + /* Define what functions can be negated in condition pushdown. Note, these HAVE to be in the same order as in definition enum @@ -256,7 +350,12 @@ class Ndb_item { const Item *item= value.item; if (item && field) - ((Item *)item)->save_in_field(field, false); + { + my_bitmap_map *old_map= + dbug_tmp_use_all_columns(field->table, field->table->write_set); + ((Item *)item)->save_in_field(field, FALSE); + dbug_tmp_restore_column_map(field->table->write_set, old_map); + } }; static NDB_FUNC_TYPE item_func_to_ndb_func(Item_func::Functype fun) @@ -478,6 +577,7 @@ class Ndb_cond_traverse_context Ndb_rewrite_context *rewrite_stack; }; + typedef enum ndb_query_state_bits { NDB_QUERY_NORMAL = 0, NDB_QUERY_MULTI_READ_RANGE = 1 @@ -487,34 +587,57 @@ typedef enum ndb_query_state_bits { Place holder for ha_ndbcluster thread specific data */ +enum THD_NDB_OPTIONS +{ + TNO_NO_LOG_SCHEMA_OP= 1 << 0 +}; + +struct Ndb_local_table_statistics { + int no_uncommitted_rows_count; + ulong last_count; + ha_rows records; +}; + +typedef struct st_thd_ndb_share { + const void *key; + struct Ndb_local_table_statistics stat; +} THD_NDB_SHARE; + class Thd_ndb { public: Thd_ndb(); ~Thd_ndb(); + + void init_open_tables(); + THD_NDB_SHARE *get_open_table(THD *thd, const void *key); + Ndb *ndb; ulong count; uint lock_count; NdbTransaction *all; NdbTransaction *stmt; int error; + uint32 options; List<NDB_SHARE> changed_tables; uint query_state; + HASH open_tables; }; class ha_ndbcluster: public handler { public: - ha_ndbcluster(TABLE *table); + ha_ndbcluster(handlerton *hton, TABLE_SHARE *table); ~ha_ndbcluster(); + int ha_initialise(); int open(const char *name, int mode, uint test_if_locked); int close(void); int write_row(byte *buf); int update_row(const byte *old_data, byte *new_data); int delete_row(const byte *buf); - int index_init(uint index); + int index_init(uint index, bool sorted); int index_end(); int index_read(byte *buf, const byte *key, uint key_len, enum ha_rkey_function find_flag); @@ -538,6 +661,7 @@ class ha_ndbcluster: public handler bool eq_range, bool sorted, byte* buf); int read_range_next(); + int alter_tablespace(st_alter_tablespace *info); /** * Multi range stuff @@ -548,15 +672,24 @@ class ha_ndbcluster: public handler int read_multi_range_next(KEY_MULTI_RANGE **found_range_p); bool get_error_message(int error, String *buf); + ha_rows records(); int info(uint); + void get_dynamic_partition_info(PARTITION_INFO *stat_info, uint part_id); int extra(enum ha_extra_function operation); int extra_opt(enum ha_extra_function operation, ulong cache_size); + int reset(); int external_lock(THD *thd, int lock_type); void unlock_row(); int start_stmt(THD *thd, thr_lock_type lock_type); + void print_error(int error, myf errflag); const char * table_type() const; const char ** bas_ext() const; - ulong table_flags(void) const; + ulonglong table_flags(void) const; + void prepare_for_alter(); + int add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys); + int prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys); + int final_drop_index(TABLE *table_arg); + void set_part_info(partition_info *part_info); ulong index_flags(uint idx, uint part, bool all_parts) const; uint max_supported_record_length() const; uint max_supported_keys() const; @@ -567,12 +700,25 @@ class ha_ndbcluster: public handler int rename_table(const char *from, const char *to); int delete_table(const char *name); int create(const char *name, TABLE *form, HA_CREATE_INFO *info); + int create_handler_files(const char *file, const char *old_name, + int action_flag, HA_CREATE_INFO *info); + int get_default_no_partitions(HA_CREATE_INFO *info); + bool get_no_parts(const char *name, uint *no_parts); + void set_auto_partitions(partition_info *part_info); + virtual bool is_fatal_error(int error, uint flags) + { + if (!handler::is_fatal_error(error, flags) || + error == HA_ERR_NO_PARTITION_FOUND) + return FALSE; + return TRUE; + } + THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type); bool low_byte_first() const; - bool has_transactions(); + const char* index_type(uint key_number); double scan_time(); @@ -607,7 +753,7 @@ static void set_tabname(const char *pathname, char *tabname); AND ... AND pushed_condN) or less restrictive condition, depending on handler's capabilities. - handler->extra(HA_EXTRA_RESET) call empties the condition stack. + handler->reset() call empties the condition stack. Calls to rnd_init/rnd_end, index_init/index_end etc do not affect the condition stack. The current implementation supports arbitrary AND/OR nested conditions @@ -636,23 +782,55 @@ static void set_tabname(const char *pathname, char *tabname); uint key_length, qc_engine_callback *engine_callback, ulonglong *engine_data); + + bool check_if_incompatible_data(HA_CREATE_INFO *info, + uint table_changes); + private: - int alter_table_name(const char *to); - int drop_table(); - int create_index(const char *name, KEY *key_info, bool unique); + friend int ndbcluster_drop_database_impl(const char *path); + friend int ndb_handle_schema_change(THD *thd, + Ndb *ndb, NdbEventOperation *pOp, + NDB_SHARE *share); + + static int delete_table(ha_ndbcluster *h, Ndb *ndb, + const char *path, + const char *db, + const char *table_name); + int create_ndb_index(const char *name, KEY *key_info, bool unique); int create_ordered_index(const char *name, KEY *key_info); int create_unique_index(const char *name, KEY *key_info); - int initialize_autoincrement(const void *table); - enum ILBP {ILBP_CREATE = 0, ILBP_OPEN = 1}; // Index List Build Phase - int build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase); + int create_index(const char *name, KEY *key_info, + NDB_INDEX_TYPE idx_type, uint idx_no); +// Index list management + int create_indexes(Ndb *ndb, TABLE *tab); + int open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error); + void renumber_indexes(Ndb *ndb, TABLE *tab); + int drop_indexes(Ndb *ndb, TABLE *tab); + int add_index_handle(THD *thd, NdbDictionary::Dictionary *dict, + KEY *key_info, const char *index_name, uint index_no); int get_metadata(const char* path); - void release_metadata(); + void release_metadata(THD *thd, Ndb *ndb); NDB_INDEX_TYPE get_index_type(uint idx_no) const; NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const; - int check_index_fields_not_null(uint index_no); + NDB_INDEX_TYPE get_index_type_from_key(uint index_no, KEY *key_info, + bool primary) const; + int check_index_fields_not_null(KEY *key_info); + + uint set_up_partition_info(partition_info *part_info, + TABLE *table, + void *tab); + char* get_tablespace_name(THD *thd); + int set_range_data(void *tab, partition_info* part_info); + int set_list_data(void *tab, partition_info* part_info); + int complemented_read(const byte *old_data, byte *new_data, + uint32 old_part_id); + int pk_read(const byte *key, uint key_len, byte *buf, uint32 part_id); + int ordered_index_scan(const key_range *start_key, + const key_range *end_key, + bool sorted, bool descending, byte* buf, + part_id_range *part_spec); + int full_table_scan(byte * buf); - int pk_read(const byte *key, uint key_len, byte *buf); - int complemented_pk_read(const byte *old_data, byte *new_data); bool check_all_operations_for_error(NdbTransaction *trans, const NdbOperation *first, const NdbOperation *last, @@ -660,10 +838,6 @@ private: int peek_indexed_rows(const byte *record); int unique_index_read(const byte *key, uint key_len, byte *buf); - int ordered_index_scan(const key_range *start_key, - const key_range *end_key, - bool sorted, bool descending, byte* buf); - int full_table_scan(byte * buf); int fetch_next(NdbScanOperation* op); int next_result(byte *buf); int define_read_attrs(byte* buf, NdbOperation* op); @@ -681,27 +855,30 @@ private: uint fieldnr, const byte* field_ptr); int set_ndb_key(NdbOperation*, Field *field, uint fieldnr, const byte* field_ptr); - int set_ndb_value(NdbOperation*, Field *field, uint fieldnr, bool *set_blob_value= 0); + int set_ndb_value(NdbOperation*, Field *field, uint fieldnr, + int row_offset= 0, bool *set_blob_value= 0); int get_ndb_value(NdbOperation*, Field *field, uint fieldnr, byte*); + int get_ndb_partition_id(NdbOperation *); friend int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg); - int get_ndb_blobs_value(NdbBlob *last_ndb_blob, my_ptrdiff_t ptrdiff); int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key_from_record(NdbOperation *op, const byte *record); int set_index_key_from_record(NdbOperation *op, const byte *record, uint keyno); - int set_bounds(NdbIndexScanOperation*, const key_range *keys[2], uint= 0); + int set_bounds(NdbIndexScanOperation*, uint inx, bool rir, + const key_range *keys[2], uint= 0); int key_cmp(uint keynr, const byte * old_row, const byte * new_row); int set_index_key(NdbOperation *, const KEY *key_info, const byte *key_ptr); void print_results(); - ulonglong get_auto_increment(); - void invalidate_dictionary_cache(bool global); - -bool uses_blob_value(bool all_fields); + virtual void get_auto_increment(ulonglong offset, ulonglong increment, + ulonglong nb_desired_values, + ulonglong *first_value, + ulonglong *nb_reserved_values); + bool uses_blob_value(); char *update_table_comment(const char * comment); - int write_ndb_file(); + int write_ndb_file(const char *name); int check_ndb_connection(THD* thd= current_thd); @@ -709,9 +886,10 @@ bool uses_blob_value(bool all_fields); int records_update(); void no_uncommitted_rows_execute_failure(); void no_uncommitted_rows_update(int); - void no_uncommitted_rows_init(THD *); void no_uncommitted_rows_reset(THD *); + void release_completed_operations(NdbTransaction*, bool); + /* Condition pushdown */ @@ -727,33 +905,39 @@ bool uses_blob_value(bool all_fields); NdbScanOperation* op); friend int execute_commit(ha_ndbcluster*, NdbTransaction*); + friend int execute_no_commit_ignore_no_key(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool); NdbTransaction *m_active_trans; NdbScanOperation *m_active_cursor; - void *m_table; - int m_table_version; - void *m_table_info; + const NdbDictionary::Table *m_table; + struct Ndb_local_table_statistics *m_table_info; char m_dbname[FN_HEADLEN]; //char m_schemaname[FN_HEADLEN]; char m_tabname[FN_HEADLEN]; - ulong m_table_flags; + ulonglong m_table_flags; THR_LOCK_DATA m_lock; bool m_lock_tuple; NDB_SHARE *m_share; NDB_INDEX_DATA m_index[MAX_KEY]; + THD_NDB_SHARE *m_thd_ndb_share; // NdbRecAttr has no reference to blob - typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue; NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; byte m_ref[NDB_HIDDEN_PRIMARY_KEY_LENGTH]; + partition_info *m_part_info; + uint32 m_part_id; + byte *m_rec0; + Field **m_part_field_array; + bool m_use_partition_function; + bool m_sorted; bool m_use_write; bool m_ignore_dup_key; bool m_has_unique_index; bool m_primary_key_update; - bool m_retrieve_all_fields; - bool m_retrieve_primary_key; - ha_rows m_rows_to_insert; + bool m_write_op; + bool m_ignore_no_key; + ha_rows m_rows_to_insert; // TODO: merge it with handler::estimation_rows_to_insert? ha_rows m_rows_inserted; ha_rows m_bulk_insert_rows; ha_rows m_rows_changed; @@ -771,7 +955,6 @@ bool uses_blob_value(bool all_fields); bool m_force_send; ha_rows m_autoincrement_prefetch; bool m_transaction_on; - void release_completed_operations(NdbTransaction*, bool); Ndb_cond_stack *m_cond_stack; bool m_disable_multi_read; @@ -785,10 +968,7 @@ bool uses_blob_value(bool all_fields); Ndb *get_ndb(); }; -extern struct show_var_st ndb_status_variables[]; - -bool ndbcluster_init(void); -bool ndbcluster_end(void); +extern SHOW_VAR ndb_status_variables[]; int ndbcluster_discover(THD* thd, const char* dbname, const char* name, const void** frmblob, uint* frmlen); @@ -796,8 +976,8 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, const char *wild, bool dir, List<char> *files); int ndbcluster_table_exists_in_engine(THD* thd, const char *db, const char *name); -int ndbcluster_drop_database(const char* path); - void ndbcluster_print_error(int error, const NdbOperation *error_op); -int ndbcluster_show_status(THD*); +static const char ndbcluster_hton_name[]= "ndbcluster"; +static const int ndbcluster_hton_name_length=sizeof(ndbcluster_hton_name)-1; + |