/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 *
 * PrimeBase Media Stream for MySQL
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Paul McCullagh
 *
 * 2007-07-18
 *
 * H&G2JCtL
 *
 * System tables.
 *
 */

#include "xt_config.h"

/* NOTE(review): the names of the four system headers below were lost when this
 * file was extracted (only bare "#include" tokens survived). They have been
 * reconstructed on a best-effort basis - confirm against the upstream source.
 */
#include <stdlib.h>
#include <time.h>
#ifdef DRIZZLED
#include <drizzled/server_includes.h>
#include <drizzled/current_session.h>
#endif

#include "ha_pbxt.h"
#include "systab_xt.h"
#include "discover_xt.h"
#include "table_xt.h"
#include "strutil_xt.h"
#include "database_xt.h"
#include "trace_xt.h"

#if MYSQL_VERSION_ID >= 50120
#define byte uchar
#endif

/*
 * -------------------------------------------------------------------------
 * SYSTEM TABLE DEFINITIONS
 */

//--------------------------------
/* Column definitions of the "pbxt.location" system table.
 * The list is terminated by an entry with a NULL name.
 */
static DT_FIELD_INFO xt_location_info[] =
{
	{ "Path",			128,	NULL, MYSQL_TYPE_VARCHAR,	(CHARSET_INFO *) system_charset_info,	0,				"The location of PBXT tables"},
	{ "Table_count",	0,		NULL, MYSQL_TYPE_LONGLONG,	NULL,									NOT_NULL_FLAG,	"The number of PBXT table in this location"},
	{ NULL,				0,		NULL, MYSQL_TYPE_STRING,	NULL,									0,				NULL}
};

/* Column definitions of the "pbxt.statistics" system table.
 * The list is terminated by an entry with a NULL name.
 */
static DT_FIELD_INFO xt_statistics_info[] =
{
	{ "ID",				0,		NULL, MYSQL_TYPE_LONG,		NULL,									NOT_NULL_FLAG,	"The ID of the statistic"},
	{ "Name",			40,		NULL, MYSQL_TYPE_VARCHAR,	(CHARSET_INFO *) system_charset_info,	0,				"The name of the statistic"},
	{ "Value",			0,		NULL, MYSQL_TYPE_LONGLONG,	NULL,									NOT_NULL_FLAG,	"The accumulated value"},
	{ NULL,				0,		NULL, MYSQL_TYPE_STRING,	NULL,									0,				NULL}
};

/*
static DT_FIELD_INFO xt_reference_info[] =
{
	{"Table_name",		128,	NULL, MYSQL_TYPE_STRING,	system_charset_info,	NOT_NULL_FLAG,	"The name of the referencing table"},
	{"Blob_id",			NULL,	NULL, MYSQL_TYPE_LONGLONG,	NULL,					NOT_NULL_FLAG,	"The BLOB reference number - part of the BLOB URL"},
	{"Column_name",		50,		NULL, MYSQL_TYPE_STRING,	system_charset_info,	NOT_NULL_FLAG,	"The column name of the referencing field"},
	{"Row_condition",	50,		NULL, MYSQL_TYPE_VARCHAR,	system_charset_info,	0,				"This condition identifies the row in the table"},
	{"Blob_url",		50,		NULL, MYSQL_TYPE_VARCHAR,	system_charset_info,	NOT_NULL_FLAG,	"The BLOB URL for HTTP GET access"},
	{"Repository_id",	NULL,	NULL, MYSQL_TYPE_LONG,		NULL,					NOT_NULL_FLAG,	"The repository file number of the BLOB"},
	{"Repo_blob_offset",NULL,	NULL, MYSQL_TYPE_LONGLONG,	NULL,					NOT_NULL_FLAG,	"The offset in the repository file"},
	{"Blob_size",		NULL,	NULL, MYSQL_TYPE_LONGLONG,	NULL,					NOT_NULL_FLAG,	"The size of the BLOB in bytes"},
	{"Deletion_time",	NULL,	NULL, MYSQL_TYPE_TIMESTAMP,	NULL,					0,				"The time the BLOB was deleted"},
	{"Remove_in",		NULL,	NULL, MYSQL_TYPE_LONG,		NULL,					0,				"The number of seconds before the reference/BLOB is removed permanently"},
	{"Temp_log_id",		NULL,	NULL, MYSQL_TYPE_LONG,		NULL,					0,				"Temporary log number of the referencing deletion entry"},
	{"Temp_log_offset",	NULL,	NULL, MYSQL_TYPE_LONGLONG,	NULL,					0,				"Temporary log offset of the referencing deletion entry"},
	{NULL,				NULL,	NULL, MYSQL_TYPE_STRING,	NULL,					0,				NULL}
};
*/

