summaryrefslogtreecommitdiff
path: root/sql/ha_ndbcluster.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/ha_ndbcluster.cc')
-rw-r--r--sql/ha_ndbcluster.cc114
1 files changed, 95 insertions, 19 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 730aab8a928..2460c2fb81a 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -123,8 +123,6 @@ static const err_code_mapping err_map[]=
{ 827, HA_ERR_RECORD_FILE_FULL },
{ 832, HA_ERR_RECORD_FILE_FULL },
- { 0, 1 },
-
{ -1, -1 }
};
@@ -3657,14 +3655,6 @@ int ndbcluster_discover(THD* thd, const char *db, const char *name,
DBUG_RETURN(0);
}
-
-int ndbcluster_can_discover(THD *thd, const char *name)
-{
- DBUG_ENTER("ndbcluster_can_discover");
- DBUG_RETURN(!my_strcasecmp(system_charset_info, fn_ext(name), ha_ndb_ext))
-}
-
-
/*
Check if a table exists in NDB
@@ -3699,21 +3689,39 @@ int ndbcluster_table_exists(THD* thd, const char *db, const char *name)
}
-/*
- List tables in NDB Cluster
-*/
-int ndbcluster_list_tables(THD* thd, HASH *tables, const char* db)
+extern "C" byte* ndb_tables_get_key(const char *entry, uint *length,
+ my_bool not_used __attribute__((unused)))
+{
+ *length= strlen(entry);
+ return (byte*) entry;
+}
+
+
+int ndbcluster_find_files(THD *thd,const char *db,const char *path,
+ const char *wild, bool dir)
{
uint i;
NdbDictionary::Dictionary::List list;
Ndb* ndb;
+ char name[FN_REFLEN];
+ HASH ndb_tables;
DBUG_ENTER("ndbcluster_list_tables");
DBUG_PRINT("enter", ("db: %s", db));
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
+ if (dir)
+ DBUG_RETURN(0); // Discover of databases not yet discovered
+
+ if (hash_init(&ndb_tables, system_charset_info,32,0,0,
+ (hash_get_key)ndb_tables_get_key,0,0))
+ {
+ DBUG_PRINT("info", ("Failed to init HASH ndb_tables"));
+ DBUG_RETURN(-1);
+ }
+
/* List tables in NDB Cluster kernel */
NDBDICT *dict= ndb->getDictionary();
if (dict->listObjects(list,
@@ -3723,14 +3731,82 @@ int ndbcluster_list_tables(THD* thd, HASH *tables, const char* db)
for (i= 0 ; i < list.count ; i++)
{
NdbDictionary::Dictionary::List::Element& t= list.elements[i];
- DBUG_PRINT("discover", ("%d: %s/%s", t.id, t.database, t.name));
- if (strcmp(t.database, db) == 0)
+ DBUG_PRINT("discover", ("%d, %s/%s", t.id, t.database, t.name));
+ if (my_hash_insert(&ndb_tables, (byte*)thd->strdup(t.name)))
+ continue;
+
+ // Only discover files that fullfill wildcard
+ if (wild)
{
- DBUG_PRINT("info", ("my_hash_insert %s", t.name));
- (void)my_hash_insert(tables, (byte*)thd->strdup(t.name));;
+ if (lower_case_table_names)
+ {
+ if (wild_case_compare(files_charset_info, t.name, wild))
+ continue;
+ }
+ else if (wild_compare(t.name,wild,0))
+ continue;
+ }
+
+ // Discover the file if it does not already exists on disk
+ (void)strxnmov(name, FN_REFLEN,
+ mysql_data_home,"/",t.database,"/",t.name,reg_ext,NullS);
+ DBUG_PRINT("discover", ("Check access for %s", name));
+ if (access(name, F_OK))
+ {
+ DBUG_PRINT("discover", ("Table %s need disocver", name));
+ pthread_mutex_lock(&LOCK_open);
+ ha_create_table_from_engine(thd, t.database, t.name, true);
+ pthread_mutex_unlock(&LOCK_open);
}
}
- DBUG_RETURN(0);
+
+ /*
+ Find all .ndb files in current dir and check
+ if they still exists in NDB
+ */
+ char *ext;
+ MY_DIR *dirp;
+ FILEINFO *file;
+
+ if (!(dirp= my_dir(path,MYF(MY_WME | (dir ? MY_WANT_STAT : 0)))))
+ DBUG_RETURN(-1);
+
+ for (i= 0; i < (uint)dirp->number_off_files; i++)
+ {
+ file= dirp->dir_entry+i;
+ {
+ ext= fn_ext(file->name);
+ if(!my_strcasecmp(system_charset_info, ext, ha_ndb_ext))
+ {
+ DBUG_PRINT("discover", ("Found file: %s", file->name));
+ *ext= 0;
+
+ if (hash_search(&ndb_tables, file->name, strlen(file->name)))
+ continue;
+
+ DBUG_PRINT("discover", ("File didn't exist in ndb_tables list"));
+
+ // Verify that handler agrees table is gone.
+ if (ndbcluster_table_exists(thd, db, file->name) == 0)
+ {
+ DBUG_PRINT("discover", ("Remove table %s/%s",db, file->name ));
+ // Delete the table and all related files
+ TABLE_LIST table_list;
+ bzero((char*) &table_list,sizeof(table_list));
+ table_list.db= (char*) db;
+ table_list.real_name=(char*)file->name;
+ (void)mysql_rm_table_part2_with_lock(thd, &table_list,
+ /* if_exists */ true,
+ /* drop_temporary */ false,
+ /* dont_log_query*/ true);
+ }
+ }
+ }
+ }
+
+ hash_free(&ndb_tables);
+ my_dirend(dirp);
+ DBUG_RETURN(0);
}