diff options
Diffstat (limited to 'storage/pbxt/src/thread_xt.cc')
-rw-r--r-- | storage/pbxt/src/thread_xt.cc | 2349 |
1 files changed, 2349 insertions, 0 deletions
diff --git a/storage/pbxt/src/thread_xt.cc b/storage/pbxt/src/thread_xt.cc new file mode 100644 index 00000000000..52c2c6c29c5 --- /dev/null +++ b/storage/pbxt/src/thread_xt.cc @@ -0,0 +1,2349 @@ +/* Copyright (c) 2005 PrimeBase Technologies GmbH + * + * PrimeBase XT + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * 2005-01-03 Paul McCullagh + * + * H&G2JCtL + */ + +#include "xt_config.h" + +#ifdef DRIZZLED +#include <bitset> +#endif + +#ifndef XT_WIN +#include <unistd.h> +#include <sys/time.h> +#include <sys/resource.h> +#endif +#include <time.h> +#include <stdarg.h> +#include <signal.h> +#include <stdlib.h> +#include <ctype.h> +#include <errno.h> + +#include "xt_defs.h" +#include "strutil_xt.h" +#include "pthread_xt.h" +#include "thread_xt.h" +#include "memory_xt.h" +#include "sortedlist_xt.h" +#include "trace_xt.h" +#include "myxt_xt.h" +#include "database_xt.h" + +void xt_db_init_thread(XTThreadPtr self, XTThreadPtr new_thread); +void xt_db_exit_thread(XTThreadPtr self); + +static void thr_accumulate_statistics(XTThreadPtr self); + +/* + * ----------------------------------------------------------------------- + * THREAD GLOBALS + */ + +xtPublic u_int xt_thr_maximum_threads; +xtPublic u_int xt_thr_current_thread_count; +xtPublic u_int xt_thr_current_max_threads; + +/* This structure is a double linked list of thread, with a wait + * condition on it. + */ +static XTLinkedListPtr thr_list; + +/* This structure maps thread ID's to thread pointers. */ +xtPublic XTThreadPtr *xt_thr_array; +static xt_mutex_type thr_array_lock; + +/* Global accumulated statistics: */ +static XTStatisticsRec thr_statistics; + +#ifdef DEBUG +static void break_in_assertion(c_char *expr, c_char *func, c_char *file, u_int line) +{ + printf("%s(%s:%d) %s\n", func, file, (int) line, expr); +} +#endif + +/* + * ----------------------------------------------------------------------- + * Error logging + */ + +static xt_mutex_type log_mutex; +static int log_level = 0; +static FILE *log_file = NULL; +static xtBool log_newline = TRUE; + +xtPublic xtBool xt_init_logging(void) +{ + int err; + + log_file = stdout; + log_level = XT_LOG_TRACE; + err = xt_p_mutex_init_with_autoname(&log_mutex, NULL); + if (err) { + xt_log_errno(XT_NS_CONTEXT, err); + log_file = NULL; + log_level = 0; + return FALSE; + } + if (!xt_init_trace()) { + xt_exit_logging(); + return FALSE; + } + return TRUE; +} + +xtPublic void xt_exit_logging(void) +{ + if (log_file) { + xt_free_mutex(&log_mutex); + log_file = NULL; + } + xt_exit_trace(); +} + +xtPublic void xt_get_now(char *buffer, size_t len) +{ + time_t ticks; + struct tm ltime; + + ticks = time(NULL); + if (ticks == (time_t) -1) { +#ifdef XT_WIN + printf(buffer, "** error %d getting time **", errno); +#else + snprintf(buffer, len, "** error %d getting time **", errno); +#endif + return; + } + localtime_r(&ticks, <ime); + strftime(buffer, len, "%y%m%d %H:%M:%S", <ime); +} + +static void thr_log_newline(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level) +{ + c_char *level_str; + char time_str[200]; + char thr_name[XT_THR_NAME_SIZE+3]; + + xt_get_now(time_str, 200); + if (self && *self->t_name) { + xt_strcpy(XT_THR_NAME_SIZE+3, thr_name, " "); + xt_strcat(XT_THR_NAME_SIZE+3, thr_name, self->t_name); + } + else + thr_name[0] = 0; + switch (level) { + case XT_LOG_FATAL: level_str = " [Fatal]"; break; + case XT_LOG_ERROR: level_str = " [Error]"; break; + case XT_LOG_WARNING: level_str = " [Warning]"; break; + case XT_LOG_INFO: level_str = " [Note]"; break; + case XT_LOG_TRACE: level_str = " [Trace]"; break; + default: level_str = " "; break; + } + if (func && *func && *func != '-') { + char func_name[XT_MAX_FUNC_NAME_SIZE]; + + xt_strcpy_term(XT_MAX_FUNC_NAME_SIZE, func_name, func, '('); + if (file && *file) + fprintf(log_file, "%s%s%s %s(%s:%d) ", time_str, level_str, thr_name, func_name, xt_last_name_of_path(file), line); + else + fprintf(log_file, "%s%s%s %s() ", time_str, level_str, thr_name, func_name); + } + else { + if (file && *file) + fprintf(log_file, "%s%s%s [%s:%d] ", time_str, level_str, thr_name, xt_last_name_of_path(file), line); + else + fprintf(log_file, "%s%s%s ", time_str, level_str, thr_name); + } +} + +#ifdef XT_WIN +/* Windows uses printf()!! */ +#define DEFAULT_LOG_BUFFER_SIZE 2000 +#else +#ifdef DEBUG +#define DEFAULT_LOG_BUFFER_SIZE 10 +#else +#define DEFAULT_LOG_BUFFER_SIZE 2000 +#endif +#endif + +void xt_log_flush(XTThreadPtr XT_UNUSED(self)) +{ + fflush(log_file); +} + +/* + * Log the given formated string information to the log file. + * Before each new line, this function writes the + * log header, which includes the time, log level, + * and source file and line number (optional). + */ +static void thr_log_va(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level, c_char *fmt, va_list ap) +{ + char buffer[DEFAULT_LOG_BUFFER_SIZE]; + char *log_string = NULL; + + if (level > log_level) + return; + + xt_lock_mutex_ns(&log_mutex); + +#ifdef XT_WIN + vsprintf(buffer, fmt, ap); + log_string = buffer; +#else +#if !defined(va_copy) || defined(XT_SOLARIS) + int len; + + len = vsnprintf(buffer, DEFAULT_LOG_BUFFER_SIZE-1, fmt, ap); + if (len > DEFAULT_LOG_BUFFER_SIZE-1) + len = DEFAULT_LOG_BUFFER_SIZE-1; + buffer[len] = 0; + log_string = buffer; +#else + /* Use the buffer, unless it is too small */ + va_list ap2; + int bufsize; + + va_copy(ap2, ap); + bufsize = vsnprintf(buffer, DEFAULT_LOG_BUFFER_SIZE, fmt, ap); + if (bufsize >= DEFAULT_LOG_BUFFER_SIZE) { + log_string = (char *) malloc(bufsize + 1); + if (vsnprintf(log_string, bufsize + 1, fmt, ap2) > bufsize) { + free(log_string); + log_string = NULL; + } + } + else + log_string = buffer; +#endif +#endif + + if (log_string) { + char *str, *str_end, tmp_ch; + + str = log_string; + while (*str) { + if (log_newline) { + thr_log_newline(self, func, file, line, level); + log_newline = FALSE; + } + str_end = strchr(str, '\n'); + if (str_end) { + str_end++; + tmp_ch = *str_end; + *str_end = 0; + log_newline = TRUE; + } + else { + str_end = str + strlen(str); + tmp_ch = 0; + } + fprintf(log_file, "%s", str); + fflush(log_file); + *str_end = tmp_ch; + str = str_end; + } + + if (log_string != buffer) + free(log_string); + } + + xt_unlock_mutex_ns(&log_mutex); +} + +xtPublic void xt_logf(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level, c_char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + thr_log_va(self, func, file, line, level, fmt, ap); + va_end(ap); +} + +xtPublic void xt_log(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level, c_char *string) +{ + xt_logf(self, func, file, line, level, "%s", string); +} + +static int thr_log_error_va(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level, int xt_err, int sys_err, c_char *fmt, va_list ap) +{ + int default_level; + char xt_err_string[50]; + + *xt_err_string = 0; + switch (xt_err) { + case XT_ASSERTION_FAILURE: + strcpy(xt_err_string, "Assertion"); + default_level = XT_LOG_FATAL; + break; + case XT_SYSTEM_ERROR: + strcpy(xt_err_string, "errno"); + default_level = XT_LOG_ERROR; + break; + case XT_SIGNAL_CAUGHT: + strcpy(xt_err_string, "Signal"); + default_level = XT_LOG_ERROR; + break; + default: + sprintf(xt_err_string, "%d", xt_err); + default_level = XT_LOG_ERROR; + break; + } + if (level == XT_LOG_DEFAULT) + level = default_level; + + if (*xt_err_string) { + if (sys_err) + xt_logf(self, func, file, line, level, "%s (%d): ", xt_err_string, sys_err); + else + xt_logf(self, func, file, line, level, "%s: ", xt_err_string); + } + thr_log_va(self, func, file, line, level, fmt, ap); + xt_logf(self, func, file, line, level, "\n"); + return level; +} + +/* The function returns the actual log level used. */ +xtPublic int xt_log_errorf(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level, int xt_err, int sys_err, c_char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + level = thr_log_error_va(self, func, file, line, level, xt_err, sys_err, fmt, ap); + va_end(ap); + return level; +} + +/* The function returns the actual log level used. */ +xtPublic int xt_log_error(XTThreadPtr self, c_char *func, c_char *file, u_int line, int level, int xt_err, int sys_err, c_char *string) +{ + return xt_log_errorf(self, func, file, line, level, xt_err, sys_err, "%s", string); +} + +xtPublic void xt_log_exception(XTThreadPtr self, XTExceptionPtr e, int level) +{ + level = xt_log_error( + self, + e->e_func_name, + e->e_source_file, + e->e_source_line, + level, + e->e_xt_err, + e->e_sys_err, + e->e_err_msg); + /* Dump the catch trace: */ + if (*e->e_catch_trace) + xt_logf(self, NULL, NULL, 0, level, "%s", e->e_catch_trace); +} + +xtPublic void xt_log_and_clear_exception(XTThreadPtr self) +{ + xt_log_exception(self, &self->t_exception, XT_LOG_DEFAULT); + xt_clear_exception(self); +} + +xtPublic void xt_log_and_clear_exception_ns(void) +{ + xt_log_and_clear_exception(xt_get_self()); +} + +xtPublic void xt_log_and_clear_warning(XTThreadPtr self) +{ + xt_log_exception(self, &self->t_exception, XT_LOG_WARNING); + xt_clear_exception(self); +} + +xtPublic void xt_log_and_clear_warning_ns(void) +{ + xt_log_and_clear_warning(xt_get_self()); +} + +/* + * ----------------------------------------------------------------------- + * Exceptions + */ + +static void thr_add_catch_trace(XTExceptionPtr e, c_char *func, c_char *file, u_int line) +{ + if (func && *func && *func != '-') { + xt_strcat_term(XT_CATCH_TRACE_SIZE, e->e_catch_trace, func, '('); + xt_strcat(XT_CATCH_TRACE_SIZE, e->e_catch_trace, "("); + } + if (file && *file) { + xt_strcat(XT_CATCH_TRACE_SIZE, e->e_catch_trace, xt_last_name_of_path(file)); + if (line) { + char buffer[40]; + + sprintf(buffer, "%u", line); + xt_strcat(XT_CATCH_TRACE_SIZE, e->e_catch_trace, ":"); + xt_strcat(XT_CATCH_TRACE_SIZE, e->e_catch_trace, buffer); + } + } + if (func && *func && *func != '-') + xt_strcat(XT_CATCH_TRACE_SIZE, e->e_catch_trace, ")"); + xt_strcat(XT_CATCH_TRACE_SIZE, e->e_catch_trace, "\n"); +} + +static void thr_save_error_va(XTExceptionPtr e, XTThreadPtr self, xtBool throw_it, c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *fmt, va_list ap) +{ + int i; + + if (!e) + return; + + e->e_xt_err = xt_err; + e->e_sys_err = sys_err; + vsnprintf(e->e_err_msg, XT_ERR_MSG_SIZE, fmt, ap); + + /* Make the first character of the message upper case: */ + /* This did not work for foreign languages! */ + if (e->e_err_msg[0] >= 'a' && e->e_err_msg[0] <= 'z') + e->e_err_msg[0] = (char) toupper(e->e_err_msg[0]); + + if (func && *func && *func != '-') + xt_strcpy_term(XT_MAX_FUNC_NAME_SIZE, e->e_func_name, func, '('); + else + *e->e_func_name = 0; + if (file && *file) { + xt_strcpy(XT_SOURCE_FILE_NAME_SIZE, e->e_source_file, xt_last_name_of_path(file)); + e->e_source_line = line; + } + else { + *e->e_source_file = 0; + e->e_source_line = 0; + } + *e->e_catch_trace = 0; + + if (!self) + return; + + /* Create a stack trace for this exception: */ + thr_add_catch_trace(e, func, file, line); + for (i=self->t_call_top-1; i>=0; i--) + thr_add_catch_trace(e, self->t_call_stack[i].cs_func, self->t_call_stack[i].cs_file, self->t_call_stack[i].cs_line); + + if (throw_it) + xt_throw(self); +} + +/* + * ----------------------------------------------------------------------- + * THROWING EXCEPTIONS + */ + +/* If we have to allocate resources and the hold them temporarily during which + * time an exception could occur, then these functions provide a holding + * place for the data, which will be freed in the case of an exception. + * + * Note: the free functions could themselves allocated resources. + * to make sure all things work out we only remove the resource from + * then stack when it is freed. + */ +static void thr_free_resources(XTThreadPtr self, XTResourcePtr top) +{ + XTResourcePtr rp; + XTThreadFreeFunc free_func; + + if (!top) + return; + while (self->t_res_top > top) { + /* Pop the top resource: */ + rp = (XTResourcePtr) (((char *) self->t_res_top) - self->t_res_top->r_prev_size); + + /* Free the resource: */ + if (rp->r_free_func) { + free_func = rp->r_free_func; + rp->r_free_func = NULL; + free_func(self, rp->r_data); + } + + self->t_res_top = rp; + } +} + +xtPublic void xt_bug(XTThreadPtr XT_UNUSED(self)) +{ + static int *bug_ptr = NULL; + + bug_ptr = NULL; +} + +/* + * This function is called when an exception is caught. + * It restores the function call top and frees + * any resource allocated by lower levels. + */ +xtPublic void xt_caught(XTThreadPtr self) +{ + /* Restore the call top: */ + self->t_call_top = self->t_jmp_env[self->t_jmp_depth].jb_call_top; + + /* Free the temporary data that would otherwize be lost + * This should do nothing, because we actually free things on throw + * (se below). + */ + thr_free_resources(self, self->t_jmp_env[self->t_jmp_depth].jb_res_top); +} + +/* Throw an already registered error: */ +xtPublic void xt_throw(XTThreadPtr self) +{ + if (self) { + ASSERT_NS(self->t_exception.e_xt_err); + if (self->t_jmp_depth > 0 && self->t_jmp_depth <= XT_MAX_JMP) { + /* As recommended by Barry: rree the resources before the stack is invalid! */ + thr_free_resources(self, self->t_jmp_env[self->t_jmp_depth-1].jb_res_top); + + /* Then do the longjmp: */ + longjmp(self->t_jmp_env[self->t_jmp_depth-1].jb_buffer, 1); + } + } + + /* + * We cannot throw an error, because it will not be caught. + * This means there is no try ... catch block above. + * In this case, we just return. + * The calling functions must handle errors... + xt_caught(self); + xt_log(XT_CONTEXT, XT_LOG_FATAL, "Uncaught exception\n"); + xt_exit_thread(self, NULL); + */ +} + +xtPublic void xt_throwf(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *fmt, ...) +{ + va_list ap; + XTThreadPtr thread = self ? self : xt_get_self(); + + va_start(ap, fmt); + thr_save_error_va(thread ? &thread->t_exception : NULL, thread, self ? TRUE : FALSE, func, file, line, xt_err, sys_err, fmt, ap); + va_end(ap); +} + +xtPublic void xt_throw_error(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *msg) +{ + xt_throwf(self, func, file, line, xt_err, sys_err, "%s", msg); +} + +#define XT_SYS_ERR_SIZE 300 + +#ifdef XT_WIN +static c_char *thr_get_sys_error(int err, char *err_msg) +#else +static c_char *thr_get_sys_error(int err, char *XT_UNUSED(err_msg)) +#endif +{ +#ifdef XT_WIN + char *ptr; + + if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, + err, 0, err_msg, XT_SYS_ERR_SIZE, NULL)) { + return strerror(err); + } + + ptr = &err_msg[strlen(err_msg)]; + while (ptr-1 > err_msg) { + if (*(ptr-1) != '\n' && *(ptr-1) != '\r' && *(ptr-1) != '.') + break; + ptr--; + } + *ptr = 0; +return err_msg; +#else + return strerror(err); +#endif +} + +static c_char *thr_get_err_string(int xt_err) +{ + c_char *str; + + switch (xt_err) { + case XT_ERR_STACK_OVERFLOW: str = "Stack overflow"; break; + case XT_ERR_JUMP_OVERFLOW: str = "Jump overflow"; break; + case XT_ERR_TABLE_EXISTS: str = "Table `%s` already exists"; break; + case XT_ERR_NAME_TOO_LONG: str = "Name '%s' is too long"; break; + case XT_ERR_TABLE_NOT_FOUND: str = "Table `%s` not found"; break; + case XT_ERR_SESSION_NOT_FOUND: str = "Session %s not found"; break; + case XT_ERR_BAD_ADDRESS: str = "Incorrect address '%s'"; break; + case XT_ERR_UNKNOWN_SERVICE: str = "Unknown service '%s'"; break; + case XT_ERR_UNKNOWN_HOST: str = "Host '%s' not found"; break; + case XT_ERR_TOKEN_EXPECTED: str = "%s expected in place of %s"; break; + case XT_ERR_PROPERTY_REQUIRED: str = "Property '%s' required"; break; + case XT_ERR_DEADLOCK: str = "Deadlock, transaction aborted"; break; + case XT_ERR_CANNOT_CHANGE_DB: str = "Cannot change database while transaction is in progress"; break; + case XT_ERR_ILLEGAL_CHAR: str = "Illegal character: '%s'"; break; + case XT_ERR_UNTERMINATED_STRING:str = "Unterminated string: %s"; break; + case XT_ERR_SYNTAX: str = "Syntax error near %s"; break; + case XT_ERR_ILLEGAL_INSTRUCTION:str = "Illegal instruction"; break; + case XT_ERR_OUT_OF_BOUNDS: str = "Memory reference out of bounds"; break; + case XT_ERR_STACK_UNDERFLOW: str = "Stack underflow"; break; + case XT_ERR_TYPE_MISMATCH: str = "Type mismatch"; break; + case XT_ERR_ILLEGAL_TYPE: str = "Illegal type for operator"; break; + case XT_ERR_ID_TOO_LONG: str = "Identifier too long: %s"; break; + case XT_ERR_TYPE_OVERFLOW: str = "Type overflow: %s"; break; + case XT_ERR_TABLE_IN_USE: str = "Table `%s` in use"; break; + case XT_ERR_NO_DATABASE_IN_USE: str = "No database in use"; break; + case XT_ERR_CANNOT_RESOLVE_TYPE:str = "Cannot resolve type with ID: %s"; break; + case XT_ERR_BAD_INDEX_DESC: str = "Unsupported index description: %s"; break; + case XT_ERR_WRONG_NO_OF_VALUES: str = "Incorrect number of values"; break; + case XT_ERR_CANNOT_OUTPUT_VALUE:str = "Cannot output given type"; break; + case XT_ERR_COLUMN_NOT_FOUND: str = "Column `%s.%s` not found"; break; + case XT_ERR_NOT_IMPLEMENTED: str = "Not implemented"; break; + case XT_ERR_UNEXPECTED_EOS: str = "Connection unexpectedly lost"; break; + case XT_ERR_BAD_TOKEN: str = "Incorrect binary token"; break; + case XT_ERR_RES_STACK_OVERFLOW: str = "Internal error: resource stack overflow"; break; + case XT_ERR_BAD_INDEX_TYPE: str = "Unsupported index type: %s"; break; + case XT_ERR_INDEX_EXISTS: str = "Index '%s' already exists"; break; + case XT_ERR_INDEX_STRUC_EXISTS: str = "Index '%s' has an identical structure"; break; + case XT_ERR_INDEX_NOT_FOUND: str = "Index '%s' not found"; break; + case XT_ERR_INDEX_CORRUPT: str = "Cannot read index '%s'"; break; + case XT_ERR_TYPE_NOT_SUPPORTED: str = "Data type %s not supported"; break; + case XT_ERR_BAD_TABLE_VERSION: str = "Table `%s` version not supported, upgrade required"; break; + case XT_ERR_BAD_RECORD_FORMAT: str = "Record format unknown, either corrupted or upgrade required"; break; + case XT_ERR_BAD_EXT_RECORD: str = "Extended record part does not match reference"; break; + case XT_ERR_RECORD_CHANGED: str = "Record already updated, transaction aborted"; break; + case XT_ERR_XLOG_WAS_CORRUPTED: str = "Corrupted transaction log has been truncated"; break; + case XT_ERR_DUPLICATE_KEY: str = "Duplicate unique key"; break; + case XT_ERR_NO_DICTIONARY: str = "Table `%s` has not yet been opened by MySQL"; break; + case XT_ERR_TOO_MANY_TABLES: str = "Limit of %s tables per database exceeded"; break; + case XT_ERR_KEY_TOO_LARGE: str = "Index '%s' exceeds the key size limit of %s"; break; + case XT_ERR_MULTIPLE_DATABASES: str = "Multiple database in a single transaction is not permitted"; break; + case XT_ERR_NO_TRANSACTION: str = "Internal error: no transaction running"; break; + case XT_ERR_A_EXPECTED_NOT_B: str = "%s expected in place of %s"; break; + case XT_ERR_NO_MATCHING_INDEX: str = "Matching index required for '%s'"; break; + case XT_ERR_TABLE_LOCKED: str = "Table `%s` locked"; break; + case XT_ERR_NO_REFERENCED_ROW: str = "Constraint: `%s`"; break; // "Foreign key '%s', referenced row does not exist" + case XT_ERR_ROW_IS_REFERENCED: str = "Constraint: `%s`"; break; // "Foreign key '%s', has a referencing row" + case XT_ERR_BAD_DICTIONARY: str = "Internal dictionary does not match MySQL dictionary"; break; + case XT_ERR_LOADING_MYSQL_DIC: str = "Error loading %s.frm file, MySQL error: %s"; break; + case XT_ERR_COLUMN_IS_NOT_NULL: str = "Column `%s` is NOT NULL"; break; + case XT_ERR_INCORRECT_NO_OF_COLS: str = "Incorrect number of columns near %s"; break; + case XT_ERR_FK_ON_TEMP_TABLE: str = "Cannot create foreign key on temporary table"; break; + case XT_ERR_REF_TABLE_NOT_FOUND: str = "Referenced table `%s` not found"; break; + case XT_ERR_REF_TYPE_WRONG: str = "Incorrect data type on referenced column `%s`"; break; + case XT_ERR_DUPLICATE_FKEY: str = "Duplicate unique foreign key, contraint: %s"; break; + case XT_ERR_INDEX_FILE_TO_LARGE: str = "Index file has grown too large: %s"; break; + case XT_ERR_UPGRADE_TABLE: str = "Table `%s` must be upgraded from PBXT version %s"; break; + case XT_ERR_INDEX_NEW_VERSION: str = "Table `%s` index created by a newer version, upgrade required"; break; + case XT_ERR_LOCK_TIMEOUT: str = "Lock timeout on table `%s`"; break; + case XT_ERR_CONVERSION: str = "Error converting value for column `%s.%s`"; break; + case XT_ERR_NO_ROWS: str = "No matching row found in table `%s`"; break; + case XT_ERR_DATA_LOG_NOT_FOUND: str = "Data log not found: '%s'"; break; + case XT_ERR_LOG_MAX_EXCEEDED: str = "Maximum log count, %s, exceeded"; break; + case XT_ERR_MAX_ROW_COUNT: str = "Maximum row count reached"; break; + case XT_ERR_FILE_TOO_LONG: str = "File cannot be mapped, too large: '%s'"; break; + case XT_ERR_BAD_IND_BLOCK_SIZE: str = "Table `%s`, incorrect index block size: %s"; break; + case XT_ERR_INDEX_CORRUPTED: str = "Table `%s` index is corrupted, REPAIR TABLE required"; break; + case XT_ERR_NO_INDEX_CACHE: str = "Not enough index cache memory to handle concurrent updates"; break; + case XT_ERR_INDEX_LOG_CORRUPT: str = "Index log corrupt: '%s'"; break; + case XT_ERR_TOO_MANY_THREADS: str = "Too many threads: %s, increase pbxt_max_threads"; break; + case XT_ERR_TOO_MANY_WAITERS: str = "Too many waiting threads: %s"; break; + case XT_ERR_INDEX_OLD_VERSION: str = "Table `%s` index created by an older version, REPAIR TABLE required"; break; + case XT_ERR_PBXT_TABLE_EXISTS: str = "System table cannot be dropped because PBXT table still exists"; break; + case XT_ERR_SERVER_RUNNING: str = "A server is possibly already running"; break; + case XT_ERR_INDEX_MISSING: str = "Index file of table '%s' is missing"; break; + case XT_ERR_RECORD_DELETED: str = "Record was deleted"; break; + case XT_ERR_NEW_TYPE_OF_XLOG: str = "Transaction log %s, is using a newer format, upgrade required"; break; + case XT_ERR_NO_BEFORE_IMAGE: str = "Internal error: no before image"; break; + case XT_ERR_FK_REF_TEMP_TABLE: str = "Foreign key may not reference temporary table"; break; + case XT_ERR_MYSQL_SHUTDOWN: str = "Cannot open table, MySQL has shutdown"; break; + case XT_ERR_MYSQL_NO_THREAD: str = "Cannot create thread, MySQL has shutdown"; break; + case XT_ERR_BUFFER_TOO_SMALL: str = "System backup buffer too small"; break; + case XT_ERR_BAD_BACKUP_FORMAT: str = "Unknown or corrupt backup format, restore aborted"; break; + case XT_ERR_PBXT_NOT_INSTALLED: str = "PBXT plugin is not installed"; break; + default: str = "Unknown XT error"; break; + } + return str; +} + +xtPublic void xt_throw_i2xterr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, c_char *item, c_char *item2) +{ + xt_throwf(self, func, file, line, xt_err, 0, thr_get_err_string(xt_err), item, item2); +} + +xtPublic void xt_throw_ixterr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, c_char *item) +{ + xt_throw_i2xterr(self, func, file, line, xt_err, item, NULL); +} + +xtPublic void xt_throw_tabcolerr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, XTPathStrPtr tab_item, c_char *item2) +{ + char buffer[XT_IDENTIFIER_NAME_SIZE + XT_IDENTIFIER_NAME_SIZE + XT_IDENTIFIER_NAME_SIZE + 3]; + + xt_2nd_last_name_of_path(sizeof(buffer), buffer, tab_item->ps_path); + xt_strcat(sizeof(buffer), buffer, "."); + xt_strcat(sizeof(buffer), buffer, xt_last_name_of_path(tab_item->ps_path)); + + xt_throw_i2xterr(self, func, file, line, xt_err, buffer, item2); +} + +xtPublic void xt_throw_taberr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, XTPathStrPtr tab_item) +{ + char buffer[XT_IDENTIFIER_NAME_SIZE + XT_IDENTIFIER_NAME_SIZE + XT_IDENTIFIER_NAME_SIZE + 3]; + + xt_2nd_last_name_of_path(sizeof(buffer), buffer, tab_item->ps_path); + xt_strcat(sizeof(buffer), buffer, "."); + xt_strcat(sizeof(buffer), buffer, xt_last_name_of_path(tab_item->ps_path)); + + xt_throw_ixterr(self, func, file, line, xt_err, buffer); +} + +xtPublic void xt_throw_ulxterr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, u_long value) +{ + char buffer[100]; + + sprintf(buffer, "%lu", value); + xt_throw_ixterr(self, func, file, line, xt_err, buffer); +} + +xtPublic void xt_throw_sulxterr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, c_char *item, u_long value) +{ + char buffer[100]; + + sprintf(buffer, "%lu", value); + xt_throw_i2xterr(self, func, file, line, xt_err, item, buffer); +} + +xtPublic void xt_throw_xterr(XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err) +{ + xt_throw_ixterr(self, func, file, line, xt_err, NULL); +} + +xtPublic void xt_throw_errno(XTThreadPtr self, c_char *func, c_char *file, u_int line, int err) +{ + char err_msg[XT_SYS_ERR_SIZE]; + + xt_throw_error(self, func, file, line, XT_SYSTEM_ERROR, err, thr_get_sys_error(err, err_msg)); +} + +xtPublic void xt_throw_ferrno(XTThreadPtr self, c_char *func, c_char *file, u_int line, int err, c_char *path) +{ + char err_msg[XT_SYS_ERR_SIZE]; + + xt_throwf(self, func, file, line, XT_SYSTEM_ERROR, err, "%s: '%s'", thr_get_sys_error(err, err_msg), path); +} + +xtPublic void xt_throw_assertion(XTThreadPtr self, c_char *func, c_char *file, u_int line, c_char *str) +{ + xt_throw_error(self, func, file, line, XT_ASSERTION_FAILURE, 0, str); +} + +static void xt_log_assertion(XTThreadPtr self, c_char *func, c_char *file, u_int line, c_char *str) +{ + xt_log_error(self, func, file, line, XT_LOG_DEFAULT, XT_ASSERTION_FAILURE, 0, str); +} + +xtPublic void xt_throw_signal(XTThreadPtr self, c_char *func, c_char *file, u_int line, int sig) +{ +#ifdef XT_WIN + char buffer[100]; + + sprintf(buffer, "Signal #%d", sig); + xt_throw_error(self, func, file, line, XT_SIGNAL_CAUGHT, sig, buffer); +#else + xt_throw_error(self, func, file, line, XT_SIGNAL_CAUGHT, sig, strsignal(sig)); +#endif +} + +/* + * ----------------------------------------------------------------------- + * REGISTERING EXCEPTIONS + */ + +xtPublic void xt_registerf(c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *fmt, ...) +{ + va_list ap; + XTThreadPtr thread = xt_get_self(); + + va_start(ap, fmt); + thr_save_error_va(thread ? &thread->t_exception : NULL, thread, FALSE, func, file, line, xt_err, sys_err, fmt, ap); + va_end(ap); +} + +xtPublic void xt_register_i2xterr(c_char *func, c_char *file, u_int line, int xt_err, c_char *item, c_char *item2) +{ + xt_registerf(func, file, line, xt_err, 0, thr_get_err_string(xt_err), item, item2); +} + +xtPublic void xt_register_ixterr(c_char *func, c_char *file, u_int line, int xt_err, c_char *item) +{ + xt_register_i2xterr(func, file, line, xt_err, item, NULL); +} + +xtPublic void xt_register_tabcolerr(c_char *func, c_char *file, u_int line, int xt_err, XTPathStrPtr tab_item, c_char *item2) +{ + char buffer[XT_IDENTIFIER_NAME_SIZE + XT_IDENTIFIER_NAME_SIZE + XT_IDENTIFIER_NAME_SIZE + 3]; + + xt_2nd_last_name_of_path(sizeof(buffer), buffer, tab_item->ps_path); + xt_strcat(sizeof(buffer), buffer, "."); + xt_strcat(sizeof(buffer), buffer, xt_last_name_of_path(tab_item->ps_path)); + + xt_register_i2xterr(func, file, line, xt_err, buffer, item2); +} + +xtPublic void xt_register_taberr(c_char *func, c_char *file, u_int line, int xt_err, XTPathStrPtr tab_item) +{ + char buffer[XT_IDENTIFIER_NAME_SIZE + XT_IDENTIFIER_NAME_SIZE + XT_IDENTIFIER_NAME_SIZE + 3]; + + xt_2nd_last_name_of_path(sizeof(buffer), buffer, tab_item->ps_path); + xt_strcat(sizeof(buffer), buffer, "."); + xt_strcat(sizeof(buffer), buffer, xt_last_name_of_path(tab_item->ps_path)); + + xt_register_ixterr(func, file, line, xt_err, buffer); +} + +xtPublic void xt_register_ulxterr(c_char *func, c_char *file, u_int line, int xt_err, u_long value) +{ + char buffer[100]; + + sprintf(buffer, "%lu", value); + xt_register_ixterr(func, file, line, xt_err, buffer); +} + +xtPublic xtBool xt_register_ferrno(c_char *func, c_char *file, u_int line, int err, c_char *path) +{ + char err_msg[XT_SYS_ERR_SIZE]; + + xt_registerf(func, file, line, XT_SYSTEM_ERROR, err, "%s: '%s'", thr_get_sys_error(err, err_msg), path); + return FAILED; +} + +xtPublic void xt_register_error(c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *msg) +{ + xt_registerf(func, file, line, xt_err, sys_err, "%s", msg); +} + +xtPublic xtBool xt_register_errno(c_char *func, c_char *file, u_int line, int err) +{ + char err_msg[XT_SYS_ERR_SIZE]; + + xt_register_error(func, file, line, XT_SYSTEM_ERROR, err, thr_get_sys_error(err, err_msg)); + return FAILED; +} + +xtPublic void xt_register_xterr(c_char *func, c_char *file, u_int line, int xt_err) +{ + xt_register_error(func, file, line, xt_err, 0, thr_get_err_string(xt_err)); +} + +/* + * ----------------------------------------------------------------------- + * CREATING EXCEPTIONS + */ + +xtPublic void xt_exceptionf(XTExceptionPtr e, XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + thr_save_error_va(e, self, FALSE, func, file, line, xt_err, sys_err, fmt, ap); + va_end(ap); +} + +xtPublic void xt_exception_error(XTExceptionPtr e, XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *msg) +{ + xt_exceptionf(e, self, func, file, line, xt_err, sys_err, "%s", msg); +} + +xtPublic xtBool xt_exception_errno(XTExceptionPtr e, XTThreadPtr self, c_char *func, c_char *file, u_int line, int err) +{ + char err_msg[XT_SYS_ERR_SIZE]; + + xt_exception_error(e, self, func, file, line, XT_SYSTEM_ERROR, err, thr_get_sys_error(err, err_msg)); + return FAILED; +} + +xtPublic void xt_exception_xterr(XTExceptionPtr e, XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err) +{ + xt_exception_error(e, self, func, file, line, xt_err, 0, thr_get_err_string(xt_err)); +} + +/* + * ----------------------------------------------------------------------- + * LOG ERRORS + */ + +xtPublic void xt_log_errno(XTThreadPtr self, c_char *func, c_char *file, u_int line, int err) +{ + XTExceptionRec e; + + xt_exception_errno(&e, self, func, file, line, err); + xt_log_exception(self, &e, XT_LOG_DEFAULT); +} + +/* + * ----------------------------------------------------------------------- + * Assertions and failures (one breakpoints for all failures) + */ +//#define CRASH_ON_ASSERT + +xtPublic xtBool xt_assert(XTThreadPtr self, c_char *expr, c_char *func, c_char *file, u_int line) +{ + (void) self; +#ifdef DEBUG + //xt_set_fflush(TRUE); + //xt_dump_trace(); + break_in_assertion(expr, func, file, line); +#ifdef CRASH_ON_ASSERT + abort(); +#endif +#ifdef XT_WIN + FatalAppExit(0, "Assertion Failed!"); +#endif +#else + xt_throw_assertion(self, func, file, line, expr); +#endif + return FALSE; +} + +xtPublic xtBool xt_assume(XTThreadPtr self, c_char *expr, c_char *func, c_char *file, u_int line) +{ + xt_log_assertion(self, func, file, line, expr); + return FALSE; +} + +/* + * ----------------------------------------------------------------------- + * Create and destroy threads + */ + +typedef struct ThreadData { + xtBool td_started; + XTThreadPtr td_thr; + void *(*td_start_routine)(XTThreadPtr self); +} ThreadDataRec, *ThreadDataPtr; + +#ifdef XT_WIN +pthread_key(void *, thr_key); +#else +static pthread_key_t thr_key; +#endif + +#ifdef HANDLE_SIGNALS +static void thr_ignore_signal(int sig) +{ +#pragma unused(sig) +} + +static void thr_throw_signal(int sig) +{ + XTThreadPtr self; + + self = xt_get_self(); + + if (self->t_main) { + /* The main thread will pass on a signal to all threads: */ + xt_signal_all_threads(self, sig); + if (sig != SIGTERM) { + if (self->t_disable_interrupts) { + self->t_delayed_signal = sig; + self->t_disable_interrupts = FALSE; /* Prevent infinite loop */ + } + else { + self->t_delayed_signal = 0; + xt_throw_signal(self, "thr_throw_signal", NULL, 0, sig); + } + } + } + else { + if (self->t_disable_interrupts) { + self->t_delayed_signal = sig; + self->t_disable_interrupts = FALSE; /* Prevent infinite loop */ + } + else { + self->t_delayed_signal = 0; + xt_throw_signal(self, "thr_throw_signal", NULL, 0, sig); + } + } +} + +static xtBool thr_setup_signals(void) +{ + struct sigaction action; + + sigemptyset(&action.sa_mask); + action.sa_flags = 0; + action.sa_handler = thr_ignore_signal; + + if (sigaction(SIGPIPE, &action, NULL) == -1) + goto error_occurred; + if (sigaction(SIGHUP, &action, NULL) == -1) + goto error_occurred; + + action.sa_handler = thr_throw_signal; + + if (sigaction(SIGQUIT, &action, NULL) == -1) + goto error_occurred; + if (sigaction(SIGTERM, &action, NULL) == -1) + goto error_occurred; +#ifndef DEBUG + if (sigaction(SIGILL, &action, NULL) == -1) + goto error_occurred; + if (sigaction(SIGBUS, &action, NULL) == -1) + goto error_occurred; + if (sigaction(SIGSEGV, &action, NULL) == -1) + goto error_occurred; +#endif + return TRUE; + + error_occurred: + xt_log_errno(XT_NS_CONTEXT, errno); + return FALSE; +} +#endif + +typedef void *(*ThreadMainFunc)(XTThreadPtr self); + +extern "C" void *xt_thread_main(void *data) +{ + ThreadDataPtr td = (ThreadDataPtr) data; + XTThreadPtr self = td->td_thr; + ThreadMainFunc start_routine; + void *return_data; + + enter_(); + self->t_pthread = pthread_self(); + start_routine = td->td_start_routine; + return_data = NULL; + +#ifdef HANDLE_SIGNALS + if (!thr_setup_signals()) + return NULL; +#endif + + try_(a) { + if (!xt_set_key((pthread_key_t)thr_key, self, &self->t_exception)) + throw_(); + td->td_started = TRUE; + return_data = (*start_routine)(self); + } + catch_(a) { + xt_log_and_clear_exception(self); + } + cont_(a); + + outer_(); + xt_free_thread(self); + + /* {MYSQL-THREAD-KILL} + * Clean up any remaining MySQL thread! + */ + myxt_delete_remaining_thread(); + return return_data; +} + +static void thr_free_data(XTThreadPtr self) +{ + if (self->t_free_data) { + (*self->t_free_data)(self, self->t_data); + self->t_data = NULL; + } +} + +xtPublic void xt_set_thread_data(XTThreadPtr self, void *data, XTThreadFreeFunc free_func) +{ + thr_free_data(self); + self->t_free_data = free_func; + self->t_data = data; +} + +static void thr_exit(XTThreadPtr self) +{ + /* Free the thread temporary data. */ + thr_free_resources(self, (XTResourcePtr) self->x.t_res_stack); + xt_db_exit_thread(self); + thr_free_data(self); /* Free custom user data. */ + + if (self->t_id > 0) { + ASSERT(self->t_id < xt_thr_current_max_threads); + xt_lock_mutex(self, &thr_array_lock); + pushr_(xt_unlock_mutex, &thr_array_lock); + thr_accumulate_statistics(self); + xt_thr_array[self->t_id] = NULL; + xt_thr_current_thread_count--; + if (self->t_id+1 == xt_thr_current_max_threads) { + /* We can reduce the current maximum, + * this makes operations that scan the array faster! + */ + u_int i; + + i = self->t_id; + for(;;) { + if (xt_thr_array[i]) + break; + if (!i) + break; + i--; + } + xt_thr_current_max_threads = i+1; + } + freer_(); // xt_unlock_mutex(&thr_array_lock) + } + + xt_free_cond(&self->t_cond); + xt_free_mutex(&self->t_lock); + + self->st_thread_list_count = 0; + self->st_thread_list_size = 0; + if (self->st_thread_list) { + xt_free_ns(self->st_thread_list); + self->st_thread_list = NULL; + } +} + +static void thr_init(XTThreadPtr self, XTThreadPtr new_thread) +{ + new_thread->t_res_top = (XTResourcePtr) new_thread->x.t_res_stack; + + new_thread->st_thread_list_count = 0; + new_thread->st_thread_list_size = 0; + new_thread->st_thread_list = NULL; + try_(a) { + xt_init_cond(self, &new_thread->t_cond); + xt_init_mutex_with_autoname(self, &new_thread->t_lock); + + xt_lock_mutex(self, &thr_array_lock); + pushr_(xt_unlock_mutex, &thr_array_lock); + + ASSERT(xt_thr_current_thread_count <= xt_thr_current_max_threads); + ASSERT(xt_thr_current_max_threads <= xt_thr_maximum_threads); + if (xt_thr_current_thread_count == xt_thr_maximum_threads) + xt_throw_ulxterr(XT_CONTEXT, XT_ERR_TOO_MANY_THREADS, (u_long) xt_thr_maximum_threads+1); + if (xt_thr_current_thread_count == xt_thr_current_max_threads) { + new_thread->t_id = xt_thr_current_thread_count; + xt_thr_array[new_thread->t_id] = new_thread; + xt_thr_current_max_threads++; + } + else { + /* There must be a free slot: */ + for (u_int i=0; i<xt_thr_current_max_threads; i++) { + if (!xt_thr_array[i]) { + new_thread->t_id = i; + xt_thr_array[i] = new_thread; + break; + } + } + } + xt_thr_current_thread_count++; + freer_(); // xt_unlock_mutex(&thr_array_lock) + + xt_db_init_thread(self, new_thread); + } + catch_(a) { + thr_exit(new_thread); + throw_(); + } + cont_(a); + +} + +/* + * The caller of this function automatically becomes the main thread. + */ +xtPublic XTThreadPtr xt_init_threading(u_int max_threads) +{ + volatile XTThreadPtr self = NULL; + XTExceptionRec e; + int err; + + /* Align the number of threads: */ + xt_thr_maximum_threads = xt_align_size(max_threads, XT_XS_LOCK_ALIGN); + +#ifdef XT_TRACK_CONNECTIONS + if (xt_thr_maximum_threads > XT_TRACK_MAX_CONNS) { + xt_log_error(XT_NS_CONTEXT, XT_LOG_FATAL, XT_ERR_TOO_MANY_THREADS, 0, + "XT_TRACK_CONNECTIONS is enabled and xt_thr_maximum_threads > XT_TRACK_MAX_CONNS"); + goto failed; + } +#endif + +#ifdef HANDLE_SIGNALS + if (!thr_setup_signals()) + return NULL; +#endif + + xt_p_init_threading(); + + err = pthread_key_create(&thr_key, NULL); + if (err) { + xt_log_errno(XT_NS_CONTEXT, err); + return NULL; + } + + if ((err = xt_p_mutex_init_with_autoname(&thr_array_lock, NULL))) { + xt_log_errno(XT_NS_CONTEXT, err); + goto failed; + } + + if (!(xt_thr_array = (XTThreadPtr *) malloc(xt_thr_maximum_threads * sizeof(XTThreadPtr)))) { + xt_log_errno(XT_NS_CONTEXT, XT_ENOMEM); + goto failed; + } + + xt_thr_array[0] = (XTThreadPtr) 1; // Dummy, not used + xt_thr_current_thread_count = 1; + xt_thr_current_max_threads = 1; + + /* Create the main thread: */ + self = xt_create_thread("MainThread", TRUE, FALSE, &e); + if (!self) { + xt_log_exception(NULL, &e, XT_LOG_DEFAULT); + goto failed; + } + + try_(a) { + XTThreadPtr thread = self; + thr_list = xt_new_linkedlist(thread, NULL, NULL, TRUE); + } + catch_(a) { + XTThreadPtr thread = self; + xt_log_and_clear_exception(thread); + xt_exit_threading(thread); + } + cont_(a); + + return self; + + failed: + xt_exit_threading(NULL); + return NULL; +} + +xtPublic void xt_exit_threading(XTThreadPtr self) +{ + if (thr_list) { + xt_free_linkedlist(self, thr_list); + thr_list = NULL; + } + + /* This should be the main thread! */ + if (self) { + ASSERT(self->t_main); + xt_free_thread(self); + } + + if (xt_thr_array) { + free(xt_thr_array); + xt_thr_array = NULL; + xt_free_mutex(&thr_array_lock); + } + + xt_thr_current_thread_count = 0; + xt_thr_current_max_threads = 0; + + /* I no longer delete 'thr_key' because + * functions that call xt_get_self() after this + * point will get junk back if we delete + * thr_key. In particular the XT_THREAD_LOCK_INFO + * code fails + if (thr_key) { + pthread_key_delete(thr_key); + thr_key = (pthread_key_t) 0; + } + */ +} + +xtPublic void xt_wait_for_all_threads(XTThreadPtr self) +{ + if (thr_list) + xt_ll_wait_till_empty(self, thr_list); +} + +/* + * Call this function in a busy wait loop! + * Use if for wait loops that are not + * time critical. + */ +xtPublic void xt_busy_wait(void) +{ +#ifdef XT_WIN + Sleep(1); +#else + usleep(10); +#endif +} + +xtPublic void xt_critical_wait(void) +{ + /* NOTE: On Mac xt_busy_wait() works better than xt_yield() + */ +#if defined(XT_MAC) || defined(XT_WIN) + xt_busy_wait(); +#else + xt_yield(); +#endif +} + + +/* + * Use this for loops that time critical. + * Time critical means we need to get going + * as soon as possible! + */ +xtPublic void xt_yield(void) +{ +#ifdef XT_WIN + Sleep(0); +#elif defined(XT_MAC) || defined(XT_SOLARIS) + usleep(0); +#elif defined(XT_NETBSD) + sched_yield(); +#else + pthread_yield(); +#endif +} + +xtPublic void xt_sleep_milli_second(u_int t) +{ +#ifdef XT_WIN + Sleep(t); +#else + usleep(t * 1000); +#endif +} + +xtPublic void xt_signal_all_threads(XTThreadPtr self, int sig) +{ + XTLinkedItemPtr li; + XTThreadPtr sig_thr; + + xt_ll_lock(self, thr_list); + try_(a) { + li = thr_list->ll_items; + while (li) { + sig_thr = (XTThreadPtr) li; + if (sig_thr != self) + pthread_kill(sig_thr->t_pthread, sig); + li = li->li_next; + } + } + catch_(a) { + xt_ll_unlock(self, thr_list); + throw_(); + } + cont_(a); + xt_ll_unlock(self, thr_list); +} + +/* + * Apply the given function to all threads except self! + */ +xtPublic void xt_do_to_all_threads(XTThreadPtr self, void (*do_func_ptr)(XTThreadPtr self, XTThreadPtr to_thr, void *thunk), void *thunk) +{ + XTLinkedItemPtr li; + XTThreadPtr to_thr; + + xt_ll_lock(self, thr_list); + pushr_(xt_ll_unlock, thr_list); + + li = thr_list->ll_items; + while (li) { + to_thr = (XTThreadPtr) li; + if (to_thr != self) + (*do_func_ptr)(self, to_thr, thunk); + li = li->li_next; + } + + freer_(); // xt_ll_unlock(thr_list) +} + +xtPublic XTThreadPtr xt_get_self(void) +{ + XTThreadPtr self; + + /* First check if the handler has the data: */ + if ((self = myxt_get_self())) + return self; + /* Then it must be a background process, and the + * thread info is stored in the local key: */ + return (XTThreadPtr) xt_get_key((pthread_key_t)thr_key); +} + +xtPublic void xt_set_self(XTThreadPtr self) +{ + xt_set_key((pthread_key_t)thr_key, self, NULL); +} + +xtPublic void xt_clear_exception(XTThreadPtr thread) +{ + thread->t_exception.e_xt_err = 0; + thread->t_exception.e_sys_err = 0; + *thread->t_exception.e_err_msg = 0; + *thread->t_exception.e_func_name = 0; + *thread->t_exception.e_source_file = 0; + thread->t_exception.e_source_line = 0; + *thread->t_exception.e_catch_trace = 0; +} + +/* + * Create a thread without requiring thread to do it (as in xt_create_daemon()). + * + * This function returns NULL on error. + */ +xtPublic XTThreadPtr xt_create_thread(c_char *name, xtBool main_thread, xtBool user_thread, XTExceptionPtr e) +{ + volatile XTThreadPtr self; + + self = (XTThreadPtr) xt_calloc_ns(sizeof(XTThreadRec)); + if (!self) { + xt_exception_errno(e, XT_CONTEXT, ENOMEM); + return NULL; + } + + if (!xt_set_key((pthread_key_t)thr_key, self, e)) { + xt_free_ns(self); + return NULL; + } + + xt_strcpy(XT_THR_NAME_SIZE, self->t_name, name); + self->t_main = main_thread; + self->t_daemon = FALSE; + + try_(a) { + thr_init(self, self); + } + catch_(a) { + *e = self->t_exception; + xt_set_key((pthread_key_t)thr_key, NULL, NULL); + xt_free_ns(self); + self = NULL; + } + cont_(a); + + if (self && user_thread) { + /* Add non-temporary threads to the thread list. */ + try_(b) { + xt_ll_add(self, thr_list, &self->t_links, TRUE); + } + catch_(b) { + *e = self->t_exception; + xt_free_thread(self); + self = NULL; + } + cont_(b); + } + + return self; +} + +/* + * Create a daemon thread. + */ +xtPublic XTThreadPtr xt_create_daemon(XTThreadPtr self, c_char *name) +{ + XTThreadPtr new_thread; + + /* NOTE: thr_key will be set when this thread start running. */ + + new_thread = (XTThreadPtr) xt_calloc(self, sizeof(XTThreadRec)); + xt_strcpy(XT_THR_NAME_SIZE, new_thread->t_name, name); + new_thread->t_main = FALSE; + new_thread->t_daemon = TRUE; + + try_(a) { + thr_init(self, new_thread); + } + catch_(a) { + xt_free(self, new_thread); + throw_(); + } + cont_(a); + return new_thread; +} + +void xt_free_thread(XTThreadPtr self) +{ + thr_exit(self); + if (!self->t_daemon && thr_list) + xt_ll_remove(self, thr_list, &self->t_links, TRUE); + /* Note, if I move this before thr_exit() then self = xt_get_self(); will fail in + * xt_close_file_ns() which is called by xt_unuse_database()! + */ + + /* + * Do not clear the pthread's key value unless it is the same as the thread just released. + * This can happen during shutdown when the engine is deregistered with the PBMS engine. + * + * What happens is that during deregistration the PBMS engine calls close to close all + * PBXT resources on all MySQL THDs created by PBMS for it's own pthreads. So the 'self' + * being freed is not the same 'self' associated with the PBXT 'thr_key'. + */ + if (thr_key && (self == ((XTThreadPtr) xt_get_key((pthread_key_t)thr_key)))) { + xt_set_key((pthread_key_t)thr_key, NULL, NULL); + } + xt_free_ns(self); +} + +xtPublic pthread_t xt_run_thread(XTThreadPtr self, XTThreadPtr child, void *(*start_routine)(XTThreadPtr)) +{ + ThreadDataRec data; + int err; + pthread_t child_thread; + + enter_(); + + // 'data' can be on the stack because we are waiting for the thread to start + // before exiting the function. + data.td_started = FALSE; + data.td_thr = child; + data.td_start_routine = start_routine; +#ifdef XT_WIN + { + pthread_attr_t attr = { 0, 0, 0 }; + + attr.priority = THREAD_PRIORITY_NORMAL; + err = pthread_create(&child_thread, &attr, xt_thread_main, &data); + } +#else + err = pthread_create(&child_thread, NULL, xt_thread_main, &data); +#endif + if (err) { + xt_free_thread(child); + xt_throw_errno(XT_CONTEXT, err); + } + while (!data.td_started) { + /* Check that the self is still alive: */ + if (pthread_kill(child_thread, 0)) + break; + xt_busy_wait(); + } + return_(child_thread); +} + +xtPublic void xt_exit_thread(XTThreadPtr self, void *result) +{ + xt_free_thread(self); + pthread_exit(result); +} + +xtPublic void *xt_wait_for_thread(xtThreadID tid, xtBool ignore_error) +{ + int err; + void *value_ptr = NULL; + xtBool ok = FALSE; + XTThreadPtr thread; + pthread_t t1 = 0; + + xt_lock_mutex_ns(&thr_array_lock); + if (tid < xt_thr_maximum_threads) { + if ((thread = xt_thr_array[tid])) { + t1 = thread->t_pthread; + ok = TRUE; + } + } + xt_unlock_mutex_ns(&thr_array_lock); + if (ok) { + err = xt_p_join(t1, &value_ptr); + if (err && !ignore_error) + xt_log_errno(XT_NS_CONTEXT, err); + } + return value_ptr; +} + +/* + * Kill the given thead, and wait for it to terminate. + * This function just returns if the self is already dead. + */ +xtPublic void xt_kill_thread(pthread_t t1) +{ + int err; + void *value_ptr = NULL; + + err = pthread_kill(t1, SIGTERM); + if (err) + return; + err = xt_p_join(t1, &value_ptr); + if (err) + xt_log_errno(XT_NS_CONTEXT, err); +} + +/* + * ----------------------------------------------------------------------- + * Read/write locking + */ + +#ifdef XT_THREAD_LOCK_INFO +xtPublic xtBool xt_init_rwlock(XTThreadPtr self, xt_rwlock_type *rwlock, const char *name) +#else +xtPublic xtBool xt_init_rwlock(XTThreadPtr self, xt_rwlock_type *rwlock) +#endif +{ + int err; + +#ifdef XT_THREAD_LOCK_INFO + err = xt_p_rwlock_init_with_name(rwlock, NULL, name); +#else + err = xt_p_rwlock_init(rwlock, NULL); +#endif + + if (err) { + xt_throw_errno(XT_CONTEXT, err); + return FAILED; + } + return OK; +} + +xtPublic void xt_free_rwlock(xt_rwlock_type *rwlock) +{ + int err; + + for (;;) { + err = xt_p_rwlock_destroy(rwlock); + if (err != XT_EBUSY) + break; + xt_busy_wait(); + } + /* PMC - xt_xn_exit_db() is called even when xt_xn_init_db() is not fully completed! + * This generates a lot of log entries. But I have no desire to only call + * free for those articles that I have init'ed! + if (err) + xt_log_errno(XT_NS_CONTEXT, err); + */ +} + +xtPublic xt_rwlock_type *xt_slock_rwlock(XTThreadPtr self, xt_rwlock_type *rwlock) +{ + int err; + + for (;;) { + err = xt_slock_rwlock_ns(rwlock); + if (err != XT_EAGAIN) + break; + xt_busy_wait(); + } + if (err) { + xt_throw_errno(XT_CONTEXT, err); + return NULL; + } + return rwlock; +} + +xtPublic xt_rwlock_type *xt_xlock_rwlock(XTThreadPtr self, xt_rwlock_type *rwlock) +{ + int err; + + for (;;) { + err = xt_xlock_rwlock_ns(rwlock); + if (err != XT_EAGAIN) + break; + xt_busy_wait(); + } + + if (err) { + xt_throw_errno(XT_CONTEXT, err); + return NULL; + } + return rwlock; +} + +xtPublic void xt_unlock_rwlock(XTThreadPtr XT_UNUSED(self), xt_rwlock_type *rwlock) +{ + int err; + + err = xt_unlock_rwlock_ns(rwlock); + if (err) + xt_log_errno(XT_NS_CONTEXT, err); +} + +/* + * ----------------------------------------------------------------------- + * Mutex locking + */ + +xtPublic xt_mutex_type *xt_new_mutex(XTThreadPtr self) +{ + xt_mutex_type *mx; + + if (!(mx = (xt_mutex_type *) xt_calloc(self, sizeof(xt_mutex_type)))) + return NULL; + pushr_(xt_free, mx); + if (!xt_init_mutex_with_autoname(self, mx)) { + freer_(); + return NULL; + } + popr_(); + return mx; +} + +xtPublic void xt_delete_mutex(XTThreadPtr self, xt_mutex_type *mx) +{ + if (mx) { + xt_free_mutex(mx); + xt_free(self, mx); + } +} + +#ifdef XT_THREAD_LOCK_INFO +xtPublic xtBool xt_init_mutex(XTThreadPtr self, xt_mutex_type *mx, const char *name) +#else +xtPublic xtBool xt_init_mutex(XTThreadPtr self, xt_mutex_type *mx) +#endif +{ + int err; + + err = xt_p_mutex_init_with_name(mx, NULL, name); + if (err) { + xt_throw_errno(XT_CONTEXT, err); + return FALSE; + } + return TRUE; +} + +void xt_free_mutex(xt_mutex_type *mx) +{ + int err; + + for (;;) { + err = xt_p_mutex_destroy(mx); + if (err != XT_EBUSY) + break; + xt_busy_wait(); + } + /* PMC - xt_xn_exit_db() is called even when xt_xn_init_db() is not fully completed! + if (err) + xt_log_errno(XT_NS_CONTEXT, err); + */ +} + +xtPublic xtBool xt_lock_mutex(XTThreadPtr self, xt_mutex_type *mx) +{ + int err; + + for (;;) { + err = xt_lock_mutex_ns(mx); + if (err != XT_EAGAIN) + break; + xt_busy_wait(); + } + + if (err) { + xt_throw_errno(XT_CONTEXT, err); + return FALSE; + } + return TRUE; +} + +xtPublic void xt_unlock_mutex(XTThreadPtr self, xt_mutex_type *mx) +{ + int err; + + err = xt_unlock_mutex_ns(mx); + if (err) + xt_throw_errno(XT_CONTEXT, err); +} + +xtPublic xtBool xt_set_key(pthread_key_t key, const void *value, XTExceptionPtr e) +{ +#ifdef XT_WIN + my_pthread_setspecific_ptr(thr_key, (void *) value); +#else + int err; + + err = pthread_setspecific(key, value); + if (err) { + if (e) + xt_exception_errno(e, XT_NS_CONTEXT, err); + return FALSE; + } +#endif + return TRUE; +} + +xtPublic void *xt_get_key(pthread_key_t key) +{ +#ifdef XT_WIN + return my_pthread_getspecific_ptr(void *, thr_key); +#else + return pthread_getspecific(key); +#endif +} + +xtPublic xt_cond_type *xt_new_cond(XTThreadPtr self) +{ + xt_cond_type *cond; + + if (!(cond = (xt_cond_type *) xt_calloc(self, sizeof(xt_cond_type)))) + return NULL; + pushr_(xt_free, cond); + if (!xt_init_cond(self, cond)) { + freer_(); + return NULL; + } + popr_(); + return cond; +} + +xtPublic void xt_delete_cond(XTThreadPtr self, xt_cond_type *cond) +{ + if (cond) { + xt_free_cond(cond); + xt_free(self, cond); + } +} + +xtPublic xtBool xt_init_cond(XTThreadPtr self, xt_cond_type *cond) +{ + int err; + + err = pthread_cond_init(cond, NULL); + if (err) { + xt_throw_errno(XT_CONTEXT, err); + return FALSE; + } + return TRUE; +} + +xtPublic void xt_free_cond(xt_cond_type *cond) +{ + int err; + + for (;;) { + err = pthread_cond_destroy(cond); + if (err != XT_EBUSY) + break; + xt_busy_wait(); + } + /* PMC - xt_xn_exit_db() is called even when xt_xn_init_db() is not fully completed! + if (err) + xt_log_errno(XT_NS_CONTEXT, err); + */ +} + +xtPublic xtBool xt_throw_delayed_signal(XTThreadPtr self, c_char *func, c_char *file, u_int line) +{ + XTThreadPtr me = self ? self : xt_get_self(); + + if (me->t_delayed_signal) { + int sig = me->t_delayed_signal; + + me->t_delayed_signal = 0; + xt_throw_signal(self, func, file, line, sig); + return FAILED; + } + return OK; +} + +xtPublic xtBool xt_wait_cond(XTThreadPtr self, xt_cond_type *cond, xt_mutex_type *mutex) +{ + int err; + XTThreadPtr me = self ? self : xt_get_self(); + + /* PMC - In my tests, if I throw an exception from within the wait + * the condition and the mutex remain locked. + */ + me->t_disable_interrupts = TRUE; + err = xt_p_cond_wait(cond, mutex); + me->t_disable_interrupts = FALSE; + if (err) { + xt_throw_errno(XT_CONTEXT, err); + return FALSE; + } + if (me->t_delayed_signal) { + xt_throw_delayed_signal(XT_CONTEXT); + return FALSE; + } + return TRUE; +} + +xtPublic xtBool xt_suspend(XTThreadPtr thread) +{ + xtBool ok; + + // You can only suspend yourself. + ASSERT_NS(pthread_equal(thread->t_pthread, pthread_self())); + + xt_lock_mutex_ns(&thread->t_lock); + ok = xt_wait_cond(NULL, &thread->t_cond, &thread->t_lock); + xt_unlock_mutex_ns(&thread->t_lock); + return ok; +} + +xtPublic xtBool xt_unsuspend(XTThreadPtr target) +{ + return xt_broadcast_cond_ns(&target->t_cond); +} + +xtPublic void xt_lock_thread(XTThreadPtr thread) +{ + xt_lock_mutex_ns(&thread->t_lock); +} + +xtPublic void xt_unlock_thread(XTThreadPtr thread) +{ + xt_unlock_mutex_ns(&thread->t_lock); +} + +xtPublic xtBool xt_wait_thread(XTThreadPtr thread) +{ + return xt_wait_cond(NULL, &thread->t_cond, &thread->t_lock); +} + +xtPublic void xt_signal_thread(XTThreadPtr target) +{ + xt_broadcast_cond_ns(&target->t_cond); +} + +xtPublic void xt_terminate_thread(XTThreadPtr XT_UNUSED(self), XTThreadPtr target) +{ + target->t_quit = TRUE; + target->t_delayed_signal = SIGTERM; +} + +xtPublic xtProcID xt_getpid() +{ +#ifdef XT_WIN + return GetCurrentProcessId(); +#else + return getpid(); +#endif +} + +xtPublic xtBool xt_process_exists(xtProcID pid) +{ + xtBool found; + +#ifdef XT_WIN + HANDLE h; + DWORD code; + + found = FALSE; + h = OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, pid); + if (h) { + if (GetExitCodeProcess(h, &code)) { + if (code == STILL_ACTIVE) + found = TRUE; + } + CloseHandle(h); + } + else { + int err; + + err = HRESULT_CODE(GetLastError()); + if (err != ERROR_INVALID_PARAMETER) + found = TRUE; + } +#else + found = TRUE; + if (kill(pid, 0) == -1) { + if (errno == ESRCH) + found = FALSE; + } +#endif + return found; +} + +xtPublic xtBool xt_timed_wait_cond(XTThreadPtr self, xt_cond_type *cond, xt_mutex_type *mutex, u_long milli_sec) +{ + int err; + struct timespec abstime; + XTThreadPtr me = self ? self : xt_get_self(); + +#ifdef XT_WIN + union ft64 now; + + GetSystemTimeAsFileTime(&now.ft); + + /* System time is measured in 100ns units. + * This calculation will be reversed by the Windows implementation + * of pthread_cond_timedwait(), in order to extract the + * milli-second timeout! + */ + abstime.tv.i64 = now.i64 + (milli_sec * 10000); + + abstime.max_timeout_msec = milli_sec; +#else + struct timeval now; + u_llong micro_sec; + + /* Get the current time in microseconds: */ + gettimeofday(&now, NULL); + micro_sec = (u_llong) now.tv_sec * (u_llong) 1000000 + (u_llong) now.tv_usec; + + /* Add the timeout which is in milli seconds */ + micro_sec += (u_llong) milli_sec * (u_llong) 1000; + + /* Setup the end time, which is in nano-seconds. */ + abstime.tv_sec = (long) (micro_sec / 1000000); /* seconds */ + abstime.tv_nsec = (long) ((micro_sec % 1000000) * 1000); /* and nanoseconds */ +#endif + + me->t_disable_interrupts = TRUE; + err = xt_p_cond_timedwait(cond, mutex, &abstime); + me->t_disable_interrupts = FALSE; + if (err && err != ETIMEDOUT) { + xt_throw_errno(XT_CONTEXT, err); + return FALSE; + } + if (me->t_delayed_signal) { + xt_throw_delayed_signal(XT_CONTEXT); + return FALSE; + } + return TRUE; +} + +xtPublic xtBool xt_signal_cond(XTThreadPtr self, xt_cond_type *cond) +{ + int err; + + err = pthread_cond_signal(cond); + if (err) { + xt_throw_errno(XT_CONTEXT, err); + return FAILED; + } + return OK; +} + +xtPublic void xt_broadcast_cond(XTThreadPtr self, xt_cond_type *cond) +{ + int err; + + err = pthread_cond_broadcast(cond); + if (err) + xt_throw_errno(XT_CONTEXT, err); +} + +xtPublic xtBool xt_broadcast_cond_ns(xt_cond_type *cond) +{ + int err; + + err = pthread_cond_broadcast(cond); + if (err) { + xt_register_errno(XT_REG_CONTEXT, err); + return FAILED; + } + return OK; +} + +static int prof_setjmp_count = 0; + +xtPublic int prof_setjmp(void) +{ + prof_setjmp_count++; + return 0; +} + +xtPublic void xt_set_low_priority(XTThreadPtr self) +{ + int err = xt_p_set_low_priority(self->t_pthread); + if (err) { + self = NULL; /* Will cause logging, instead of throwing exception */ + xt_throw_errno(XT_CONTEXT, err); + } +} + +xtPublic void xt_set_normal_priority(XTThreadPtr self) +{ + int err = xt_p_set_normal_priority(self->t_pthread); + if (err) { + self = NULL; /* Will cause logging, instead of throwing exception */ + xt_throw_errno(XT_CONTEXT, err); + } +} + +xtPublic void xt_set_high_priority(XTThreadPtr self) +{ + int err = xt_p_set_high_priority(self->t_pthread); + if (err) { + self = NULL; /* Will cause logging, instead of throwing exception */ + xt_throw_errno(XT_CONTEXT, err); + } +} + +xtPublic void xt_set_priority(XTThreadPtr self, int priority) +{ + if (priority < XT_PRIORITY_NORMAL) + xt_set_low_priority(self); + else if (priority > XT_PRIORITY_NORMAL) + xt_set_high_priority(self); + else + xt_set_normal_priority(self); +} + +/* + * ----------------------------------------------------------------------- + * STATISTICS + */ + +xtPublic void xt_gather_statistics(XTStatisticsPtr stats) +{ + XTThreadPtr *thr; + xtWord8 s; + + xt_lock_mutex_ns(&thr_array_lock); + *stats = thr_statistics; + // Ignore index 0, it is not used! + thr = &xt_thr_array[1]; + for (u_int i=1; i<xt_thr_current_max_threads; i++) { + if (*thr) { + stats->st_commits += (*thr)->st_statistics.st_commits; + stats->st_rollbacks += (*thr)->st_statistics.st_rollbacks; + stats->st_stat_read += (*thr)->st_statistics.st_stat_read; + stats->st_stat_write += (*thr)->st_statistics.st_stat_write; + + XT_ADD_STATS(stats->st_rec, (*thr)->st_statistics.st_rec); + if ((s = (*thr)->st_statistics.st_rec.ts_flush_start)) + stats->st_rec.ts_flush_time += xt_trace_clock() - s; + stats->st_rec_cache_hit += (*thr)->st_statistics.st_rec_cache_hit; + stats->st_rec_cache_miss += (*thr)->st_statistics.st_rec_cache_miss; + stats->st_rec_cache_frees += (*thr)->st_statistics.st_rec_cache_frees; + + XT_ADD_STATS(stats->st_ind, (*thr)->st_statistics.st_ind); + if ((s = (*thr)->st_statistics.st_ind.ts_flush_start)) + stats->st_ind.ts_flush_time += xt_trace_clock() - s; + stats->st_ind_cache_hit += (*thr)->st_statistics.st_ind_cache_hit; + stats->st_ind_cache_miss += (*thr)->st_statistics.st_ind_cache_miss; + XT_ADD_STATS(stats->st_ilog, (*thr)->st_statistics.st_ilog); + + XT_ADD_STATS(stats->st_xlog, (*thr)->st_statistics.st_xlog); + if ((s = (*thr)->st_statistics.st_xlog.ts_flush_start)) + stats->st_xlog.ts_flush_time += xt_trace_clock() - s; + stats->st_xlog_cache_hit += (*thr)->st_statistics.st_xlog_cache_hit; + stats->st_xlog_cache_miss += (*thr)->st_statistics.st_xlog_cache_miss; + + XT_ADD_STATS(stats->st_data, (*thr)->st_statistics.st_data); + if ((s = (*thr)->st_statistics.st_data.ts_flush_start)) + stats->st_data.ts_flush_time += xt_trace_clock() - s; + + stats->st_scan_index += (*thr)->st_statistics.st_scan_index; + stats->st_scan_table += (*thr)->st_statistics.st_scan_table; + stats->st_row_select += (*thr)->st_statistics.st_row_select; + stats->st_row_insert += (*thr)->st_statistics.st_row_insert; + stats->st_row_update += (*thr)->st_statistics.st_row_update; + stats->st_row_delete += (*thr)->st_statistics.st_row_delete; + + stats->st_wait_for_xact += (*thr)->st_statistics.st_wait_for_xact; + stats->st_retry_index_scan += (*thr)->st_statistics.st_retry_index_scan; + stats->st_reread_record_list += (*thr)->st_statistics.st_reread_record_list; + } + thr++; + } + xt_unlock_mutex_ns(&thr_array_lock); +} + +static void thr_accumulate_statistics(XTThreadPtr self) +{ + thr_statistics.st_commits += self->st_statistics.st_commits; + thr_statistics.st_rollbacks += self->st_statistics.st_rollbacks; + thr_statistics.st_stat_read += self->st_statistics.st_stat_read; + thr_statistics.st_stat_write += self->st_statistics.st_stat_write; + + XT_ADD_STATS(thr_statistics.st_rec, self->st_statistics.st_rec); + thr_statistics.st_rec_cache_hit += self->st_statistics.st_rec_cache_hit; + thr_statistics.st_rec_cache_miss += self->st_statistics.st_rec_cache_miss; + thr_statistics.st_rec_cache_frees += self->st_statistics.st_rec_cache_frees; + + XT_ADD_STATS(thr_statistics.st_ind, self->st_statistics.st_ind); + thr_statistics.st_ind_cache_hit += self->st_statistics.st_ind_cache_hit; + thr_statistics.st_ind_cache_miss += self->st_statistics.st_ind_cache_miss; + XT_ADD_STATS(thr_statistics.st_ilog, self->st_statistics.st_ilog); + + XT_ADD_STATS(thr_statistics.st_xlog, self->st_statistics.st_xlog); + thr_statistics.st_xlog_cache_hit += self->st_statistics.st_xlog_cache_hit; + thr_statistics.st_xlog_cache_miss += self->st_statistics.st_xlog_cache_miss; + + XT_ADD_STATS(thr_statistics.st_data, self->st_statistics.st_data); + + thr_statistics.st_scan_index += self->st_statistics.st_scan_index; + thr_statistics.st_scan_table += self->st_statistics.st_scan_table; + thr_statistics.st_row_select += self->st_statistics.st_row_select; + thr_statistics.st_row_insert += self->st_statistics.st_row_insert; + thr_statistics.st_row_update += self->st_statistics.st_row_update; + thr_statistics.st_row_delete += self->st_statistics.st_row_delete; + + thr_statistics.st_wait_for_xact += self->st_statistics.st_wait_for_xact; + thr_statistics.st_retry_index_scan += self->st_statistics.st_retry_index_scan; + thr_statistics.st_reread_record_list += self->st_statistics.st_reread_record_list; +} + +xtPublic u_llong xt_get_statistic(XTStatisticsPtr stats, XTDatabaseHPtr db, u_int rec_id) +{ + u_llong stat_value; + + switch (rec_id) { + case XT_STAT_TIME_CURRENT: + stat_value = (u_llong) time(NULL); + break; + case XT_STAT_TIME_PASSED: + stat_value = (u_llong) xt_trace_clock(); + break; + case XT_STAT_COMMITS: + stat_value = stats->st_commits; + break; + case XT_STAT_ROLLBACKS: + stat_value = stats->st_rollbacks; + break; + case XT_STAT_STAT_READS: + stat_value = stats->st_stat_read; + break; + case XT_STAT_STAT_WRITES: + stat_value = stats->st_stat_write; + break; + + case XT_STAT_REC_BYTES_IN: + stat_value = stats->st_rec.ts_read; + break; + case XT_STAT_REC_BYTES_OUT: + stat_value = stats->st_rec.ts_write; + break; + case XT_STAT_REC_SYNC_COUNT: + stat_value = stats->st_rec.ts_flush; + break; + case XT_STAT_REC_SYNC_TIME: + stat_value = stats->st_rec.ts_flush_time; + break; + case XT_STAT_REC_CACHE_HIT: + stat_value = stats->st_rec_cache_hit; + break; + case XT_STAT_REC_CACHE_MISS: + stat_value = stats->st_rec_cache_miss; + break; + case XT_STAT_REC_CACHE_FREES: + stat_value = stats->st_rec_cache_frees; + break; + case XT_STAT_REC_CACHE_USAGE: + stat_value = (u_llong) xt_tc_get_usage(); + break; + + case XT_STAT_IND_BYTES_IN: + stat_value = stats->st_ind.ts_read; + break; + case XT_STAT_IND_BYTES_OUT: + stat_value = stats->st_ind.ts_write; + break; + case XT_STAT_IND_SYNC_COUNT: + stat_value = stats->st_ind.ts_flush; + break; + case XT_STAT_IND_SYNC_TIME: + stat_value = stats->st_ind.ts_flush_time; + break; + case XT_STAT_IND_CACHE_HIT: + stat_value = stats->st_ind_cache_hit; + break; + case XT_STAT_IND_CACHE_MISS: + stat_value = stats->st_ind_cache_miss; + break; + case XT_STAT_IND_CACHE_USAGE: + stat_value = (u_llong) xt_ind_get_usage(); + break; + case XT_STAT_ILOG_BYTES_IN: + stat_value = stats->st_ilog.ts_read; + break; + case XT_STAT_ILOG_BYTES_OUT: + stat_value = stats->st_ilog.ts_write; + break; + case XT_STAT_ILOG_SYNC_COUNT: + stat_value = stats->st_ilog.ts_flush; + break; + case XT_STAT_ILOG_SYNC_TIME: + stat_value = stats->st_ilog.ts_flush_time; + break; + + case XT_STAT_XLOG_BYTES_IN: + stat_value = stats->st_xlog.ts_read; + break; + case XT_STAT_XLOG_BYTES_OUT: + stat_value = stats->st_xlog.ts_write; + break; + case XT_STAT_XLOG_SYNC_COUNT: + stat_value = stats->st_xlog.ts_flush; + break; + case XT_STAT_XLOG_SYNC_TIME: + stat_value = stats->st_xlog.ts_flush_time; + break; + case XT_STAT_XLOG_CACHE_HIT: + stat_value = stats->st_xlog_cache_hit; + break; + case XT_STAT_XLOG_CACHE_MISS: + stat_value = stats->st_xlog_cache_miss; + break; + case XT_STAT_XLOG_CACHE_USAGE: + stat_value = (u_llong) xt_xlog_get_usage(); + break; + + case XT_STAT_DATA_BYTES_IN: + stat_value = stats->st_data.ts_read; + break; + case XT_STAT_DATA_BYTES_OUT: + stat_value = stats->st_data.ts_write; + break; + case XT_STAT_DATA_SYNC_COUNT: + stat_value = stats->st_data.ts_flush; + break; + case XT_STAT_DATA_SYNC_TIME: + stat_value = stats->st_data.ts_flush_time; + break; + + case XT_STAT_BYTES_TO_CHKPNT: + stat_value = db ? xt_bytes_since_last_checkpoint(db, db->db_xlog.xl_write_log_id, db->db_xlog.xl_write_log_offset) : 0; + break; + case XT_STAT_LOG_BYTES_TO_WRITE: + stat_value = db ? db->db_xlog.xl_log_bytes_written - db->db_xlog.xl_log_bytes_read : 0;//db->db_xlog.xlog_bytes_to_write(); + break; + case XT_STAT_BYTES_TO_SWEEP: + /* This stat is potentially very expensive: */ + stat_value = db ? xt_xn_bytes_to_sweep(db, xt_get_self()) : 0; + break; + case XT_STAT_WAIT_FOR_XACT: + stat_value = stats->st_wait_for_xact; + break; + case XT_STAT_XACT_TO_CLEAN: + stat_value = db ? db->db_xn_curr_id + 1 - db->db_xn_to_clean_id : 0; + break; + case XT_STAT_SWEEPER_WAITS: + stat_value = db ? db->db_stat_sweep_waits : 0; + break; + + case XT_STAT_SCAN_INDEX: + stat_value = stats->st_scan_index; + break; + case XT_STAT_SCAN_TABLE: + stat_value = stats->st_scan_table; + break; + case XT_STAT_ROW_SELECT: + stat_value = stats->st_row_select; + break; + case XT_STAT_ROW_INSERT: + stat_value = stats->st_row_insert; + break; + case XT_STAT_ROW_UPDATE: + stat_value = stats->st_row_update; + break; + case XT_STAT_ROW_DELETE: + stat_value = stats->st_row_delete; + break; + + case XT_STAT_RETRY_INDEX_SCAN: + stat_value = stats->st_retry_index_scan; + break; + case XT_STAT_REREAD_REC_LIST: + stat_value = stats->st_reread_record_list; + break; + default: + stat_value = 0; + break; + } + return stat_value; +} |