#define XT_SYSTAB_INVALID			0
#define XT_SYSTAB_LOCATION_ID		1
#define XT_SYSTAB_STATISTICS_ID		2

static THR_LOCK	sys_location_lock;
static THR_LOCK	sys_statistics_lock;
/* TRUE between startUp() and shutDown(), i.e. while the two locks above are initialised. */
static xtBool	sys_lock_inited = FALSE;

/* The registry of all system tables, terminated by an entry with a NULL path.
 * The last member of each entry is the "exists" flag maintained by the
 * XTSystemTableShare functions below.
 */
static XTSystemTableShareRec xt_internal_tables[] =
{
	{ XT_SYSTAB_LOCATION_ID,	"pbxt.location",	&sys_location_lock,		xt_location_info,	NULL, FALSE},
	{ XT_SYSTAB_STATISTICS_ID,	"pbxt.statistics",	&sys_statistics_lock,	xt_statistics_info,	NULL, FALSE},
	{ XT_SYSTAB_INVALID,		NULL,				NULL,					NULL,				NULL, FALSE}
};

/*
static int pbms_discover_handler(handlerton *hton, THD* thd, const char *db, const char *name, uchar **frmblob, size_t *frmlen)
{
	int err = 1, i = 0;
	MY_STAT stat_info;

	// Check that the database exists!
	if ((!db) || ! my_stat(db,&stat_info,MYF(0)))
		return err;

	while (pbms_internal_tables[i].name) {
		if (!strcasecmp(name, pbms_internal_tables[i].name)) {
			err = ms_create_table_frm(hton, thd, db, name, pbms_internal_tables[i].info, pbms_internal_tables[i].keys, frmblob, frmlen);
			break;
		}
		i++;
	}

	return err;
}
*/

/*
 * -------------------------------------------------------------------------
 * MYSQL UTILITIES
 */

/*
 * Clear the NULL bit of 'field' in the row buffer 'record'.
 *
 * The byte position of the NULL bit is taken from the field's null_ptr
 * relative to record[0] of the field's table, and the same offset is then
 * applied to 'record'. Fields without a null_ptr are left untouched.
 */
void xt_my_set_notnull_in_record(Field *field, char *record)
{
	if (field->null_ptr)
		record[(uint) (field->null_ptr - (uchar *) field->table->record[0])] &= (uchar) ~field->null_bit;
}

/*
 * -------------------------------------------------------------------------
 * OPEN SYSTEM TABLES
 */

/*
 * Remember the share, the MySQL table and the database of this open system
 * table, and take a heap reference on the database. The reference is given
 * back by XTSystemTableShare::releaseSystemTable(), called from the destructor.
 */
XTOpenSystemTable::XTOpenSystemTable(XTThreadPtr self, XTDatabaseHPtr db, XTSystemTableShare *share, TABLE *table):
XTObject()
{
	ost_share = share;
	ost_my_table = table;
	ost_db = db;
	xt_heap_reference(self, db);
}

XTOpenSystemTable::~XTOpenSystemTable()
{
	XTSystemTableShare::releaseSystemTable(this);
}

/*
 * -------------------------------------------------------------------------
 * LOCATION TABLE
 */

XTLocationTable::XTLocationTable(XTThreadPtr self, XTDatabaseHPtr db, XTSystemTableShare *share, TABLE *table):
XTOpenSystemTable(self, db, share, table)
{
}

XTLocationTable::~XTLocationTable()
{
	unuse();
}

bool XTLocationTable::use()
{
	return true;
}

bool XTLocationTable::unuse()
{
	return true;
}

/* Position the sequential scan before the first table path. */
bool XTLocationTable::seqScanInit()
{
	lt_index = 0;
	return true;
}

/*
 * Load the next row of the scan into 'buf'.
 *
 * Returns true if a row was loaded. At the end of the scan the function
 * returns false and sets *eof to true. The database's table hash (db_tables)
 * is locked while the table path list is examined.
 */
bool XTLocationTable::seqScanNext(char *buf, bool *eof)
{
	bool ok = true;

	*eof = false;

	xt_ht_lock(NULL, ost_db->db_tables);
	if (lt_index >= xt_sl_get_size(ost_db->db_table_paths)) {
		ok = false;
		*eof = true;
		goto done;
	}
	loadRow(buf, lt_index);
	lt_index++;

	done:
	xt_ht_unlock(NULL, ost_db->db_tables);
	return ok;
}

/*
 * Fill the row buffer 'buf' from entry 'row_id' of the database's table path list.
 *
 * 'row_id' must be a valid index into ost_db->db_table_paths: the list item
 * is dereferenced without a check, so callers do the range check (and hold
 * the db_tables lock) before calling.
 */
void XTLocationTable::loadRow(char *buf, xtWord4 row_id)
{
	TABLE			*table = ost_my_table;
	Field			*curr_field;
	XTTablePathPtr	tp_ptr;
	byte			*save;
	MY_BITMAP		*save_write_set;

	/* ASSERT_COLUMN_MARKED_FOR_WRITE is failing when
	 * I use store()!??
	 * But I want to use it! :(
	 */
	save_write_set = table->write_set;
	table->write_set = NULL;

	/* Set every bit in the null bytes at the start of the record; the bit of
	 * each column that gets a value is cleared again below.
	 */
	memset(buf, 0xFF, table->s->null_bytes);

	tp_ptr = *((XTTablePathPtr *) xt_sl_item_at(ost_db->db_table_paths, row_id));

	for (Field **field=table->field ; *field ; field++) {
		curr_field = *field;

		/* Temporarily point the field at its slot in 'buf', so that store() writes there. */
		save = curr_field->ptr;
#if MYSQL_VERSION_ID < 50114
		curr_field->ptr = (byte *) buf + curr_field->offset();
#else
		curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
#endif
		/* The first letter of the column name identifies the column. */
		switch (curr_field->field_name[0]) {
			case 'P':
				// Path						VARCHAR(128)
				ASSERT_NS(strcmp(curr_field->field_name, "Path") == 0);
				curr_field->store(tp_ptr->tp_path, strlen(tp_ptr->tp_path), &my_charset_utf8_general_ci);
				xt_my_set_notnull_in_record(curr_field, buf);
				break;
			case 'T':
				// Table_count				BIGINT
				ASSERT_NS(strcmp(curr_field->field_name, "Table_count") == 0);
				curr_field->store(tp_ptr->tp_tab_count, true);
				xt_my_set_notnull_in_record(curr_field, buf);
				break;
		}
		curr_field->ptr = save;
	}

	table->write_set = save_write_set;
}

/* Return the position (row index) of the row most recently returned by seqScanNext(). */
xtWord4 XTLocationTable::seqScanPos(xtWord1 *buf __attribute__((unused)))
{
	return lt_index-1;
}

/*
 * Load the row at position 'rec_id' into 'buf'.
 *
 * Returns false, without touching 'buf', if 'rec_id' is not (or is no longer)
 * a valid index into the table path list. As in seqScanNext(), the check and
 * the load are done under the db_tables lock, because loadRow() dereferences
 * the list item unconditionally.
 */
bool XTLocationTable::seqScanRead(xtWord4 rec_id, char *buf)
{
	bool ok = false;

	xt_ht_lock(NULL, ost_db->db_tables);
	if (rec_id < xt_sl_get_size(ost_db->db_table_paths)) {
		loadRow(buf, rec_id);
		ok = true;
	}
	xt_ht_unlock(NULL, ost_db->db_tables);
	return ok;
}

/*
 * -------------------------------------------------------------------------
 * STATISTICS TABLE
 */

XTStatisticsTable::XTStatisticsTable(XTThreadPtr self, XTDatabaseHPtr db, XTSystemTableShare *share, TABLE *table):
XTOpenSystemTable(self, db, share, table)
{
}

XTStatisticsTable::~XTStatisticsTable()
{
	unuse();
}

bool XTStatisticsTable::use()
{
	return true;
}

bool XTStatisticsTable::unuse()
{
	return true;
}

/* Start a sequential scan: rewind to the first statistic and gather the
 * statistics into tt_statistics, from which the rows of this scan are built.
 */
bool XTStatisticsTable::seqScanInit()
{
	tt_index = 0;
	xt_gather_statistics(&tt_statistics);
	return true;
}

/*
 * Load the next statistic into 'buf'.
 *
 * Returns true if a row was loaded. After XT_STAT_CURRENT_MAX rows the
 * function returns false and sets *eof to true.
 */
bool XTStatisticsTable::seqScanNext(char *buf, bool *eof)
{
	bool ok = true;

	*eof = false;

	if (tt_index >= XT_STAT_CURRENT_MAX) {
		ok = false;
		*eof = true;
		goto done;
	}
	loadRow(buf, tt_index);
	tt_index++;

	done:
	return ok;
}

/*
 * Fill the row buffer 'buf' with statistic number 'rec_id'.
 *
 * The name comes from the statistic's meta data, the value from tt_statistics.
 * The ID column is stored as rec_id+1. Callers check the range of 'rec_id'.
 */
void XTStatisticsTable::loadRow(char *buf, xtWord4 rec_id)
{
	TABLE			*table = ost_my_table;
	MY_BITMAP		*save_write_set;
	Field			*curr_field;
	byte			*save;
	const char		*stat_name;
	u_llong			stat_value;

	/* ASSERT_COLUMN_MARKED_FOR_WRITE is failing when
	 * I use store()!??
	 * But I want to use it! :(
	 */
	save_write_set = table->write_set;
	table->write_set = NULL;

	/* Set every bit in the null bytes at the start of the record; the bit of
	 * each column that gets a value is cleared again below.
	 */
	memset(buf, 0xFF, table->s->null_bytes);

	stat_name = xt_get_stat_meta_data(rec_id)->sm_name;
	stat_value = xt_get_statistic(&tt_statistics, ost_db, rec_id);

	for (Field **field=table->field ; *field ; field++) {
		curr_field = *field;

		/* Temporarily point the field at its slot in 'buf', so that store() writes there. */
		save = curr_field->ptr;
#if MYSQL_VERSION_ID < 50114
		curr_field->ptr = (byte *) buf + curr_field->offset();
#else
		curr_field->ptr = (byte *) buf + curr_field->offset(curr_field->table->record[0]);
#endif
		/* The first letter of the column name identifies the column. */
		switch (curr_field->field_name[0]) {
			case 'I':
				// ID						INT
				ASSERT_NS(strcmp(curr_field->field_name, "ID") == 0);
				curr_field->store(rec_id+1, true);
				xt_my_set_notnull_in_record(curr_field, buf);
				break;
			case 'N':
				// Name						VARCHAR(40)
				ASSERT_NS(strcmp(curr_field->field_name, "Name") == 0);
				curr_field->store(stat_name, strlen(stat_name), &my_charset_utf8_general_ci);
				xt_my_set_notnull_in_record(curr_field, buf);
				break;
			case 'V':
				// Value					BIGINT
				ASSERT_NS(strcmp(curr_field->field_name, "Value") == 0);
				curr_field->store(stat_value, true);
				xt_my_set_notnull_in_record(curr_field, buf);
				break;
		}
		curr_field->ptr = save;
	}

	table->write_set = save_write_set;
}

/* Return the position (statistic index) of the row most recently returned by seqScanNext(). */
xtWord4 XTStatisticsTable::seqScanPos(xtWord1 *buf __attribute__((unused)))
{
	return tt_index-1;
}

/*
 * Load the statistic at position 'rec_id' into 'buf'.
 *
 * Returns false, without touching 'buf', if 'rec_id' is outside the range
 * that seqScanNext() iterates over. The values are those gathered by the
 * last seqScanInit().
 */
bool XTStatisticsTable::seqScanRead(xtWord4 rec_id, char *buf)
{
	if (rec_id >= XT_STAT_CURRENT_MAX)
		return false;
	loadRow(buf, rec_id);
	return true;
}

/*
 * -------------------------------------------------------------------------
 * SYSTEM TABLE SHARES
 */

/*
 * Convert a table path to a name of the form "<database>.<table>".
 *
 * The last two components of 'path' are copied to 'buffer' (of 'size' bytes),
 * the extension is removed, and the first '\' and the first '/' found in the
 * result are each replaced by '.'.
 */
void st_path_to_table_name(size_t size, char *buffer, const char *path)
{
	char *str;

	xt_strcpy(size, buffer, xt_last_2_names_of_path(path));
	xt_remove_extension(buffer);
	if ((str = strchr(buffer, '\\')))
		*str = '.';
	if ((str = strchr(buffer, '/')))
		*str = '.';
}

/*
 * Find the entry of xt_internal_tables that belongs to 'table_path'.
 *
 * The path is reduced to "<database>.<table>" and compared, ignoring case,
 * with the path of each registered system table. Returns NULL if the path
 * does not name a system table.
 */
static XTSystemTableShare *st_find_share(const char *table_path)
{
	char tab_name[100];

	st_path_to_table_name(sizeof(tab_name), tab_name, table_path);
	for (int i = 0; xt_internal_tables[i].sts_path; i++) {
		if (strcasecmp(tab_name, xt_internal_tables[i].sts_path) == 0)
			return &xt_internal_tables[i];
	}
	return NULL;
}

/* Initialise the table locks of the system tables. */
void XTSystemTableShare::startUp(XTThreadPtr self __attribute__((unused)))
{
	thr_lock_init(&sys_location_lock);
	thr_lock_init(&sys_statistics_lock);
	sys_lock_inited = TRUE;
}

/* Delete the table locks of the system tables, if startUp() initialised them. */
void XTSystemTableShare::shutDown(XTThreadPtr self __attribute__((unused)))
{
	if (sys_lock_inited) {
		thr_lock_delete(&sys_location_lock);
		thr_lock_delete(&sys_statistics_lock);
		sys_lock_inited = FALSE;
	}
}

/* Return true if 'table_path' names one of the registered system tables. */
bool XTSystemTableShare::isSystemTable(const char *table_path)
{
	return st_find_share(table_path) != NULL;
}

/* Clear the "exists" flag of the system table named by 'table_path'.
 * Paths that do not name a system table are ignored.
 */
void XTSystemTableShare::setSystemTableDeleted(const char *table_path)
{
	XTSystemTableShare *share;

	if ((share = st_find_share(table_path)))
		share->sts_exists = FALSE;
}

/* Return true if at least one registered system table is flagged as existing. */
bool XTSystemTableShare::doesSystemTableExist()
{
	for (int i = 0; xt_internal_tables[i].sts_path; i++) {
		if (xt_internal_tables[i].sts_exists)
			return true;
	}
	return false;
}

/*
 * Create the table definition of every registered system table in the "pbxt"
 * database. A table is flagged as existing when xt_create_table_frm()
 * returns zero for it.
 */
void XTSystemTableShare::createSystemTables(XTThreadPtr self __attribute__((unused)), XTDatabaseHPtr db __attribute__((unused)))
{
	for (int i = 0; xt_internal_tables[i].sts_path; i++) {
		/* The table name is the part of the path after the '.' (paths have the form "pbxt.<table>"). */
		if (!xt_create_table_frm(pbxt_hton,
			current_thd, "pbxt",
			strchr(xt_internal_tables[i].sts_path, '.') + 1,
			xt_internal_tables[i].sts_info,
			xt_internal_tables[i].sts_keys,
			TRUE /*do not recreate*/))
			xt_internal_tables[i].sts_exists = TRUE;
	}
}

/*
 * Open the system table named by 'table_path'.
 *
 * Returns NULL if the path does not name a system table. Otherwise the table
 * is flagged as existing and an open-table object of the matching class is
 * returned, bound to the current database of 'self'. Errors (allocation
 * failure, unknown table ID) are thrown.
 */
XTOpenSystemTable *XTSystemTableShare::openSystemTable(XTThreadPtr self, const char *table_path, TABLE *table)
{
	XTSystemTableShare	*share;
	XTOpenSystemTable	*otab = NULL;

	if (!(share = st_find_share(table_path)))
		return NULL;

	share->sts_exists = TRUE;
	switch (share->sts_id) {
		case XT_SYSTAB_LOCATION_ID:
			if (!(otab = new XTLocationTable(self, self->st_database, share, table)))
				xt_throw_errno(XT_CONTEXT, XT_ENOMEM);
			break;
		case XT_SYSTAB_STATISTICS_ID:
			if (!(otab = new XTStatisticsTable(self, self->st_database, share, table)))
				xt_throw_errno(XT_CONTEXT, XT_ENOMEM);
			break;
		default:
			xt_throw_taberr(XT_CONTEXT, XT_ERR_TABLE_NOT_FOUND, (XTPathStrPtr) table_path);
			break;
	}

	return otab;
}

/*
 * Give back the database reference held by 'tab', if it still has one.
 *
 * An error thrown while releasing is caught and ignored (this is called from
 * the destructor of XTOpenSystemTable); ost_db is set to NULL in either case,
 * so a second call is a no-op.
 */
void XTSystemTableShare::releaseSystemTable(XTOpenSystemTable *tab)
{
	if (tab->ost_db) {
		XTThreadPtr self = xt_get_self();

		try_(a) {
			xt_heap_release(self, tab->ost_db);
		}
		catch_(a) {
		}
		cont_(a);
		tab->ost_db = NULL;
	}
}