diff options
author | Sergei Golubchik <sergii@pisem.net> | 2010-06-01 21:52:20 +0200 |
---|---|---|
committer | Sergei Golubchik <sergii@pisem.net> | 2010-06-01 21:52:20 +0200 |
commit | ffc8f62b08982cc1f2fabf8b4b38bd124c115a97 (patch) | |
tree | da42637b1ae9402a7436b715f9c0db57cdddc87a /storage/pbxt | |
parent | 6b157f6be3cb056a93eb925df3880098c871b32a (diff) | |
parent | 0fc39acb8125fae95062e7b680b022b075a308c3 (diff) | |
download | mariadb-git-ffc8f62b08982cc1f2fabf8b4b38bd124c115a97.tar.gz |
merge 5.1->5.2
Diffstat (limited to 'storage/pbxt')
39 files changed, 2345 insertions, 460 deletions
diff --git a/storage/pbxt/ChangeLog b/storage/pbxt/ChangeLog index 2f7297d64ff..b6023d26139 100644 --- a/storage/pbxt/ChangeLog +++ b/storage/pbxt/ChangeLog @@ -1,6 +1,111 @@ PBXT Release Notes ================== ++------- 1.0.11 Pre-GA - 2010-05-11 + +RN322: Creating a table the references a non-existing table can now only be done if you set: foreign_key_checks = 0. Also fixed a failure when creating tables with recursive foreign key declarations. + +RN321: Added "Extended record count" to the CHECK TABLE output. This indicates the number of records that have a data log component. + +RN320: All tests now run with MySQL 5.1.46. + +------- 1.0.10n RC4 - 2010-04-28 + +RN319: Fix RN1/3 and RN1/4 back-ported from 1.1: Fixed a deadlock that could occur during low index cache situations and added some checks for index corruption, and added the try lock variation for R/W locks. + +RN318: Fixed a bug in the atomic R/W lock. This bug occurred on multi-core Linux when under extrem load. The affect was that an index lookup could fail. The index was not corrupted. + +------- 1.0.10m RC4 - 2010-03-29 + +RN317: This change prevents a unscheduled checkpoint from occurring when the sweeper has work to do. Checkpoint required due to the Checkpoint threshold reached are done as usual. + +------- 1.0.10k RC4 - 2010-03-29 + +RN316: Set the maximum delay, while waiting for previous transactions to commit to 1/100s. This situation occurs when cleanup begins of a long running transaction. + +RN315: Fixed a bug that could lead to a data log error, for example: Data log not found: '.../dlog-129602.xt'. This error occurred after a duplicate key error, dending on the table structure, because the row buffer was not restored after writing an extended record. + +RN314: Server startup time could be very long when data logs become large because the log size was not save in the header when a data log is full. + +------- 1.0.10j RC4 - 2010-03-24 + +RN313: Fixed an error in the calculation of the handle data record (.xtd files) size when AVG_ROW_LENGTH is set explicitly to a value less than 12. For example: + +CREATE TABLE objs ( + id int(10) unsigned NOT NULL, + objdata mediumblob NOT NULL, + PRIMARY KEY (id) +) ENGINE=PBXT AVG_ROW_LENGTH=10 + +This table definition previously lead to corruption of the table because the handle data record was set to 24 (14+10), which is less than the minimum (for variable length records) handle data record size of 26. + +This minimum consists of 14 byte record header and 12 bytes reference to the extended record data (the part of the record in the data log). + +Tip: when setting AVG_ROW_LENGTH you should normally add 12 to the average row length estimate to ensure that the average length part of the record is always in the handle data file. This is important, for example if you wish to make sure that the rows used to build indexes are in the handle data file. CHECK TABLE tells you how many rows are in the "fixed length part" of the record (output in MySQL error log). In the example above, this would be AVG_ROW_LENGTH=17. + +The maximum size of a field can be calculated adding the maximum byte size as described here: http://dev.mysql.com/doc/refman/5.1/en/storage-requirements.html, and then add the following values, depending on the byte size: + +byte size <= 240, add 1 +byte size < 2^16 (65536), add 3 +byte size < 2^24 (16777216), add 4 +byte size > 2^24, add 5 + +------- 1.0.10i RC4 - 2010-03-17 + +RN312: Fixed bug #534361: Valgrind error: write of uninitialised bytes in xt_flush_indices() + +RN311: Fixed ilog corruption when running out of disk space during an index flush operation, which lead to corruption of the index. + +------- 1.0.10h RC4 - 2010-02-25 + +RN310: Fixed Windows atomic INC/DEC operations, which lead to atomic R/W lock not working correctly. The result was that some index entries were not foound. + +RN309: Fixed a bug that caused a crash when the index was corrupted. The crash occurs if the index page in not completely written, and an item in the index has a bad length. + +RN308: Fixed bug #509803: can't run tpcc (cannot compare FKs that rely on indexes of different length). + +------- 1.0.10g RC4 - 2010-02-11 + +RN307: 2010-02-15: Set the internal version number 1.0.10g. + +RN306: All tests now run with MySQL 5.1.42. + +RN305: Fixed a bug that could cause a crash in filesort. The problem was that the return row estimate was incorrect, which caused the result of estimate_rows_upper_bound() to overflow to zero. Row estimate has been changed, and no longer takes into account deleted rows (so the row estimate is now a maximum). + +RN304: Fixed bug #513012: On a table with a trigger the same record is updated more than once in one statement + +------- 1.0.10f RC4 - 2010-01-29 + +RN303: Fix RN1/10 back-ported from 1.1: Fixed a bug in the record cache that caused PBXT to think it had run out of cache memory. The effect was that PBXT used less and less cache over time. The bug occurs during heavy concurrent access on the record cache. The affect is the PBXT gets slower and slower. + +RN302: Fix RN1/11 back-ported from 1.1: Corrected a problem that sometimes caused a pause in activity when the record cache was full. + +------- 1.0.10e RC4 - 2010-01-25 + +RN301: Fixed index statistics calculation. This bug lead to the wrong indices being selected by the optimizer because all indices returned the same cost. + +RN300: Fixed bug #509968: START TRANSACTION WITH CONSISTENT SNAPSHOT breaks transactional flow. + +RN299: Fixed bug #509218: Server asserts with Assertion `mutex->__data.__owner == 0' failed on high concurrency OLTP test. + +------- 1.0.10d RC4 - 2010-01-11 + +RN298: Fixed a bug that caused huge amounts of transaction log to be written when pbxt_flush_log_at_trx_commit = 2. + +------- 1.0.10c RC4 - 2009-12-29 + +RN297: Updated "LOCK TABLES ... READ LOCAL" behavior to be more restrictive and compatible with InnoDB + +RN296: Fixed bug #499026: START TRANSACTION WITH CONSISTENT SNAPSHOT does not work for PBXT + +------- 1.0.10 RC4 - 2009-12-18 + +RN295: PBXT tests now all run with MySQL 5.1.41. + +RN294: Fixed bug #483714: a broken table can prevent other tables from opening + +RN293: Added system variable pbxt_flush_log_at_trx_commit. The value of this variable determines whether the transaction log is written and/or flushed when a transaction is ended. A value of 0 means don't write or flush the transaction log, 1 means write and flush and 2 means write, but do not flush. No matter what the setting is choosen, the transaction log is written and flushed at least once per second. + ------- 1.0.09g RC3 - 2009-12-16 RN292: Fixed a bug that resulted in 2-phase commit not being used between PBXT and the binlog. This bug was a result of a hack which as added to solve a problem in an pre-release version of MySQL 5.1. The hack was removed. diff --git a/storage/pbxt/Makefile.am b/storage/pbxt/Makefile.am index 4ef1eab50c2..371756c84be 100644 --- a/storage/pbxt/Makefile.am +++ b/storage/pbxt/Makefile.am @@ -1,3 +1,3 @@ -SUBDIRS = src +SUBDIRS = src bin EXTRA_DIST = CMakeLists.txt plug.in diff --git a/storage/pbxt/bin/Makefile.am b/storage/pbxt/bin/Makefile.am new file mode 100644 index 00000000000..ab7b711a6f1 --- /dev/null +++ b/storage/pbxt/bin/Makefile.am @@ -0,0 +1,14 @@ +# Used to build Makefile.in + +INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include \ + -I$(top_srcdir)/regex \ + -I$(top_srcdir)/storage/innobase/include \ + -I$(top_srcdir)/sql \ + -I$(srcdir) \ + -I$(srcdir)/../src + +bin_PROGRAMS = xtstat + +xtstat_SOURCES = xtstat_xt.cc ../src/strutil_xt.cc + +xtstat_LDADD = $(top_builddir)/libmysql/libmysqlclient.la diff --git a/storage/pbxt/bin/xtstat_xt.cc b/storage/pbxt/bin/xtstat_xt.cc new file mode 100644 index 00000000000..93b3d42e3f6 --- /dev/null +++ b/storage/pbxt/bin/xtstat_xt.cc @@ -0,0 +1,819 @@ +/* Copyright (c) 2005 PrimeBase Technologies GmbH + * + * PrimeBase XT + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * 2008-11-19 Paul McCullagh + * + * H&G2JCtL + */ + +#include "xt_config.h" + +#include <mysql.h> +#include <stdio.h> +#include <stdlib.h> +#include <ctype.h> +#include <string.h> + +#include "strutil_xt.h" +#include "util_xt.h" + +//#define DEBUG_INTERRUPT + +#define OPT_NONE -1 +#define OPT_HELP 0 +#define OPT_HOST 1 +#define OPT_USER 2 +#define OPT_PASSWORD 3 +#define OPT_DATABASE 4 +#define OPT_PORT 5 +#define OPT_SOCKET 6 +#define OPT_DELAY 7 +#define OPT_PROTOCOL 8 +#define OPT_DISPLAY 9 + +#define OPT_HAS_VALUE 1 +#define OPT_OPTIONAL 2 +#define OPT_INTEGER 4 + +llong record_cache_size; +llong index_cache_size; +llong log_cache_size; + +llong accumulative_values[XT_STAT_CURRENT_MAX]; +int columns_used; +int use_i_s = 0; + +struct DisplayOrder { + int do_statistic; + bool do_combo; +} display_order[XT_STAT_CURRENT_MAX]; + +struct Options { + int opt_id; + const char opt_char; + const char *opt_name; + int opt_flags; + const char *opt_desc; + const char *opt_value_str; + int opt_value_int; + bool opt_value_bool; +} options[] = { + { OPT_HELP, '?', "help", 0, + "Prints help text", NULL, 0, false }, + { OPT_HOST, 'h', "host", OPT_HAS_VALUE, + "Connect to host", NULL, 0, false }, + { OPT_USER, 'u', "user", OPT_HAS_VALUE, + "User for login if not current user", NULL, 0, false }, + { OPT_PASSWORD, 'p', "password", OPT_HAS_VALUE | OPT_OPTIONAL, + "Password to use when connecting to server. If password is not given it's asked from the tty", NULL, 0, false }, + { OPT_DATABASE, 'd', "database", OPT_HAS_VALUE, + "Database to be used (pbxt or information_schema required), default is information_schema", "information_schema", 0, false }, + { OPT_PORT, 'P', "port", OPT_HAS_VALUE | OPT_INTEGER, + "Port number to use for connection", NULL, 3306, false }, + { OPT_SOCKET, 'S', "socket", OPT_HAS_VALUE, + "Socket file to use for connection", NULL, 0, false }, + { OPT_DELAY, 'D', "delay", OPT_HAS_VALUE | OPT_INTEGER, + "Delay in seconds between polls of the database", NULL, 1, false }, + { OPT_PROTOCOL, 0, "protocol", OPT_HAS_VALUE, + "Connection protocol to use: default/tcp/socket/pipe/memory", "default", MYSQL_PROTOCOL_DEFAULT, false }, + { OPT_DISPLAY, 0, "display", OPT_HAS_VALUE, + "Columns to display: use short names separated by |, partial match allowed", "time-msec,commt,row-ins,rec,ind,ilog,xlog,data,to,dirty", 0, false }, + { OPT_NONE, 0, NULL, 0, NULL, NULL, 0, false } +}; + +#ifdef XT_WIN +#define atoll _atoi64 +#endif + +void add_statistic(int stat) +{ + /* Check if column has already been added: */ + for (int i=0; i<columns_used; i++) { + if (display_order[i].do_statistic == stat) + return; + } + display_order[columns_used].do_statistic = stat; + display_order[columns_used].do_combo = false; + columns_used++; +} + +void determine_display_order() +{ + const char *cols = options[OPT_DISPLAY].opt_value_str; + char column_1[21], column_2[21]; + int i; + bool add, added, add_combo; + XTStatMetaDataPtr meta, meta2; + + if (strcmp(cols, "all") == 0) + cols = "time,xact,stat,rec,ind,ilog,xlog,data,to,sweep,scan,row"; + columns_used = 0; + while (*cols) { + i = 0; + while (*cols && *cols != '-' && *cols != ',') { + if (i < 20) { + column_1[i] = *cols; + i++; + } + cols++; + } + column_1[i] = 0; + + i = 0; + if (*cols == '-') { + cols++; + while (*cols && *cols != '-' && *cols != ',') { + if (i < 20) { + column_2[i] = *cols; + i++; + } + cols++; + } + } + column_2[i] = 0; + + if (*cols == ',') + cols++; + + if (strcmp(column_1, "ms") == 0) + strcpy(column_1, "msec"); + if (strcmp(column_2, "ms") == 0) + strcpy(column_2, "msec"); + add_combo = false; + if (strcmp(column_1, "syncs/ms") == 0) { + strcpy(column_1, "syncs"); + add_combo = true; + } + if (strcmp(column_2, "syncs/ms") == 0) { + strcpy(column_2, "syncs"); + add_combo = true; + } + + added = false; + for (i=0; i<XT_STAT_MAXIMUM; i++) { + meta = xt_get_stat_meta_data(i); + add = false; + if (strcmp(meta->sm_short_line_1, column_1) == 0) { + if (column_2[0]) { + if (strcmp(meta->sm_short_line_2, column_2) == 0) + add = true; + } + else { + if (i != XT_STAT_XLOG_CACHE_USAGE) + add = true; + } + } + else if (!column_2[0]) { + if (strcmp(meta->sm_short_line_2, column_1) == 0) { + /* XT_STAT_XLOG_CACHE_USAGE is ignored, unless explicity listed! */ + if (i != XT_STAT_XLOG_CACHE_USAGE) + add = true; + } + } + if (add) { + added = true; + add_statistic(i); + if (add_combo) + add_statistic(i+1); + } + } + if (!added) { + if (column_2[0]) + fprintf(stderr, "ERROR: No statistic matches display option: '%s-%s'\n", column_1, column_2); + else + fprintf(stderr, "ERROR: No statistic matches display option: '%s'\n", column_1); + fprintf(stderr, "Display options: %s\n", options[OPT_DISPLAY].opt_value_str); + exit(1); + } + } + + /* Setup "combo" fields: */ + for (i=0; i<columns_used; i++) { + meta = xt_get_stat_meta_data(display_order[i].do_statistic); + if (meta->sm_flags & XT_STAT_COMBO_FIELD) { + if (i+1 < columns_used) { + meta2 = xt_get_stat_meta_data(display_order[i+1].do_statistic); + if (meta2->sm_flags & XT_STAT_COMBO_FIELD_2) { + if (strcmp(meta->sm_short_line_1, meta2->sm_short_line_1) == 0) + display_order[i].do_combo = true; + } + } + } + } +} + +void format_percent_value(char *buffer, double value, double perc) +{ + value = value * (double) 100 / (double) perc; + if (value >= 100) + sprintf(buffer, "%.0f", value); + else + sprintf(buffer, "%.1f", value); + buffer[4] = 0; + if (buffer[3] == '.') + buffer[3] = 0; +} + +#define XT_1_K ((double) 1024) +#define XT_1_M ((double) 1024 * (double) 1024) +#define XT_1_G ((double) 1024 * (double) 1024 * (double) 1024) +#define XT_1_T ((double) 1024 * (double) 1024 * (double) 1024 * (double) 1024) +#define XT_10000_K ((double) 10000 * XT_1_K) +#define XT_10000_M ((double) 10000 * XT_1_M) +#define XT_10000_G ((double) 10000 * XT_1_G) + +void format_byte_value(char *buffer, double value) +{ + double dval; + char string[100]; + char ch; + + if (value < (double) 100000) { + /* byte value from 0 to 99999: */ + sprintf(buffer, "%.0f", value); + return; + } + + if (value < XT_10000_K) { + dval = value / XT_1_K; + ch = 'K'; + } + else if (value < XT_10000_M) { + dval = value / XT_1_M; + ch = 'M'; + } + else if (value < XT_10000_G) { + dval = value / XT_1_G; + ch = 'G'; + } + else { + dval = value / XT_1_T; + ch = 'T'; + } + + if (dval < (double) 10.0) + sprintf(string, "%.2f", dval); + else if (dval < (double) 100.0) + sprintf(string, "%.1f", dval); + else + sprintf(string, "%.0f", dval); + if (string[3] == '.') + string[3] = 0; + else + string[4] = 0; + sprintf(buffer, "%s%c", string, ch); +} + +/* + * Uses: + * t = thousands + * m = millions + * b = billions + */ +void format_mini_count_value(char *buffer, double value) +{ + double dval; + char string[100]; + char ch; + + if (value < (double) 100) { + /* Value from 0 to 99: */ + sprintf(buffer, "%.0f", value); + return; + } + + if (value < (double) 1000) { + sprintf(buffer, "<t"); + return; + } + + if (value < (double) 10000) { + /* Value is less than 1m */ + dval = value / (double) 1000.0; + ch = 't'; + } + else if (value < (double) 1000000) { + sprintf(buffer, "<m"); + return; + } + else if (value < (double) 10000000) { + /* Value is less than 1b */ + dval = value / (double) 1000000.0; + ch = 'm'; + } + else if (value < (double) 1000000000) { + sprintf(buffer, "<b"); + return; + } + else { + /* Value is greater than 1 billion */ + dval = value / (double) 1000000000.0; + ch = 'b'; + } + + sprintf(string, "%1.0f", dval); + string[1] = 0; + sprintf(buffer, "%s%c", string, ch); +} + +#define XT_1_THOUSAND ((double) 1000) +#define XT_1_MILLION ((double) 1000 * (double) 1000) +#define XT_1_BILLION ((double) 1000 * (double) 1000 * (double) 1000) +#define XT_1_TRILLION ((double) 1000 * (double) 1000 * (double) 1000 * (double) 1000) +#define XT_10_THOUSAND ((double) 10 * (double) 1000) +#define XT_10_MILLION ((double) 10 * (double) 1000 * (double) 1000) +#define XT_10_BILLION ((double) 10 * (double) 1000 * (double) 1000 * (double) 1000) +#define XT_10_TRILLION ((double) 10 * (double) 1000 * (double) 1000 * (double) 1000 * (double) 1000) + +void format_count_value(char *buffer, double value) +{ + double dval; + char string[100]; + char ch; + + if (value < (double) 0) { + strcpy(buffer, "0"); + return; + } + + if (value < XT_10_THOUSAND) { + /* byte value from 0 to 99999: */ + sprintf(buffer, "%.0f", value); + return; + } + + if (value < XT_10_MILLION) { + /* Value is less than 10 million */ + dval = value / XT_1_THOUSAND; + ch = 't'; + } + else if (value < XT_10_BILLION) { + /* Value is less than 10 million */ + dval = value / XT_1_MILLION; + ch = 'm'; + } + else if (value < XT_10_TRILLION) { + /* Value is less than 10 trillion */ + dval = value / XT_1_BILLION; + ch = 'b'; + } + else { + dval = value / XT_1_TRILLION; + ch = 't'; + } + + if (dval < (double) 10.0) + sprintf(string, "%.2f", dval); + else if (dval < (double) 100.0) + sprintf(string, "%.1f", dval); + else + sprintf(string, "%.0f", dval); + if (string[3] == '.') + string[3] = 0; + else + string[4] = 0; + sprintf(buffer, "%s%c", string, ch); +} + +void print_help() +{ + struct Options *opt; + char command[100]; + + printf("Usage: xtstat [ options ]\n"); + printf("e.g. xtstat -D10 : Poll every 10 seconds\n"); + opt = options; + printf("Options :-\n"); + while (opt->opt_id != OPT_NONE) { + strcpy(command, opt->opt_name); + if (opt->opt_flags & OPT_HAS_VALUE) { + if (opt->opt_flags & OPT_OPTIONAL) + strcat(command, "[=value]"); + else + strcat(command, "=value"); + } + if (opt->opt_char) + printf("-%c, --%-16s %s.\n", opt->opt_char, command, opt->opt_desc); + else + printf(" --%-16s %s.\n", command, opt->opt_desc); + opt++; + } +} + +void print_stat_key() +{ + printf("Key :-\n"); + printf("K = Kilobytes (1,024 bytes)\n"); + printf("M = Megabytes (1,048,576 bytes)\n"); + printf("G = Gigabytes (1,073,741,024 bytes)\n"); + printf("T = Terabytes (1,099,511,627,776 bytes)\n"); + printf("t = thousands (1,000s)\n"); + printf("m = millions (1,000,000s)\n"); + printf("b = billions (1,000,000,000s)\n"); +} + +void print_stat_info() +{ + XTStatMetaDataPtr meta; + char buffer[40]; + char desc[400]; + + printf("Statistics :-\n"); + for (int i=0; i<XT_STAT_CURRENT_MAX; i++) { + meta = xt_get_stat_meta_data(i); + sprintf(desc, meta->sm_description, "milli"); + sprintf(buffer, "%s-%s", meta->sm_short_line_1, meta->sm_short_line_2); + if (meta->sm_flags & XT_STAT_COMBO_FIELD) { + /* Combine next 2 fields: */ + i++; + strcat(buffer, "/ms"); + strcat(desc, "/time taken in milliseconds"); + } + printf("%-13s %-21s - %s.\n", buffer, meta->sm_name, desc); + } +} + +bool match_arg(char *what, const char *opt, char **value) +{ + while (*what && *opt && isalpha(*what)) { + if (*what != *opt) + return false; + what++; + opt++; + } + if (*opt) + return false; + if (*what == '=') + *value = what + 1; + else if (*what) + return false; + else + *value = NULL; + return true; +} + +void parse_args(int argc, char **argv) +{ + char *ptr; + char *value; + int i = 1; + struct Options *opt; + bool found; + + while (i < argc) { + ptr = argv[i]; + found = false; + if (*ptr == '-') { + ptr++; + if (*ptr == '-') { + ptr++; + opt = options; + while (opt->opt_id != OPT_NONE) { + if (match_arg(ptr, opt->opt_name, &value)) { + found = true; + opt->opt_value_str = value; + opt->opt_value_bool = true; + break; + } + opt++; + } + } + else { + opt = options; + while (opt->opt_id != OPT_NONE) { + if (*ptr == opt->opt_char) { + ptr++; + if (*ptr) + opt->opt_value_str = ptr; + else { + opt->opt_value_str = NULL; + if (i+1 < argc) { + ptr = argv[i+1]; + if (*ptr != '-') { + opt->opt_value_str = ptr; + i++; + } + } + } + found = true; + opt->opt_value_bool = true; + break; + } + opt++; + } + } + } + + if (!found) { + fprintf(stderr, "Unknown option: %s\n", argv[i]); + print_help(); + exit(1); + } + + if (opt->opt_flags & OPT_HAS_VALUE) { + if (!(opt->opt_flags & OPT_OPTIONAL)) { + if (!opt->opt_value_str) { + fprintf(stderr, "Option requires a value: %s\n", argv[i]); + printf("Use --help for help on commands and usage\n"); + exit(1); + } + } + } + else { + if (opt->opt_value_str) { + fprintf(stderr, "Option does not accept a value: %s\n", argv[i]); + printf("Use --help for help on commands and usage\n"); + exit(1); + } + } + + if (opt->opt_value_str && (opt->opt_flags & OPT_INTEGER)) + opt->opt_value_int = atoi(opt->opt_value_str); + + if (opt->opt_id == OPT_HELP) { + print_help(); + print_stat_key(); + print_stat_info(); + exit(1); + } + + i++; + } +} + +#ifdef DEBUG_INTERRUPT +void interrupt_pbxt(MYSQL *conn) +{ + MYSQL_RES *res; + + if (mysql_query(conn, "show engine pbxt status")) { + fprintf(stderr, "%s\n", mysql_error(conn)); + exit(1); + } + + res = mysql_use_result(conn); + mysql_free_result(res); +} +#endif + +static bool display_parameters(MYSQL *conn) +{ + MYSQL_RES *res; + MYSQL_ROW row; + + /* send SQL query */ + if (mysql_query(conn, "show variables like 'pbxt_%'")) + return false; + + if (!(res = mysql_use_result(conn))) + return false; + + /* output table name */ + printf("-- PBXT System Variables --\n"); + while ((row = mysql_fetch_row(res)) != NULL) { + if (strcmp(row[0], "pbxt_index_cache_size") == 0) + index_cache_size = xt_byte_size_to_int8(row[1]); + else if (strcmp(row[0], "pbxt_record_cache_size") == 0) + record_cache_size = xt_byte_size_to_int8(row[1]); + else if (strcmp(row[0], "pbxt_log_cache_size") == 0) + log_cache_size = xt_byte_size_to_int8(row[1]); + printf("%-29s= %s\n", row[0], row[1]); + } + + mysql_free_result(res); + + for (int i=0; i<XT_STAT_CURRENT_MAX; i++) + accumulative_values[i] = 0; + + printf("Display options: %s\n", options[OPT_DISPLAY].opt_value_str); + return true; +} + +static bool connect(MYSQL *conn) +{ + unsigned int type; + + if (strcasecmp(options[OPT_PROTOCOL].opt_value_str, "tcp") == 0) + type = MYSQL_PROTOCOL_TCP; + else if (strcasecmp(options[OPT_PROTOCOL].opt_value_str, "socket") == 0) + type = MYSQL_PROTOCOL_SOCKET; + else if (strcasecmp(options[OPT_PROTOCOL].opt_value_str, "pipe") == 0) + type = MYSQL_PROTOCOL_PIPE; + else if (strcasecmp(options[OPT_PROTOCOL].opt_value_str, "memory") == 0) + type = MYSQL_PROTOCOL_MEMORY; + else + type = MYSQL_PROTOCOL_DEFAULT; + + if (mysql_options(conn, MYSQL_OPT_PROTOCOL, (char *) &type)) + return false; + + if (mysql_options(conn, MYSQL_READ_DEFAULT_GROUP, "xtstat")) + return false; + + if (strcasecmp(options[OPT_DATABASE].opt_value_str, "pbxt") == 0) + use_i_s = FALSE; + else if (strcasecmp(options[OPT_DATABASE].opt_value_str, "information_schema") == 0) + use_i_s = TRUE; + else + use_i_s = TRUE; + + /* Connect to database */ + if (!mysql_real_connect(conn, + options[OPT_HOST].opt_value_str, + options[OPT_USER].opt_value_str, + options[OPT_PASSWORD].opt_value_str, + options[OPT_DATABASE].opt_value_str, + options[OPT_PORT].opt_value_int, + options[OPT_SOCKET].opt_value_str, + 0)) + return false; + + return true; +} + +int main(int argc, char **argv) +{ + MYSQL *conn; + MYSQL_RES *res; + MYSQL_ROW row; + llong current_values[XT_STAT_CURRENT_MAX]; + double value; + char str_value[100]; + XTStatMetaDataPtr meta; + int len; + int stat; + int err; + bool select_worked = true; + + xt_set_time_unit("msec"); + parse_args(argc, argv); + + determine_display_order(); + + if (!(conn = mysql_init(NULL))) { + fprintf(stderr, "Insufficient memory\n"); + exit(1); + } + + if (!connect(conn) || !display_parameters(conn)) { + fprintf(stderr, "%s\n", mysql_error(conn)); + exit(1); + } + + retry: + for (int loop = 0; ; loop++) { + if (use_i_s) + err = mysql_query(conn, "select id, Value from information_schema.pbxt_statistics order by ID"); + else + err = mysql_query(conn, "select id, Value from pbxt.statistics order by ID"); + if (err) + goto reconnect; + + if (!(res = mysql_use_result(conn))) + goto reconnect; + select_worked = true; + + while ((row = mysql_fetch_row(res)) != NULL) { + stat = atoi(row[0])-1; + current_values[stat] = atoll(row[1]); + } + mysql_free_result(res); + +#ifdef DEBUG_INTERRUPT + if (current_values[XT_STAT_STAT_WRITES] - accumulative_values[XT_STAT_STAT_WRITES] == 0 && + current_values[XT_STAT_REC_SYNC_TIME] - accumulative_values[XT_STAT_REC_SYNC_TIME] == 0 && + current_values[XT_STAT_IND_SYNC_TIME] - accumulative_values[XT_STAT_IND_SYNC_TIME] == 0) + interrupt_pbxt(); +#endif + + if ((loop % 25) == 0) { + for (int column=0; column<columns_used; column++) { + len = 5; + meta = xt_get_stat_meta_data(display_order[column].do_statistic); + strcpy(str_value, meta->sm_short_line_1); + if (display_order[column].do_combo) { + /* Combine next 2 fields: */ + len = 8; + column++; + } + else if (meta->sm_flags & XT_STAT_PERCENTAGE) + len = 4; + else if (meta->sm_flags & XT_STAT_DATE) + len = 15; + printf("%*s ", len, str_value); + } + printf("\n"); + for (int column=0; column<columns_used; column++) { + len = 5; + meta = xt_get_stat_meta_data(display_order[column].do_statistic); + strcpy(str_value, meta->sm_short_line_2); + if (display_order[column].do_combo) { + /* Combine next 2 fields: */ + len = 8; + column++; + strcat(str_value, "/ms"); + } + else if (meta->sm_flags & XT_STAT_PERCENTAGE) + len = 4; + else if (meta->sm_flags & XT_STAT_DATE) + len = 15; + printf("%*s ", len, str_value); + } + printf("\n"); + } + + for (int column=0; column<columns_used; column++) { + len = 5; + stat = display_order[column].do_statistic; + meta = xt_get_stat_meta_data(stat); + if (meta->sm_flags & XT_STAT_ACCUMULATIVE) { + /* Take care of overflow! */ + if (current_values[stat] < accumulative_values[stat]) + value = (double) (0xFFFFFFFF - (accumulative_values[stat] - current_values[stat])); + else + value = (double) (current_values[stat] - accumulative_values[stat]); + } + else + value = (double) current_values[stat]; + accumulative_values[stat] = current_values[stat]; + if (meta->sm_flags & XT_STAT_TIME_VALUE) + value = value / (double) 1000; + if (display_order[column].do_combo) { + format_mini_count_value(str_value, value); + strcat(str_value, "/"); + column++; + stat = display_order[column].do_statistic; + value = (double) (current_values[stat] - accumulative_values[stat]); + accumulative_values[stat] = current_values[stat]; + value = value / (double) 1000; + format_count_value(&str_value[strlen(str_value)], value); + len = 8; + } + else if (meta->sm_flags & XT_STAT_PERCENTAGE) { + double perc = 100; + switch (stat) { + case XT_STAT_REC_CACHE_USAGE: perc = (double)record_cache_size; break; + case XT_STAT_IND_CACHE_USAGE: perc = (double)index_cache_size; break; + case XT_STAT_XLOG_CACHE_USAGE: perc = (double)log_cache_size; break; + } + format_percent_value(str_value, value, perc); + len = 4; + } + else if (meta->sm_flags & XT_STAT_DATE) { + time_t ticks = (time_t) value; + const struct tm *ltime = localtime(&ticks); + strftime(str_value, 99, "%y%m%d %H:%M:%S", ltime); + len = 15; + } + else if (meta->sm_flags & XT_STAT_BYTE_COUNT) + format_byte_value(str_value, value); + else + format_count_value(str_value, value); + if (column == columns_used-1) + printf("%*s\n", len, str_value); + else + printf("%*s ", len, str_value); + } + + sleep(options[OPT_DELAY].opt_value_int); + } + + /* close connection */ + mysql_close(conn); + return 0; + + reconnect: + /* Reconnect... */ + if (select_worked) { + /* Only print message if the SELECT worked. + * or we will get a screen full of messages: + */ + fprintf(stderr, "%s\n", mysql_error(conn)); + printf("Reconnecting...\n"); + } + mysql_close(conn); + if (!(conn = mysql_init(NULL))) { + fprintf(stderr, "Insufficient memory\n"); + exit(1); + } + do { + sleep(2); + } while (!connect(conn)); + select_worked = false; + goto retry; +} diff --git a/storage/pbxt/src/backup_xt.cc b/storage/pbxt/src/backup_xt.cc index da88c363822..b9631f2cfd5 100644 --- a/storage/pbxt/src/backup_xt.cc +++ b/storage/pbxt/src/backup_xt.cc @@ -287,7 +287,7 @@ result_t PBXTBackupDriver::get_data(Buffer &buf) bd_table_no++; try_(a) { xt_ha_open_database_of_table(self, (XTPathStrPtr) path); - tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE, NULL); + tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE); pushr_(xt_heap_release, tab); if (!(bd_ot = xt_db_open_table_using_tab(tab, bd_thread))) xt_throw(self); @@ -403,7 +403,7 @@ result_t PBXTBackupDriver::lock() bd_thread->st_abort_trans = FALSE; bd_thread->st_stat_ended = FALSE; bd_thread->st_stat_trans = FALSE; - bd_thread->st_is_update = FALSE; + bd_thread->st_is_update = NULL; if (!xt_xn_begin(bd_thread)) return backup::ERROR; bd_state = BUP_STATE_AFTER_LOCK; @@ -562,7 +562,7 @@ result_t PBXTRestoreDriver::send_data(Buffer &buf) m_tables[rd_table_no-1].internal_name(path, sizeof(path)); try_(a) { xt_ha_open_database_of_table(self, (XTPathStrPtr) path); - tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE, NULL); + tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE); pushr_(xt_heap_release, tab); if (!(rd_ot = xt_db_open_table_using_tab(tab, rd_thread))) xt_throw(self); diff --git a/storage/pbxt/src/cache_xt.cc b/storage/pbxt/src/cache_xt.cc index a8b2ff20ea2..85eea41dd79 100644 --- a/storage/pbxt/src/cache_xt.cc +++ b/storage/pbxt/src/cache_xt.cc @@ -90,7 +90,7 @@ #define IDX_CAC_INIT_LOCK(s, i) xt_spinxslock_init_with_autoname(s, &(i)->cs_lock) #define IDX_CAC_FREE_LOCK(s, i) xt_spinxslock_free(s, &(i)->cs_lock) #define IDX_CAC_READ_LOCK(i, s) xt_spinxslock_slock(&(i)->cs_lock, (s)->t_id) -#define IDX_CAC_WRITE_LOCK(i, s) xt_spinxslock_xlock(&(i)->cs_lock, (s)->t_id) +#define IDX_CAC_WRITE_LOCK(i, s) xt_spinxslock_xlock(&(i)->cs_lock, FALSE, (s)->t_id) #define IDX_CAC_UNLOCK(i, s) xt_spinxslock_unlock(&(i)->cs_lock, (s)->t_id) #endif @@ -178,6 +178,7 @@ static DcGlobalsRec ind_cac_globals; KEY_CACHE my_cache; #undef pthread_rwlock_rdlock #undef pthread_rwlock_wrlock +#undef pthread_rwlock_try_wrlock #undef pthread_rwlock_unlock #undef pthread_mutex_lock #undef pthread_mutex_unlock @@ -410,7 +411,7 @@ xtPublic void xt_ind_release_handle(XTIndHandlePtr handle, xtBool have_lock, XTT /* Because of the lock order, I have to release the * handle before I get a lock on the cache block. * - * But, by doing this, thie cache block may be gone! + * But, by doing this, this cache block may be gone! */ if (block) { IDX_CAC_READ_LOCK(seg, thread); @@ -420,6 +421,11 @@ xtPublic void xt_ind_release_handle(XTIndHandlePtr handle, xtBool have_lock, XTT /* Found the block... * {HANDLE-COUNT-SLOCK} * 04.05.2009, changed to slock. + * The xlock causes too much contention + * on the cache block for read only loads. + * + * Is it safe? + * See below... */ XT_IPAGE_READ_LOCK(&block->cb_lock); goto block_found; @@ -691,6 +697,9 @@ xtPublic void xt_ind_exit(XTThreadPtr self) } } + /* Must be done before freeing the blocks! */ + ind_handle_exit(self); + if (ind_cac_globals.cg_blocks) { xt_free(self, ind_cac_globals.cg_blocks); ind_cac_globals.cg_blocks = NULL; @@ -702,7 +711,6 @@ xtPublic void xt_ind_exit(XTThreadPtr self) ind_cac_globals.cg_buffer = NULL; } #endif - ind_handle_exit(self); memset(&ind_cac_globals, 0, sizeof(ind_cac_globals)); } @@ -882,7 +890,58 @@ static xtBool ind_free_block(XTOpenTablePtr ot, XTIndBlockPtr block) while (xblock) { if (block == xblock) { /* Found the block... */ - XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id); + /* It is possible that a thread enters this code holding a + * lock on a page. This can cause a deadlock: + * + * #0 0x91faa2ce in semaphore_wait_signal_trap + * #1 0x91fb1da5 in pthread_mutex_lock + * #2 0x00e2ec13 in xt_p_mutex_lock at pthread_xt.cc:544 + * #3 0x00e6c30a in xt_xsmutex_xlock at lock_xt.cc:1547 + * #4 0x00dee402 in ind_free_block at cache_xt.cc:879 + * #5 0x00dee76a in ind_cac_free_lru_blocks at cache_xt.cc:1033 + * #6 0x00def8d1 in xt_ind_reserve at cache_xt.cc:1513 + * #7 0x00e22118 in xt_idx_insert at index_xt.cc:2047 + * #8 0x00e4d7ee in xt_tab_new_record at table_xt.cc:4702 + * #9 0x00e0ff0b in ha_pbxt::write_row at ha_pbxt.cc:2340 + * #10 0x0023a00f in handler::ha_write_row at handler.cc:4570 + * #11 0x001a32c8 in write_record at sql_insert.cc:1568 + * #12 0x001ab635 in mysql_insert at sql_insert.cc:812 + * #13 0x0010e068 in mysql_execute_command at sql_parse.cc:3066 + * #14 0x0011480d in mysql_parse at sql_parse.cc:5787 + * #15 0x00115afb in dispatch_command at sql_parse.cc:1200 + * #16 0x00116de2 in do_command at sql_parse.cc:857 + * #17 0x00101ee4 in handle_one_connection at sql_connect.cc:1115 + * #18 0x91fdb155 in _pthread_start + * #19 0x91fdb012 in thread_start + * + * #0 0x91fb146e in __semwait_signal + * #1 0x91fb12ef in nanosleep$UNIX2003 + * #2 0x91fb1236 in usleep$UNIX2003 + * #3 0x00e52112 in xt_yield at thread_xt.cc:1274 + * #4 0x00e6c0eb in xt_spinxslock_xlock at lock_xt.cc:1456 + * #5 0x00dee444 in ind_free_block at cache_xt.cc:886 + * #6 0x00dee76a in ind_cac_free_lru_blocks at cache_xt.cc:1033 + * #7 0x00deeaf0 in ind_cac_fetch at cache_xt.cc:1130 + * #8 0x00def604 in xt_ind_fetch at cache_xt.cc:1386 + * #9 0x00e2159a in xt_idx_update_row_id at index_xt.cc:2489 + * #10 0x00e603c8 in xn_sw_clean_indices at xaction_xt.cc:1932 + * #11 0x00e606d4 in xn_sw_cleanup_variation at xaction_xt.cc:2056 + * #12 0x00e60e29 in xn_sw_cleanup_xact at xaction_xt.cc:2276 + * #13 0x00e615ed in xn_sw_main at xaction_xt.cc:2433 + * #14 0x00e61919 in xn_sw_run_thread at xaction_xt.cc:2564 + * #15 0x00e53f80 in thr_main at thread_xt.cc:1017 + * #16 0x91fdb155 in _pthread_start + * #17 0x91fdb012 in thread_start + * + * So we back off if a lock is held! + */ + if (!XT_IPAGE_WRITE_TRY_LOCK(&block->cb_lock, ot->ot_thread->t_id)) { + IDX_CAC_UNLOCK(seg, ot->ot_thread); +#ifdef DEBUG_CHECK_IND_CACHE + xt_ind_check_cache(NULL); +#endif + return FALSE; + } if (block->cb_state != IDX_CAC_BLOCK_CLEAN) { /* This block cannot be freeed: */ XT_IPAGE_UNLOCK(&block->cb_lock, TRUE); @@ -1376,6 +1435,7 @@ xtPublic xtBool xt_ind_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID ad register XTIndBlockPtr block; DcSegmentPtr seg; xtWord2 branch_size; + u_int rec_size; xtBool xlock = FALSE; #ifdef DEBUG @@ -1386,10 +1446,24 @@ xtPublic xtBool xt_ind_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID ad return FAILED; branch_size = XT_GET_DISK_2(((XTIdxBranchDPtr) block->cb_data)->tb_size_2); - if (XT_GET_INDEX_BLOCK_LEN(branch_size) < 2 || XT_GET_INDEX_BLOCK_LEN(branch_size) > XT_INDEX_PAGE_SIZE) { - IDX_CAC_UNLOCK(seg, ot->ot_thread); - xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name); - return FAILED; + rec_size = XT_GET_INDEX_BLOCK_LEN(branch_size); + if (rec_size < 2 || rec_size > XT_INDEX_PAGE_SIZE) + goto failed_corrupt; + if (ind->mi_fix_key) { + rec_size -= 2; + if (XT_IS_NODE(branch_size)) { + if (rec_size != 0) { + if (rec_size < XT_NODE_REF_SIZE) + goto failed_corrupt; + rec_size -= XT_NODE_REF_SIZE; + if ((rec_size % (ind->mi_key_size + XT_RECORD_REF_SIZE + XT_NODE_REF_SIZE)) != 0) + goto failed_corrupt; + } + } + else { + if ((rec_size % (ind->mi_key_size + XT_RECORD_REF_SIZE)) != 0) + goto failed_corrupt; + } } switch (ltype) { @@ -1450,6 +1524,11 @@ xtPublic xtBool xt_ind_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID ad iref->ir_block = block; iref->ir_branch = (XTIdxBranchDPtr) block->cb_data; return OK; + + failed_corrupt: + IDX_CAC_UNLOCK(seg, ot->ot_thread); + xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name); + return FAILED; } xtPublic xtBool xt_ind_release(XTOpenTablePtr ot, XTIndexPtr ind, XTPageUnlockType XT_NDEBUG_UNUSED(utype), XTIndReferencePtr iref) diff --git a/storage/pbxt/src/cache_xt.h b/storage/pbxt/src/cache_xt.h index 5dd7de629e3..ca796ab1a74 100644 --- a/storage/pbxt/src/cache_xt.h +++ b/storage/pbxt/src/cache_xt.h @@ -58,7 +58,8 @@ struct XTIdxReadBuffer; #define XT_IPAGE_INIT_LOCK(s, i) xt_atomicrwlock_init_with_autoname(s, i) #define XT_IPAGE_FREE_LOCK(s, i) xt_atomicrwlock_free(s, i) #define XT_IPAGE_READ_LOCK(i) xt_atomicrwlock_slock(i) -#define XT_IPAGE_WRITE_LOCK(i, o) xt_atomicrwlock_xlock(i, o) +#define XT_IPAGE_WRITE_LOCK(i, o) xt_atomicrwlock_xlock(i, FALSE, o) +#define XT_IPAGE_WRITE_TRY_LOCK(i, o) xt_atomicrwlock_xlock(i, TRUE, o) #define XT_IPAGE_UNLOCK(i, x) xt_atomicrwlock_unlock(i, x) #elif defined(XT_IPAGE_USE_PTHREAD_RW) #define XT_IPAGE_LOCK_TYPE xt_rwlock_type @@ -66,20 +67,23 @@ struct XTIdxReadBuffer; #define XT_IPAGE_FREE_LOCK(s, i) xt_free_rwlock(i) #define XT_IPAGE_READ_LOCK(i) xt_slock_rwlock_ns(i) #define XT_IPAGE_WRITE_LOCK(i, s) xt_xlock_rwlock_ns(i) +#define XT_IPAGE_WRITE_TRY_LOCK(i, s) xt_xlock_try_rwlock_ns(i) #define XT_IPAGE_UNLOCK(i, x) xt_unlock_rwlock_ns(i) #elif defined(XT_IPAGE_USE_SPINXSLOCK) #define XT_IPAGE_LOCK_TYPE XTSpinXSLockRec #define XT_IPAGE_INIT_LOCK(s, i) xt_spinxslock_init_with_autoname(s, i) #define XT_IPAGE_FREE_LOCK(s, i) xt_spinxslock_free(s, i) #define XT_IPAGE_READ_LOCK(i) xt_spinxslock_slock(i) -#define XT_IPAGE_WRITE_LOCK(i, o) xt_spinxslock_xlock(i, o) +#define XT_IPAGE_WRITE_LOCK(i, o) xt_spinxslock_xlock(i, FALSE, o) +#define XT_IPAGE_WRITE_TRY_LOCK(i, o) xt_spinxslock_xlock(i, TRUE, o) #define XT_IPAGE_UNLOCK(i, x) xt_spinxslock_unlock(i, x) #else // XT_IPAGE_USE_SKEW_RW #define XT_IPAGE_LOCK_TYPE XTSkewRWLockRec #define XT_IPAGE_INIT_LOCK(s, i) xt_skewrwlock_init_with_autoname(s, i) #define XT_IPAGE_FREE_LOCK(s, i) xt_skewrwlock_free(s, i) #define XT_IPAGE_READ_LOCK(i) xt_skewrwlock_slock(i) -#define XT_IPAGE_WRITE_LOCK(i, o) xt_skewrwlock_xlock(i, o) +#define XT_IPAGE_WRITE_LOCK(i, o) xt_skewrwlock_xlock(i, FALSE, o) +#define XT_IPAGE_WRITE_TRY_LOCK(i, o) xt_skewrwlock_xlock(i, TRUE, o) #define XT_IPAGE_UNLOCK(i, x) xt_skewrwlock_unlock(i, x) #endif @@ -103,10 +107,10 @@ typedef struct XTIndBlock { struct XTIndBlock *cb_lr_used; /* Less recently used blocks. */ /* Protected by cb_lock: */ XT_IPAGE_LOCK_TYPE cb_lock; - xtWord1 cb_state; /* Block status. */ + xtWord4 cp_flush_seq; xtWord2 cb_handle_count; /* TRUE if this page is referenced by a handle. */ - xtWord2 cp_flush_seq; xtWord2 cp_del_count; /* Number of deleted entries. */ + xtWord1 cb_state; /* Block status. */ #ifdef XT_USE_DIRECT_IO_ON_INDEX xtWord1 *cb_data; #else diff --git a/storage/pbxt/src/database_xt.cc b/storage/pbxt/src/database_xt.cc index 288520dfed0..8d1b4e46da9 100644 --- a/storage/pbxt/src/database_xt.cc +++ b/storage/pbxt/src/database_xt.cc @@ -68,6 +68,7 @@ xtPublic int xt_db_log_file_count; xtPublic int xt_db_auto_increment_mode; /* 0 = MySQL compatible, 1 = PrimeBase Compatible. */ xtPublic int xt_db_offline_log_function; /* 0 = recycle logs, 1 = delete logs, 2 = keep logs */ xtPublic int xt_db_sweeper_priority; /* 0 = low (default), 1 = normal, 2 = high */ +xtPublic int xt_db_flush_log_at_trx_commit; /* 0 = no-write/no-flush, 1 = yes, 2 = write/no-flush */ xtPublic XTSortedListPtr xt_db_open_db_by_id = NULL; xtPublic XTHashTabPtr xt_db_open_databases = NULL; @@ -288,6 +289,7 @@ xtPublic void xt_stop_database_threads(XTThreadPtr self, xtBool sync) /* Wait for the checkpointer: */ xt_wait_for_checkpointer(self, db); } + xt_stop_flusher(self, db); xt_stop_checkpointer(self, db); xt_stop_writer(self, db); xt_stop_sweeper(self, db); @@ -317,6 +319,7 @@ static void db_finalize(XTThreadPtr self, void *x) { XTDatabaseHPtr db = (XTDatabaseHPtr) x; + xt_stop_flusher(self, db); xt_stop_checkpointer(self, db); xt_stop_compactor(self, db); xt_stop_sweeper(self, db); @@ -475,6 +478,8 @@ xtPublic XTDatabaseHPtr xt_get_database(XTThreadPtr self, char *path, xtBool mul xt_start_compactor(self, db); xt_start_writer(self, db); xt_start_checkpointer(self, db); + if (xt_db_flush_log_at_trx_commit == 0 || xt_db_flush_log_at_trx_commit == 2) + xt_start_flusher(self, db); popr_(); xt_ht_put(self, xt_db_open_databases, db); @@ -574,6 +579,7 @@ xtPublic void xt_drop_database(XTThreadPtr self, XTDatabaseHPtr db) pushr_(xt_ht_unlock, xt_db_open_databases); /* Shutdown the database daemons: */ + xt_stop_flusher(self, db); xt_stop_checkpointer(self, db); xt_stop_sweeper(self, db); xt_stop_compactor(self, db); @@ -902,7 +908,7 @@ xtPublic XTOpenTablePoolPtr xt_db_lock_table_pool_by_name(XTThreadPtr self, XTDa XTTableHPtr tab; xtTableID tab_id; - pushsr_(tab, xt_heap_release, xt_use_table(self, tab_name, no_load, missing_ok, NULL)); + pushsr_(tab, xt_heap_release, xt_use_table(self, tab_name, no_load, missing_ok)); if (!tab) { freer_(); // xt_heap_release(tab) return NULL; diff --git a/storage/pbxt/src/database_xt.h b/storage/pbxt/src/database_xt.h index 21dfdde6f5a..1b1863d2045 100644 --- a/storage/pbxt/src/database_xt.h +++ b/storage/pbxt/src/database_xt.h @@ -60,6 +60,7 @@ extern int xt_db_log_file_count; extern int xt_db_auto_increment_mode; extern int xt_db_offline_log_function; extern int xt_db_sweeper_priority; +extern int xt_db_flush_log_at_trx_commit; extern XTSortedListPtr xt_db_open_db_by_id; extern XTHashTabPtr xt_db_open_databases; @@ -187,6 +188,10 @@ typedef struct XTDatabase : public XTHeap { xt_mutex_type db_cp_lock; xt_cond_type db_cp_cond; /* Writer condition when idle (must bw woken by log flush! */ XTCheckPointStateRec db_cp_state; /* The checkpoint state. */ + + /* The "flusher" thread (used when pbxt_flush_log_at_trx_commit = 0 or 2) */ + struct XTThread *db_fl_thread; /* The flusher thread (flushes the transation log). */ + xt_mutex_type db_fl_lock; } XTDatabaseRec, *XTDatabaseHPtr; /* Heap pointer */ #define XT_FOR_USER 0 diff --git a/storage/pbxt/src/datadic_xt.cc b/storage/pbxt/src/datadic_xt.cc index a236408074d..075d28edabe 100644 --- a/storage/pbxt/src/datadic_xt.cc +++ b/storage/pbxt/src/datadic_xt.cc @@ -1142,8 +1142,8 @@ u_int XTParseTable::columnList(XTThreadPtr self, bool index_cols) void XTParseTable::parseReferenceDefinition(XTThreadPtr self, u_int req_cols) { - int on_delete = XT_KEY_ACTION_DEFAULT; - int on_update = XT_KEY_ACTION_DEFAULT; + int on_delete = XT_KEY_ACTION_RESTRICT; + int on_update = XT_KEY_ACTION_RESTRICT; char name[XT_IDENTIFIER_NAME_SIZE]; char parent_name[XT_IDENTIFIER_NAME_SIZE]; u_int cols = 0; @@ -1437,7 +1437,7 @@ void XTCreateTable::setTableName(XTThreadPtr self, char *name, bool alterTable) XTTableHPtr tab; /* Find the table... */ - pushsr_(tab, xt_heap_release, xt_use_table(self, (XTPathStrPtr) path, FALSE, TRUE, NULL)); + pushsr_(tab, xt_heap_release, xt_use_table(self, (XTPathStrPtr) path, FALSE, TRUE)); /* Clone the foreign key definitions: */ if (tab && tab->tab_dic.dic_table) { @@ -2027,24 +2027,52 @@ bool XTDDTableRef::modifyRow(XTOpenTablePtr XT_UNUSED(ref_ot), xtWord1 *before_b void XTDDTableRef::deleteAllRows(XTThreadPtr self) { XTOpenTablePtr ot; - xtInt8 row_count; + xtBool eof; + xtWord1 *buffer; if (!tr_fkey->getReferenceIndexPtr()) - throw_(); + xt_throw(self); if (!tr_fkey->getIndexPtr()) - throw_(); + xt_throw(self); if (!(ot = xt_db_open_table_using_tab(tr_fkey->co_table->dt_table, self))) - throw_(); + xt_throw(self); + /* {FREE-ROWS-BAD} */ + /* row_count = ((xtInt8) ot->ot_table->tab_row_eof_id) - 1; row_count -= (xtInt8) ot->ot_table->tab_row_fnum; + */ + /* Check if there are any rows in the referencing table: */ + if (!xt_tab_seq_init(ot)) + goto failed; + + if (!(buffer = (xtWord1 *) xt_malloc(self, ot->ot_table->tab_dic.dic_mysql_buf_size))) + goto failed_1; + + if (!xt_tab_seq_next(ot, buffer, &eof)) + goto failed_2; + + xt_free(self, buffer); + + xt_tab_seq_exit(ot); xt_db_return_table_to_pool_ns(ot); - if (row_count > 0) + if (!eof) xt_throw_ixterr(XT_CONTEXT, XT_ERR_ROW_IS_REFERENCED, tr_fkey->co_name); + return; + + failed_2: + xt_free(self, buffer); + + failed_1: + xt_tab_seq_exit(ot); + + failed: + xt_db_return_table_to_pool_ns(ot); + xt_throw(self); } void XTDDIndex::init(XTThreadPtr self, XTObject *obj) @@ -2117,7 +2145,7 @@ void XTDDForeignKey::loadString(XTThreadPtr self, XTStringBufferPtr sb) } xt_sb_concat(self, sb, "`)"); - if (fk_on_delete != XT_KEY_ACTION_DEFAULT && fk_on_delete != XT_KEY_ACTION_RESTRICT) { + if (fk_on_delete != XT_KEY_ACTION_RESTRICT) { xt_sb_concat(self, sb, " ON DELETE "); switch (fk_on_delete) { case XT_KEY_ACTION_CASCADE: xt_sb_concat(self, sb, "CASCADE"); break; @@ -2126,10 +2154,9 @@ void XTDDForeignKey::loadString(XTThreadPtr self, XTStringBufferPtr sb) case XT_KEY_ACTION_NO_ACTION: xt_sb_concat(self, sb, "NO ACTION"); break; } } - if (fk_on_update != XT_KEY_ACTION_DEFAULT && fk_on_update != XT_KEY_ACTION_RESTRICT) { + if (fk_on_update != XT_KEY_ACTION_RESTRICT) { xt_sb_concat(self, sb, " ON UPDATE "); switch (fk_on_update) { - case XT_KEY_ACTION_DEFAULT: xt_sb_concat(self, sb, "RESTRICT"); break; case XT_KEY_ACTION_RESTRICT: xt_sb_concat(self, sb, "RESTRICT"); break; case XT_KEY_ACTION_CASCADE: xt_sb_concat(self, sb, "CASCADE"); break; case XT_KEY_ACTION_SET_NULL: xt_sb_concat(self, sb, "SET NULL"); break; @@ -2259,8 +2286,8 @@ void XTDDForeignKey::removeReference(XTThreadPtr self) { XTDDTable *ref_tab; - xt_xlock_rwlock(self, &co_table->dt_ref_lock); - pushr_(xt_unlock_rwlock, &co_table->dt_ref_lock); + xt_recurrwlock_xlock(self, &co_table->dt_ref_lock); + pushr_(xt_recurrwlock_unxlock, &co_table->dt_ref_lock); if ((ref_tab = fk_ref_table)) { fk_ref_table = NULL; @@ -2270,7 +2297,7 @@ void XTDDForeignKey::removeReference(XTThreadPtr self) fk_ref_index = UINT_MAX; - freer_(); // xt_unlock_rwlock(&co_table->dt_ref_lock); + freer_(); // xt_recurrwlock_unxlock(&co_table->dt_ref_lock); } /* @@ -2289,7 +2316,7 @@ bool XTDDForeignKey::insertRow(xtWord1 *before_buf, xtWord1 *rec_buf, XTThreadPt /* This lock ensures that the foreign key references are not * changed. */ - xt_slock_rwlock_ns(&co_table->dt_ref_lock); + xt_recurrwlock_slock_ns(&co_table->dt_ref_lock); if (!(loc_ind = getIndexPtr())) goto failed; @@ -2374,11 +2401,11 @@ bool XTDDForeignKey::insertRow(xtWord1 *before_buf, xtWord1 *rec_buf, XTThreadPt xt_db_return_table_to_pool_ns(ot); failed: - xt_unlock_rwlock_ns(&co_table->dt_ref_lock); + xt_recurrwlock_unslock_ns(&co_table->dt_ref_lock); return false; success: - xt_unlock_rwlock_ns(&co_table->dt_ref_lock); + xt_recurrwlock_unslock_ns(&co_table->dt_ref_lock); return true; } @@ -2389,7 +2416,6 @@ const char *XTDDForeignKey::actionTypeToString(int action) { switch (action) { - case XT_KEY_ACTION_DEFAULT: case XT_KEY_ACTION_RESTRICT: return "RESTRICT"; case XT_KEY_ACTION_CASCADE: @@ -2407,7 +2433,7 @@ const char *XTDDForeignKey::actionTypeToString(int action) void XTDDTable::init(XTThreadPtr self) { - xt_init_rwlock_with_autoname(self, &dt_ref_lock); + xt_recurrwlock_init_with_autoname(self, &dt_ref_lock); dt_trefs = NULL; } @@ -2444,7 +2470,7 @@ void XTDDTable::finalize(XTThreadPtr self) ptr->release(self); } - xt_free_rwlock(&dt_ref_lock); + xt_recurrwlock_free(&dt_ref_lock); } XTDDColumn *XTDDTable::findColumn(char *name) @@ -2520,8 +2546,8 @@ void XTDDTable::attachReference(XTThreadPtr self, XTDDForeignKey *fk) throw_(); } - xt_xlock_rwlock(self, &dt_ref_lock); - pushr_(xt_unlock_rwlock, &dt_ref_lock); + xt_recurrwlock_xlock(self, &dt_ref_lock); + pushr_(xt_recurrwlock_unxlock, &dt_ref_lock); if (!(tr = new XTDDTableRef())) xt_throw_errno(XT_CONTEXT, XT_ENOMEM); @@ -2536,7 +2562,7 @@ void XTDDTable::attachReference(XTThreadPtr self, XTDDForeignKey *fk) */ xt_heap_reference(self, fk->co_table->dt_table); - freer_(); // xt_unlock_rwlock(&dt_ref_lock); + freer_(); // xt_recurrwlock_unxlock(&dt_ref_lock); } /* @@ -2546,8 +2572,8 @@ void XTDDTable::removeReference(XTThreadPtr self, XTDDForeignKey *fk) { XTDDTableRef *tr, *prev_tr = NULL; - xt_xlock_rwlock(self, &dt_ref_lock); - pushr_(xt_unlock_rwlock, &dt_ref_lock); + xt_recurrwlock_xlock(self, &dt_ref_lock); + pushr_(xt_recurrwlock_unxlock, &dt_ref_lock); tr = dt_trefs; while (tr) { @@ -2561,7 +2587,7 @@ void XTDDTable::removeReference(XTThreadPtr self, XTDDForeignKey *fk) prev_tr = tr; tr = tr->tr_next; } - freer_(); // xt_unlock_rwlock(&dt_ref_lock); + freer_(); // xt_recurrwlock_unxlock(&dt_ref_lock); if (tr) tr->release(self); } @@ -2588,8 +2614,8 @@ void XTDDTable::attachReference(XTThreadPtr self, XTDDTable *dt) dt->attachReference(self, fk); - xt_xlock_rwlock(self, &dt_ref_lock); - pushr_(xt_unlock_rwlock, &dt_ref_lock); + xt_recurrwlock_xlock(self, &dt_ref_lock); + pushr_(xt_recurrwlock_unxlock, &dt_ref_lock); /* Referenced the table, not the index! * We do this because we know that if the table is referenced, the * index will remain valid! @@ -2599,7 +2625,7 @@ void XTDDTable::attachReference(XTThreadPtr self, XTDDTable *dt) */ xt_heap_reference(self, dt->dt_table); fk->fk_ref_table = dt; - freer_(); // xt_unlock_rwlock(&dt_ref_lock); + freer_(); // xt_recurrwlock_unxlock(&dt_ref_lock); } } } @@ -2626,7 +2652,7 @@ void XTDDTable::attachReferences(XTThreadPtr self, XTDatabaseHPtr db) /* get pointer to the referenced table, load it if needed * cyclic references are being handled, absent table is ignored */ - tab = xt_use_table_no_lock(self, db, fk->fk_ref_tab_name, /*TRUE*/FALSE, /*FALSE*/TRUE, NULL, NULL); + tab = xt_use_table_no_lock(self, db, fk->fk_ref_tab_name, /*TRUE*/FALSE, /*FALSE*/TRUE, NULL); if (tab) { pushr_(xt_heap_release, tab); @@ -2663,8 +2689,8 @@ void XTDDTable::removeReferences(XTThreadPtr self) XTDDTableRef *tr; XTDDTable *tab; - xt_xlock_rwlock(self, &dt_ref_lock); - pushr_(xt_unlock_rwlock, &dt_ref_lock); + xt_recurrwlock_xlock(self, &dt_ref_lock); + pushr_(xt_recurrwlock_unxlock, &dt_ref_lock); for (u_int i=0; i<dt_fkeys.size(); i++) { fk = dt_fkeys.itemAt(i); @@ -2675,13 +2701,13 @@ void XTDDTable::removeReferences(XTThreadPtr self) /* To avoid deadlock we do not hold more than * one lock at a time! */ - freer_(); // xt_unlock_rwlock(&dt_ref_lock); + freer_(); // xt_recurrwlock_unxlock(&dt_ref_lock); tab->removeReference(self, fk); xt_heap_release(self, tab->dt_table); /* We referenced the table, not the index! */ - xt_xlock_rwlock(self, &dt_ref_lock); - pushr_(xt_unlock_rwlock, &dt_ref_lock); + xt_recurrwlock_xlock(self, &dt_ref_lock); + pushr_(xt_recurrwlock_unxlock, &dt_ref_lock); } } } @@ -2689,13 +2715,13 @@ void XTDDTable::removeReferences(XTThreadPtr self) while (dt_trefs) { tr = dt_trefs; dt_trefs = tr->tr_next; - freer_(); // xt_unlock_rwlock(&dt_ref_lock); + freer_(); // xt_recurrwlock_unxlock(&dt_ref_lock); tr->release(self); - xt_xlock_rwlock(self, &dt_ref_lock); - pushr_(xt_unlock_rwlock, &dt_ref_lock); + xt_recurrwlock_xlock(self, &dt_ref_lock); + pushr_(xt_recurrwlock_unxlock, &dt_ref_lock); } - freer_(); // xt_unlock_rwlock(&dt_ref_lock); + freer_(); // xt_recurrwlock_unxlock(&dt_ref_lock); } void XTDDTable::checkForeignKeys(XTThreadPtr self, bool temp_table) @@ -2727,7 +2753,7 @@ void XTDDTable::checkForeignKeys(XTThreadPtr self, bool temp_table) // TODO: dont close table immediately so it can be possibly reused in this loop XTTable *ref_tab; - pushsr_(ref_tab, xt_heap_release, xt_use_table(self, fk->fk_ref_tab_name, FALSE, TRUE, NULL)); + pushsr_(ref_tab, xt_heap_release, xt_use_table(self, fk->fk_ref_tab_name, FALSE, TRUE)); if (ref_tab && !fk->checkReferencedTypes(ref_tab->tab_dic.dic_table)) throw_(); freer_(); @@ -2845,7 +2871,7 @@ bool XTDDTable::checkNoAction(XTOpenTablePtr ot, xtRecordID rec_id) return false; rec_ptr = rec_buf.ib_db.db_data; - xt_slock_rwlock_ns(&dt_ref_lock); + xt_recurrwlock_slock_ns(&dt_ref_lock); tr = dt_trefs; while (tr) { if (!tr->checkReference(rec_ptr, ot->ot_thread)) { @@ -2854,7 +2880,7 @@ bool XTDDTable::checkNoAction(XTOpenTablePtr ot, xtRecordID rec_id) } tr = tr->tr_next; } - xt_unlock_rwlock_ns(&dt_ref_lock); + xt_recurrwlock_unslock_ns(&dt_ref_lock); xt_ib_free(NULL, &rec_buf); return ok; } @@ -2875,7 +2901,7 @@ bool XTDDTable::deleteRow(XTOpenTablePtr ot, xtWord1 *rec_ptr) rec_ptr = rec_buf.ib_db.db_data; } - xt_slock_rwlock_ns(&dt_ref_lock); + xt_recurrwlock_slock_ns(&dt_ref_lock); tr = dt_trefs; while (tr) { if (!tr->modifyRow(ot, rec_ptr, NULL, ot->ot_thread)) { @@ -2884,7 +2910,7 @@ bool XTDDTable::deleteRow(XTOpenTablePtr ot, xtWord1 *rec_ptr) } tr = tr->tr_next; } - xt_unlock_rwlock_ns(&dt_ref_lock); + xt_recurrwlock_unslock_ns(&dt_ref_lock); xt_ib_free(NULL, &rec_buf); return ok; } @@ -2893,8 +2919,8 @@ void XTDDTable::deleteAllRows(XTThreadPtr self) { XTDDTableRef *tr; - xt_slock_rwlock(self, &dt_ref_lock); - pushr_(xt_unlock_rwlock, &dt_ref_lock); + xt_recurrwlock_slock(self, &dt_ref_lock); + pushr_(xt_recurrwlock_unslock, &dt_ref_lock); tr = dt_trefs; while (tr) { @@ -2902,7 +2928,7 @@ void XTDDTable::deleteAllRows(XTThreadPtr self) tr = tr->tr_next; } - freer_(); // xt_unlock_rwlock(&dt_ref_lock); + freer_(); // xt_recurrwlock_unslock(&dt_ref_lock); } bool XTDDTable::updateRow(XTOpenTablePtr ot, xtWord1 *before, xtWord1 *after) @@ -2932,7 +2958,7 @@ bool XTDDTable::updateRow(XTOpenTablePtr ot, xtWord1 *before, xtWord1 *after) ok = true; before_buf.ib_free = FALSE; - xt_slock_rwlock_ns(&dt_ref_lock); + xt_recurrwlock_slock_ns(&dt_ref_lock); if ((tr = dt_trefs)) { if (!before) { if (!xt_tab_load_record(ot, ot->ot_curr_rec_id, &before_buf)) @@ -2948,7 +2974,7 @@ bool XTDDTable::updateRow(XTOpenTablePtr ot, xtWord1 *before, xtWord1 *after) tr = tr->tr_next; } } - xt_unlock_rwlock_ns(&dt_ref_lock); + xt_recurrwlock_unslock_ns(&dt_ref_lock); xt_ib_free(NULL, &before_buf); return ok; diff --git a/storage/pbxt/src/datadic_xt.h b/storage/pbxt/src/datadic_xt.h index 1e56561614d..8dd6387f137 100644 --- a/storage/pbxt/src/datadic_xt.h +++ b/storage/pbxt/src/datadic_xt.h @@ -45,7 +45,6 @@ struct XTIndex; #define XT_DD_KEY_PRIMARY 2 #define XT_DD_KEY_FOREIGN 3 -#define XT_KEY_ACTION_DEFAULT 0 #define XT_KEY_ACTION_RESTRICT 1 #define XT_KEY_ACTION_CASCADE 2 #define XT_KEY_ACTION_SET_NULL 3 @@ -259,7 +258,7 @@ class XTDDTable : public XTObject { XTList<XTDDColumn> dt_cols; XTList<XTDDIndex> dt_indexes; - xt_rwlock_type dt_ref_lock; /* The lock for adding and using references. */ + XTRecurRWLockRec dt_ref_lock; /* The lock for adding and using references. */ XTList<XTDDForeignKey> dt_fkeys; /* The foreign keys on this table. */ XTDDTableRef *dt_trefs; /* A list of tables that reference this table. */ diff --git a/storage/pbxt/src/datalog_xt.cc b/storage/pbxt/src/datalog_xt.cc index 3530b516f62..ff58a122e10 100644 --- a/storage/pbxt/src/datalog_xt.cc +++ b/storage/pbxt/src/datalog_xt.cc @@ -1148,6 +1148,11 @@ void XTDataLogBuffer::dlb_exit(XTThreadPtr self) xtBool XTDataLogBuffer::dlb_close_log(XTThreadPtr thread) { if (dlb_data_log) { + if (dlb_data_log->dlf_log_file) { + if (!dl_write_log_header(dlb_data_log, dlb_data_log->dlf_log_file, 0, thread)) + return FAILED; + } + /* Flush and commit the data in the old log: */ if (!dlb_flush_log(TRUE, thread)) return FAILED; @@ -1244,7 +1249,7 @@ xtBool XTDataLogBuffer::dlb_write_thru_log(xtLogID XT_NDEBUG_UNUSED(log_id), xtL */ dlb_data_log->dlf_log_eof += size; #ifdef DEBUG - if (log_offset + size > dlb_max_write_offset) + if ((ulonglong) (log_offset + size) > (ulonglong) dlb_max_write_offset) dlb_max_write_offset = log_offset + size; #endif dlb_flush_required = TRUE; @@ -1286,7 +1291,7 @@ xtBool XTDataLogBuffer::dlb_append_log(xtLogID XT_NDEBUG_UNUSED(log_id), xtLogOf if (!xt_pwrite_file(dlb_data_log->dlf_log_file, log_offset, size, data, &thread->st_statistics.st_data, thread)) return FAILED; #ifdef DEBUG - if (log_offset + size > dlb_max_write_offset) + if ((ulonglong) (log_offset + size) > (ulonglong) dlb_max_write_offset) dlb_max_write_offset = log_offset + size; #endif dlb_flush_required = TRUE; @@ -1729,8 +1734,8 @@ static xtBool dl_collect_garbage(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogF xtLogOffset src_log_offset; xtLogID curr_log_id; xtLogOffset curr_log_offset; - xtLogID dest_log_id; - xtLogOffset dest_log_offset; + xtLogID dest_log_id= 0; + xtLogOffset dest_log_offset= 0; off_t garbage_count = 0; memset(&cs, 0, sizeof(XTCompactorStateRec)); @@ -1952,7 +1957,7 @@ static xtBool dl_collect_garbage(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogF log_rec.xl_status_1 = XT_LOG_ENT_DEL_LOG; log_rec.xl_checksum_1 = XT_CHECKSUM_1(data_log->dlf_log_id); XT_SET_DISK_4(log_rec.xl_log_id_4, data_log->dlf_log_id); - if (!xt_xlog_log_data(self, sizeof(XTXactNewLogEntryDRec), (XTXactLogBufferDPtr) &log_rec, TRUE)) { + if (!xt_xlog_log_data(self, sizeof(XTXactNewLogEntryDRec), (XTXactLogBufferDPtr) &log_rec, XT_XLOG_WRITE_AND_FLUSH)) { db->db_datalogs.dls_set_log_state(data_log, XT_DL_TO_COMPACT); xt_throw(self); } diff --git a/storage/pbxt/src/filesys_xt.h b/storage/pbxt/src/filesys_xt.h index d6762823cc0..6d8dd280e5e 100644 --- a/storage/pbxt/src/filesys_xt.h +++ b/storage/pbxt/src/filesys_xt.h @@ -119,7 +119,7 @@ xtBool xt_fs_rename(struct XTThread *self, char *from_path, char *to_path); #define FILE_MAP_INIT_LOCK(s, i) xt_spinxslock_init_with_autoname(s, i) #define FILE_MAP_FREE_LOCK(s, i) xt_spinxslock_free(s, i) #define FILE_MAP_READ_LOCK(i, o) xt_spinxslock_slock(i, o) -#define FILE_MAP_WRITE_LOCK(i, o) xt_spinxslock_xlock(i, o) +#define FILE_MAP_WRITE_LOCK(i, o) xt_spinxslock_xlock(i, FALSE, o) #define FILE_MAP_UNLOCK(i, o) xt_spinxslock_unlock(i, o) #endif diff --git a/storage/pbxt/src/ha_pbxt.cc b/storage/pbxt/src/ha_pbxt.cc index 78de3c44e37..ba3aa756516 100644 --- a/storage/pbxt/src/ha_pbxt.cc +++ b/storage/pbxt/src/ha_pbxt.cc @@ -80,6 +80,7 @@ using drizzled::plugin::InfoSchemaMethods; #include "systab_xt.h" #include "xaction_xt.h" #include "backup_xt.h" +#include "heap_xt.h" #ifdef DEBUG //#define XT_USE_SYS_PAR_DEBUG_SIZES @@ -113,6 +114,7 @@ static int pbxt_prepare(handlerton *hton, THD *thd, bool all); static int pbxt_recover(handlerton *hton, XID *xid_list, uint len); static int pbxt_commit_by_xid(handlerton *hton, XID *xid); static int pbxt_rollback_by_xid(handlerton *hton, XID *xid); +static int pbxt_start_consistent_snapshot(handlerton *hton, THD *thd); #endif static void ha_aquire_exclusive_use(XTThreadPtr self, XTSharePtr share, ha_pbxt *mine); static void ha_release_exclusive_use(XTThreadPtr self, XTSharePtr share); @@ -286,7 +288,7 @@ static void ha_trace_function(const char *function, char *table) char func_buf[50], *ptr; XTThreadPtr thread = xt_get_self(); - if ((ptr = strchr(function, '('))) { + if ((ptr = const_cast<char *>(strchr(function, '(')))) { ptr--; while (ptr > function) { if (!(isalnum(*ptr) || *ptr == '_')) @@ -345,13 +347,13 @@ static xtHashValue ha_hash_ci(xtBool is_key, void *key_data) return xt_ht_casehash(share->sh_table_path->ps_path); } -static void ha_open_share(XTThreadPtr self, XTShareRec *share, xtBool *tabled_opened) +static void ha_open_share(XTThreadPtr self, XTShareRec *share) { xt_lock_mutex(self, (xt_mutex_type *) share->sh_ex_mutex); pushr_(xt_unlock_mutex, share->sh_ex_mutex); if (!share->sh_table) { - share->sh_table = xt_use_table(self, share->sh_table_path, FALSE, FALSE, tabled_opened); + share->sh_table = xt_use_table(self, share->sh_table_path, FALSE, FALSE); share->sh_dic_key_count = share->sh_table->tab_dic.dic_key_count; share->sh_dic_keys = share->sh_table->tab_dic.dic_keys; share->sh_recalc_selectivity = FALSE; @@ -411,7 +413,7 @@ static void ha_hash_free(XTThreadPtr self, void *data) * This structure contains information that is common to all handles. * (i.e. it is table specific). */ -static XTSharePtr ha_get_share(XTThreadPtr self, const char *table_path, bool open_table, xtBool *tabled_opened) +static XTSharePtr ha_get_share(XTThreadPtr self, const char *table_path, bool open_table) { XTShareRec *share; @@ -433,7 +435,7 @@ static XTSharePtr ha_get_share(XTThreadPtr self, const char *table_path, bool op share->sh_table_path = (XTPathStrPtr) xt_dup_string(self, table_path); if (open_table) - ha_open_share(self, share, tabled_opened); + ha_open_share(self, share); popr_(); // Discard ha_cleanup_share(share); @@ -476,6 +478,25 @@ static xtBool ha_unget_share_removed(XTThreadPtr self, XTSharePtr share) return removed; } +static inline void thd_init_xact(THD *thd, XTThreadPtr self, bool set_table_trans) +{ + self->st_xact_mode = thd_tx_isolation(thd) <= ISO_READ_COMMITTED ? XT_XACT_COMMITTED_READ : XT_XACT_REPEATABLE_READ; + self->st_ignore_fkeys = (thd_test_options(thd, OPTION_NO_FOREIGN_KEY_CHECKS)) != 0; + self->st_auto_commit = (thd_test_options(thd,(OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) == 0; + if (set_table_trans) { +#ifdef DRIZZLED + self->st_table_trans = FALSE; +#else + self->st_table_trans = thd_sql_command(thd) == SQLCOM_LOCK_TABLES; +#endif + } + self->st_abort_trans = FALSE; + self->st_stat_ended = FALSE; + self->st_stat_trans = FALSE; + XT_PRINT0(self, "xt_xn_begin\n"); + xt_xres_wait_for_recovery(self, XT_RECOVER_SWEPT); +} + /* * ----------------------------------------------------------------------- * PUBLIC FUNCTIONS @@ -1146,6 +1167,7 @@ static int pbxt_init(void *p) pbxt_hton->show_status = pbxt_show_status; pbxt_hton->flags = HTON_NO_FLAGS; /* HTON_CAN_RECREATE - Without this flags TRUNCATE uses delete_all_rows() */ pbxt_hton->slot = (uint)-1; /* assign invald value, so we know when it's inited later */ + pbxt_hton->start_consistent_snapshot = pbxt_start_consistent_snapshot; #if defined(MYSQL_SUPPORTS_BACKUP) && defined(XT_ENABLE_ONLINE_BACKUP) pbxt_hton->get_backup_engine = pbxt_backup_engine; #endif @@ -1175,13 +1197,8 @@ static int pbxt_init(void *p) * +1 Temporary thread (e.g. TempForClose, TempForEnd) */ #ifndef DRIZZLED - if (pbxt_max_threads == 0) { - // Embedded server sets max_connections=1 - if (max_connections > 1) - pbxt_max_threads = max_connections + 7; - else - pbxt_max_threads = 100; - } + if (pbxt_max_threads == 0) + pbxt_max_threads = max_connections + 7; #endif self = xt_init_threading(pbxt_max_threads); /* Create the main self: */ if (!self) @@ -1284,7 +1301,7 @@ static int pbxt_init(void *p) #6 0x000debe1 in THD::THD at sql_class.cc:631 #7 0x00e207a4 in myxt_create_thread at myxt_xt.cc:2666 #8 0x00e3134b in tabc_fr_run_thread at tabcache_xt.cc:982 - #9 0x00e422ca in thr_main_pbxt at thread_xt.cc:1006 + #9 0x00e422ca in xt_thread_main at thread_xt.cc:1006 #10 0x91ff7c55 in _pthread_start #11 0x91ff7b12 in thread_start * @@ -1417,8 +1434,43 @@ static void pbxt_drop_database(handlerton *XT_UNUSED(hton), char *XT_UNUSED(path * * 3. If in BEGIN/END we must call ha_rollback() if we abort the transaction * internally. + * + * NOTE ON CONSISTENT SNAPSHOTS: + * + * PBXT itself doesn't need this functiona as its transaction mechanism provides + * consistent snapshots for all transactions by default. This function is needed + * only for multi-engine cases like this: + * + * CREATE TABLE t1 ... ENGINE=INNODB + * CREATE TABLE t2 ... ENGINE=PBXT + * START TRANSACTION WITH CONSISTENT SNAPSHOT + * SELECT * FROM t1 <-- at this point we need to know about the snapshot */ +static int pbxt_start_consistent_snapshot(handlerton *hton, THD *thd) +{ + int err = 0; + XTThreadPtr self = ha_set_current_thread(thd, &err); + + if (!self->st_database && pbxt_database) { + xt_ha_open_database_of_table(self, (XTPathStrPtr) NULL); + } + + thd_init_xact(thd, self, true); + + if (xt_xn_begin(self)) { + trans_register_ha(thd, TRUE, hton); + } else { + err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE); + } + + /* + * As of MySQL 5.1.41 the return value is not checked, so the server might assume + * everything is fine even it isn't. InnoDB returns 0 on success. + */ + return err; +} + /* * Commit the PBXT transaction of the given thread. * thd is the MySQL thread structure. @@ -1860,7 +1912,6 @@ xtPublic int ha_pbxt::reopen() THD *thd = current_thd; int err = 0; XTThreadPtr self; - xtBool tabled_opened = FALSE; if (!(self = ha_set_current_thread(thd, &err))) return xt_ha_pbxt_to_mysql_error(err); @@ -1868,24 +1919,30 @@ xtPublic int ha_pbxt::reopen() try_(a) { xt_ha_open_database_of_table(self, pb_share->sh_table_path); - ha_open_share(self, pb_share, &tabled_opened); + ha_open_share(self, pb_share); if (!(pb_open_tab = xt_db_open_table_using_tab(pb_share->sh_table, self))) xt_throw(self); pb_open_tab->ot_thread = self; - if (tabled_opened) { + /* {TABLE-STATS} + * We no longer use the information that a table + * was opened in order to know when to calculate + * statistics. + */ + if (!pb_open_tab->ot_table->tab_ind_stat_calc_time) { #ifdef LOAD_TABLE_ON_OPEN xt_tab_load_table(self, pb_open_tab); #else xt_tab_load_row_pointers(self, pb_open_tab); #endif - xt_ind_set_index_selectivity(self, pb_open_tab); + xt_ind_set_index_selectivity(pb_open_tab, self); /* If the number of rows is less than 150 we will recalculate the * selectity of the indices, as soon as the number of rows * exceeds 200 (see [**]) */ - pb_share->sh_recalc_selectivity = (pb_share->sh_table->tab_row_eof_id - 1 - pb_share->sh_table->tab_row_fnum) < 150; + /* {FREE-ROWS-BAD} */ + pb_share->sh_recalc_selectivity = (pb_share->sh_table->tab_row_eof_id - 1 /* - pb_share->sh_table->tab_row_fnum */) < 150; } /* I am not doing this anymore because it was only required @@ -2259,7 +2316,6 @@ int ha_pbxt::open(const char *table_path, int XT_UNUSED(mode), uint XT_UNUSED(te THD *thd = current_thd; int err = 0; XTThreadPtr self; - xtBool tabled_opened = FALSE; ref_length = XT_RECORD_OFFS_SIZE; @@ -2272,28 +2328,30 @@ int ha_pbxt::open(const char *table_path, int XT_UNUSED(mode), uint XT_UNUSED(te try_(a) { xt_ha_open_database_of_table(self, (XTPathStrPtr) table_path); - pb_share = ha_get_share(self, table_path, true, &tabled_opened); + pb_share = ha_get_share(self, table_path, false); ha_add_to_handler_list(self, pb_share, this); if (pb_share->sh_table_lock) { if (!ha_wait_for_shared_use(this, pb_share)) xt_throw(self); } - ha_open_share(self, pb_share, &tabled_opened); + ha_open_share(self, pb_share); thr_lock_data_init(&pb_share->sh_lock, &pb_lock, NULL); if (!(pb_open_tab = xt_db_open_table_using_tab(pb_share->sh_table, self))) xt_throw(self); pb_open_tab->ot_thread = self; - if (tabled_opened) { + /* {TABLE-STATS} */ + if (!pb_open_tab->ot_table->tab_ind_stat_calc_time) { #ifdef LOAD_TABLE_ON_OPEN xt_tab_load_table(self, pb_open_tab); #else xt_tab_load_row_pointers(self, pb_open_tab); #endif - xt_ind_set_index_selectivity(self, pb_open_tab); - pb_share->sh_recalc_selectivity = (pb_share->sh_table->tab_row_eof_id - 1 - pb_share->sh_table->tab_row_fnum) < 150; + xt_ind_set_index_selectivity(pb_open_tab, self); + /* {FREE-ROWS-BAD} */ + pb_share->sh_recalc_selectivity = (pb_share->sh_table->tab_row_eof_id - 1 /* - pb_share->sh_table->tab_row_fnum */) < 150; } init_auto_increment(0); @@ -2401,7 +2459,7 @@ void ha_pbxt::init_auto_increment(xtWord8 min_auto_inc) self->st_abort_trans = FALSE; self->st_stat_ended = FALSE; self->st_stat_trans = FALSE; - self->st_is_update = FALSE; + self->st_is_update = NULL; if (!xt_xn_begin(self)) { xt_spinlock_unlock(&tab->tab_ainc_lock); xt_throw(self); @@ -2641,8 +2699,14 @@ int ha_pbxt::write_row(byte *buf) * and if it gets dup-key error it tries UPDATE, so the same row can be overwriten multiple * times within the same statement */ - if (err == HA_ERR_FOUND_DUPP_KEY && pb_open_tab->ot_thread->st_is_update) - pb_open_tab->ot_thread->st_update_id++; + if (err == HA_ERR_FOUND_DUPP_KEY && pb_open_tab->ot_thread->st_is_update) { + /* Pop the update stack: */ + //pb_open_tab->ot_thread->st_update_id++; + XTOpenTablePtr curr = pb_open_tab->ot_thread->st_is_update; + + pb_open_tab->ot_thread->st_is_update = curr->ot_prev_update; + curr->ot_prev_update = NULL; + } } done: @@ -2710,9 +2774,12 @@ int ha_pbxt::update_row(const byte * old_data, byte * new_data) xt_xlog_check_long_writer(self); - if (!self->st_is_update) { - self->st_is_update = TRUE; - self->st_update_id++; + /* {UPDATE-STACK} */ + if (self->st_is_update != pb_open_tab) { + /* Push the update stack: */ + pb_open_tab->ot_prev_update = self->st_is_update; + self->st_is_update = pb_open_tab; + pb_open_tab->ot_update_id++; } if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) @@ -3110,6 +3177,7 @@ int ha_pbxt::index_init(uint idx, bool XT_UNUSED(sorted)) active_index = idx; if (pb_open_tab->ot_table->tab_dic.dic_disable_index) { + active_index = MAX_KEY; xt_tab_set_index_error(pb_open_tab->ot_table); return ha_log_pbxt_thread_error_for_mysql(pb_ignore_dup_key); } @@ -3260,6 +3328,10 @@ int ha_pbxt::index_read_xt(byte * buf, uint idx, const byte *key, uint key_len, int prefix = 0; XTIdxSearchKeyRec search_key; + if (idx == MAX_KEY) { + err = HA_ERR_WRONG_INDEX; + goto done; + } #ifdef XT_TRACK_RETURNED_ROWS ha_start_scan(pb_open_tab, idx); #endif @@ -3310,6 +3382,7 @@ int ha_pbxt::index_read_xt(byte * buf, uint idx, const byte *key, uint key_len, ha_return_row(pb_open_tab, idx); #endif XT_DISABLED_TRACE(("search tx=%d val=%d err=%d\n", (int) pb_open_tab->ot_thread->st_xact_data->xd_start_xn_id, (int) XT_GET_DISK_4(key), err)); + done: if (err) table->status = STATUS_NOT_FOUND; else { @@ -3354,6 +3427,10 @@ int ha_pbxt::index_next(byte * buf) //statistic_increment(ha_read_next_count,&LOCK_status); ASSERT_NS(pb_ex_in_use); + if (active_index == MAX_KEY) { + err = HA_ERR_WRONG_INDEX; + goto done; + } ind = (XTIndexPtr) pb_share->sh_dic_keys[active_index]; if (!xt_idx_next(pb_open_tab, ind, NULL)) @@ -3366,6 +3443,7 @@ int ha_pbxt::index_next(byte * buf) if (!err) ha_return_row(pb_open_tab, active_index); #endif + done: if (err) table->status = STATUS_NOT_FOUND; else { @@ -3396,6 +3474,10 @@ int ha_pbxt::index_next_same(byte * buf, const byte *key, uint length) //statistic_increment(ha_read_next_count,&LOCK_status); ASSERT_NS(pb_ex_in_use); + if (active_index == MAX_KEY) { + err = HA_ERR_WRONG_INDEX; + goto done; + } ind = (XTIndexPtr) pb_share->sh_dic_keys[active_index]; search_key.sk_key_value.sv_flags = HA_READ_KEY_EXACT; @@ -3415,6 +3497,7 @@ int ha_pbxt::index_next_same(byte * buf, const byte *key, uint length) if (!err) ha_return_row(pb_open_tab, active_index); #endif + done: if (err) table->status = STATUS_NOT_FOUND; else { @@ -3436,6 +3519,10 @@ int ha_pbxt::index_prev(byte * buf) //statistic_increment(ha_read_prev_count,&LOCK_status); ASSERT_NS(pb_ex_in_use); + if (active_index == MAX_KEY) { + err = HA_ERR_WRONG_INDEX; + goto done; + } ind = (XTIndexPtr) pb_share->sh_dic_keys[active_index]; if (!xt_idx_prev(pb_open_tab, ind, NULL)) @@ -3448,6 +3535,7 @@ int ha_pbxt::index_prev(byte * buf) if (!err) ha_return_row(pb_open_tab, active_index); #endif + done: if (err) table->status = STATUS_NOT_FOUND; else { @@ -3470,6 +3558,18 @@ int ha_pbxt::index_first(byte * buf) //statistic_increment(ha_read_first_count,&LOCK_status); ASSERT_NS(pb_ex_in_use); + /* This is required because MySQL ignores the error returned + * init init_index sometimes, for example: + * + * if (!table->file->inited) + * table->file->ha_index_init(tab->index, tab->sorted); + * if ((error=tab->table->file->index_first(tab->table->record[0]))) + */ + if (active_index == MAX_KEY) { + err = HA_ERR_WRONG_INDEX; + goto done; + } + #ifdef XT_TRACK_RETURNED_ROWS ha_start_scan(pb_open_tab, active_index); #endif @@ -3488,6 +3588,7 @@ int ha_pbxt::index_first(byte * buf) if (!err) ha_return_row(pb_open_tab, active_index); #endif + done: if (err) table->status = STATUS_NOT_FOUND; else { @@ -3510,6 +3611,11 @@ int ha_pbxt::index_last(byte * buf) //statistic_increment(ha_read_last_count,&LOCK_status); ASSERT_NS(pb_ex_in_use); + if (active_index == MAX_KEY) { + err = HA_ERR_WRONG_INDEX; + goto done; + } + #ifdef XT_TRACK_RETURNED_ROWS ha_start_scan(pb_open_tab, active_index); #endif @@ -3528,6 +3634,7 @@ int ha_pbxt::index_last(byte * buf) if (!err) ha_return_row(pb_open_tab, active_index); #endif + done: if (err) table->status = STATUS_NOT_FOUND; else { @@ -3815,8 +3922,34 @@ int ha_pbxt::info(uint flag) if ((ot = pb_open_tab)) { if (flag & HA_STATUS_VARIABLE) { - stats.deleted = ot->ot_table->tab_row_fnum; - stats.records = (ha_rows) (ot->ot_table->tab_row_eof_id - 1 - stats.deleted); + /* {FREE-ROWS-BAD} + * Free row count is not reliable, so ignore it. + * The problem is if tab_row_fnum > tab_row_eof_id - 1 then + * we have a very bad result. + * + * If stats.records+EXTRA_RECORDS == 0 as returned by + * estimate_rows_upper_bound(), then filesort will crash here: + * + * make_sortkey(param,sort_keys[idx++],ref_pos); + * + * #0 0x000bf69c in Field_long::sort_string at field.cc:3766 + * #1 0x0022e1f1 in make_sortkey at filesort.cc:769 + * #2 0x0022f1cf in find_all_keys at filesort.cc:619 + * #3 0x00230eec in filesort at filesort.cc:243 + * #4 0x001b9d89 in mysql_update at sql_update.cc:415 + * #5 0x0010db12 in mysql_execute_command at sql_parse.cc:2959 + * #6 0x0011480d in mysql_parse at sql_parse.cc:5787 + * #7 0x00115afb in dispatch_command at sql_parse.cc:1200 + * #8 0x00116de2 in do_command at sql_parse.cc:857 + * #9 0x00101ee4 in handle_one_connection at sql_connect.cc:1115 + * + * The problem is that sort_keys is allocated to handle just 1 vector. + * Sorting one vector crashes. Although I could not find a check for + * the actual number of vectors. But it must assume that it has at + * least EXTRA_RECORDS vectors. + */ + stats.deleted = /* ot->ot_table->tab_row_fnum */ 0; + stats.records = (ha_rows) (ot->ot_table->tab_row_eof_id - 1 /* - stats.deleted */); stats.data_file_length = xt_rec_id_to_rec_offset(ot->ot_table, ot->ot_table->tab_rec_eof_id); stats.index_file_length = xt_ind_node_to_offset(ot->ot_table, ot->ot_table->tab_ind_eof); stats.delete_length = ot->ot_table->tab_rec_fnum * ot->ot_rec_size; @@ -4444,11 +4577,13 @@ xtPublic int ha_pbxt::external_lock(THD *thd, int lock_type) } if (pb_share->sh_recalc_selectivity) { - if ((pb_share->sh_table->tab_row_eof_id - 1 - pb_share->sh_table->tab_row_fnum) >= 200) { + /* {FREE-ROWS-BAD} */ + if ((pb_share->sh_table->tab_row_eof_id - 1 /* - pb_share->sh_table->tab_row_fnum */) >= 200) { /* [**] */ pb_share->sh_recalc_selectivity = FALSE; - xt_ind_set_index_selectivity(self, pb_open_tab); - pb_share->sh_recalc_selectivity = (pb_share->sh_table->tab_row_eof_id - 1 - pb_share->sh_table->tab_row_fnum) < 150; + xt_ind_set_index_selectivity(pb_open_tab, self); + /* {FREE-ROWS-BAD} */ + pb_share->sh_recalc_selectivity = (pb_share->sh_table->tab_row_eof_id - 1 /* - pb_share->sh_table->tab_row_fnum */) < 150; } } } @@ -4487,7 +4622,7 @@ xtPublic int ha_pbxt::external_lock(THD *thd, int lock_type) if (!pb_share->sh_table) { xt_ha_open_database_of_table(self, pb_share->sh_table_path); - ha_open_share(self, pb_share, NULL); + ha_open_share(self, pb_share); } } catch_(a) { @@ -4593,8 +4728,8 @@ xtPublic int ha_pbxt::external_lock(THD *thd, int lock_type) cont_(b); } - /* See {IS-UPDATE-STAT} */ - self->st_is_update = FALSE; + /* See {IS-UPDATE-STAT} nad {UPDATE-STACK} */ + self->st_is_update = NULL; /* Auto begin a transaction (if one is not already running): */ if (!self->st_xact_data) { @@ -4602,19 +4737,8 @@ xtPublic int ha_pbxt::external_lock(THD *thd, int lock_type) (void) ASSERT_NS(ISO_READ_UNCOMMITTED == XT_XACT_UNCOMMITTED_READ); (void) ASSERT_NS(ISO_SERIALIZABLE == XT_XACT_SERIALIZABLE); - self->st_xact_mode = thd_tx_isolation(thd) <= ISO_READ_COMMITTED ? XT_XACT_COMMITTED_READ : XT_XACT_REPEATABLE_READ; - self->st_ignore_fkeys = (thd_test_options(thd,OPTION_NO_FOREIGN_KEY_CHECKS)) != 0; - self->st_auto_commit = (thd_test_options(thd, (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) == 0; -#ifdef DRIZZLED - self->st_table_trans = FALSE; -#else - self->st_table_trans = thd_sql_command(thd) == SQLCOM_LOCK_TABLES; -#endif - self->st_abort_trans = FALSE; - self->st_stat_ended = FALSE; - self->st_stat_trans = FALSE; - XT_PRINT0(self, "xt_xn_begin\n"); - xt_xres_wait_for_recovery(self, XT_RECOVER_SWEPT); + thd_init_xact(thd, self, true); + if (!xt_xn_begin(self)) { err = xt_ha_pbxt_thread_error_for_mysql(thd, self, pb_ignore_dup_key); pb_ex_in_use = 0; @@ -4837,19 +4961,27 @@ int ha_pbxt::start_stmt(THD *thd, thr_lock_type lock_type) * are nested within an open close of the select t1 * statement. */ - self->st_is_update = FALSE; + /* {UPDATE-STACK} + * Add to this I add the following: + * A trigger in the middle of an update also causes nested + * statements. If I reset st_is_update, then then + * when the trigger returns the system thinks we + * are in a different update statement, and may + * update the same row again. + */ + if (self->st_is_update == pb_open_tab) { + /* Pop the update stack: */ + XTOpenTablePtr curr = pb_open_tab->ot_thread->st_is_update; + + pb_open_tab->ot_thread->st_is_update = curr->ot_prev_update; + curr->ot_prev_update = NULL; + } /* See comment {START-TRANS} */ if (!self->st_xact_data) { - self->st_xact_mode = thd_tx_isolation(thd) <= ISO_READ_COMMITTED ? XT_XACT_COMMITTED_READ : XT_XACT_REPEATABLE_READ; - self->st_ignore_fkeys = (thd_test_options(thd, OPTION_NO_FOREIGN_KEY_CHECKS)) != 0; - self->st_auto_commit = (thd_test_options(thd,(OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) == 0; - /* self->st_table_trans = not set here! */ - self->st_abort_trans = FALSE; - self->st_stat_ended = FALSE; - self->st_stat_trans = FALSE; - XT_PRINT0(self, "xt_xn_begin\n"); - xt_xres_wait_for_recovery(self, XT_RECOVER_SWEPT); + + thd_init_xact(thd, self, false); + if (!xt_xn_begin(self)) { err = xt_ha_pbxt_thread_error_for_mysql(thd, self, pb_ignore_dup_key); goto complete; @@ -4905,6 +5037,18 @@ int ha_pbxt::start_stmt(THD *thd, thr_lock_type lock_type) */ THR_LOCK_DATA **ha_pbxt::store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type) { + /* + * TL_READ means concurrent INSERTs are allowed. This is a problem as in this mode + * PBXT is not compatible with MyISAM which allows INSERTs but isolates them from + * current "transaction" (started by LOCK TABLES, ended by UNLOCK TABLES). PBXT + * used to allow INSERTs and made them visible to the locker (on commit). + * While MySQL manual doesn't state anything regarding row visibility limitations + * we choose to convert local locks into normal read locks for better compatibility + * with MyISAM. + */ + if (lock_type == TL_READ) + lock_type = TL_READ_NO_INSERT; + if (lock_type != TL_IGNORE && pb_lock.type == TL_UNLOCK) { /* Set to TRUE for operations that require a table lock: */ switch (thd_sql_command(thd)) { @@ -5102,7 +5246,7 @@ int ha_pbxt::delete_table(const char *table_path) * We also cannot use pb_share because the handler used * to delete a table is not openned correctly. */ - share = ha_get_share(self, table_path, false, NULL); + share = ha_get_share(self, table_path, false); pushr_(ha_unget_share, share); ha_aquire_exclusive_use(self, share, NULL); pushr_(ha_release_exclusive_use, share); @@ -5261,7 +5405,7 @@ int ha_pbxt::rename_table(const char *from, const char *to) * called without correctly initializing * the handler! */ - share = ha_get_share(self, from, true, NULL); + share = ha_get_share(self, from, true); pushr_(ha_unget_share, share); ha_aquire_exclusive_use(self, share, NULL); pushr_(ha_release_exclusive_use, share); @@ -5842,6 +5986,11 @@ static MYSQL_SYSVAR_BOOL(support_xa, pbxt_support_xa, NULL, NULL, FALSE); #endif +static MYSQL_SYSVAR_INT(flush_log_at_trx_commit, xt_db_flush_log_at_trx_commit, + PLUGIN_VAR_OPCMDARG, + "Determines whether the transaction log is written and/or flushed when a transaction is committed (no matter what the setting the log is written and flushed once per second), 0 = no write & no flush, 1 = write & flush (default), 2 = write & no flush.", + NULL, NULL, 1, 0, 2, 1); + static struct st_mysql_sys_var* pbxt_system_variables[] = { MYSQL_SYSVAR(index_cache_size), MYSQL_SYSVAR(record_cache_size), @@ -5860,6 +6009,7 @@ static struct st_mysql_sys_var* pbxt_system_variables[] = { MYSQL_SYSVAR(sweeper_priority), MYSQL_SYSVAR(max_threads), MYSQL_SYSVAR(support_xa), + MYSQL_SYSVAR(flush_log_at_trx_commit), NULL }; #endif diff --git a/storage/pbxt/src/index_xt.cc b/storage/pbxt/src/index_xt.cc index 81437caf723..c8995fe253c 100644 --- a/storage/pbxt/src/index_xt.cc +++ b/storage/pbxt/src/index_xt.cc @@ -1328,10 +1328,12 @@ static void idx_insert_node_item(XTTableHPtr XT_UNUSED(tab), XTIndexPtr XT_UNUSE XT_SET_DISK_2(leaf->tb_size_2, XT_MAKE_NODE_SIZE(result->sr_item.i_total_size)); } -static void idx_get_middle_branch_item(XTIndexPtr ind, XTIdxBranchDPtr branch, XTIdxKeyValuePtr value, XTIdxResultPtr result) +static xtBool idx_get_middle_branch_item(XTOpenTablePtr ot, XTIndexPtr ind, XTIdxBranchDPtr branch, XTIdxKeyValuePtr value, XTIdxResultPtr result) { xtWord1 *bitem; + ASSERT_NS(result->sr_item.i_node_ref_size == 0 || result->sr_item.i_node_ref_size == XT_NODE_REF_SIZE); + ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE*2); if (ind->mi_fix_key) { u_int full_item_size = result->sr_item.i_item_size + result->sr_item.i_node_ref_size; @@ -1346,18 +1348,25 @@ static void idx_get_middle_branch_item(XTIndexPtr ind, XTIdxBranchDPtr branch, X } else { u_int node_ref_size; - u_int ilen; + u_int ilen, tlen; xtWord1 *bend; node_ref_size = result->sr_item.i_node_ref_size; - bitem = branch->tb_data + node_ref_size;; + bitem = branch->tb_data + node_ref_size; bend = &branch->tb_data[(result->sr_item.i_total_size - node_ref_size) / 2 + node_ref_size]; ilen = 0; if (bitem < bend) { + tlen = 0; for (;;) { ilen = myxt_get_key_length(ind, bitem); - if (bitem + ilen + XT_RECORD_REF_SIZE + node_ref_size >= bend) + tlen += ilen + XT_RECORD_REF_SIZE + node_ref_size; + if (bitem + ilen + XT_RECORD_REF_SIZE + node_ref_size >= bend) { + if (ilen > XT_INDEX_PAGE_SIZE || tlen > result->sr_item.i_total_size) { + xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name); + return FAILED; + } break; + } bitem += ilen + XT_RECORD_REF_SIZE + node_ref_size; } } @@ -1370,6 +1379,7 @@ static void idx_get_middle_branch_item(XTIndexPtr ind, XTIdxBranchDPtr branch, X xt_get_record_ref(bitem + ilen, &value->sv_rec_id, &value->sv_row_id); memcpy(value->sv_key, bitem, value->sv_length); } + return OK; } static size_t idx_write_branch_item(XTIndexPtr XT_UNUSED(ind), xtWord1 *item, XTIdxKeyValuePtr value) @@ -1438,7 +1448,8 @@ static xtBool idx_replace_node_key(XTOpenTablePtr ot, XTIndexPtr ind, IdxStackIt /* We assume that value can be overwritten (which is the case) */ key_value.sv_flags = XT_SEARCH_WHOLE_KEY; key_value.sv_key = key_buf; - idx_get_middle_branch_item(ind, iref.ir_branch, &key_value, &result); + if (!idx_get_middle_branch_item(ot, ind, iref.ir_branch, &key_value, &result)) + goto failed_1; if (!idx_new_branch(ot, ind, &new_branch)) goto failed_1; @@ -1567,7 +1578,8 @@ static xtBool idx_insert_node(XTOpenTablePtr ot, XTIndexPtr ind, IdxBranchStackP ASSERT_NS(result.sr_item.i_total_size > XT_INDEX_PAGE_DATA_SIZE); /* We assume that value can be overwritten (which is the case) */ - idx_get_middle_branch_item(ind, &ot->ot_ind_wbuf, key_value, &result); + if (!idx_get_middle_branch_item(ot, ind, &ot->ot_ind_wbuf, key_value, &result)) + goto failed_1; if (!idx_new_branch(ot, ind, &new_branch)) goto failed_1; @@ -2041,7 +2053,7 @@ xtPublic xtBool xt_idx_insert(XTOpenTablePtr ot, XTIndexPtr ind, xtRowID row_id, memcpy(&ot->ot_ind_wbuf, iref.ir_branch, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_total_size); idx_insert_leaf_item(ind, &ot->ot_ind_wbuf, &key_value, &result); IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2)); - ASSERT_NS(result.sr_item.i_total_size > XT_INDEX_PAGE_DATA_SIZE); + ASSERT_NS(result.sr_item.i_total_size > XT_INDEX_PAGE_DATA_SIZE && result.sr_item.i_total_size <= XT_INDEX_PAGE_DATA_SIZE*2); /* This is the number of potential writes. In other words, the total number * of blocks that may be accessed. @@ -2053,7 +2065,8 @@ xtPublic xtBool xt_idx_insert(XTOpenTablePtr ot, XTIndexPtr ind, xtRowID row_id, goto failed_1; /* Key does not fit, must split... */ - idx_get_middle_branch_item(ind, &ot->ot_ind_wbuf, &key_value, &result); + if (!idx_get_middle_branch_item(ot, ind, &ot->ot_ind_wbuf, &key_value, &result)) + goto failed_1; if (!idx_new_branch(ot, ind, &new_branch)) goto failed_1; @@ -3317,7 +3330,7 @@ xtPublic xtBool xt_idx_match_search(register XTOpenTablePtr XT_UNUSED(ot), regis return FALSE; } -static void idx_set_index_selectivity(XTThreadPtr self, XTOpenTablePtr ot, XTIndexPtr ind) +static void idx_set_index_selectivity(XTOpenTablePtr ot, XTIndexPtr ind, XTThreadPtr thread) { static const xtRecordID MAX_RECORDS = 100; @@ -3368,7 +3381,7 @@ static void idx_set_index_selectivity(XTThreadPtr self, XTOpenTablePtr ot, XTInd last_rec = ot->ot_curr_rec_id; key_len = ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE; - xt_ind_unlock_handle(ot->ot_ind_rhandle); + xt_ind_lock_handle(ot->ot_ind_rhandle); memcpy(key_buf, ot->ot_ind_rhandle->ih_branch->tb_data + ot->ot_ind_state.i_item_offset, key_len); xt_ind_unlock_handle(ot->ot_ind_rhandle); } @@ -3410,7 +3423,7 @@ static void idx_set_index_selectivity(XTThreadPtr self, XTOpenTablePtr ot, XTInd last_iter_rec = last_rec; if (ot->ot_ind_rhandle) { - xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, self); + xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, thread); ot->ot_ind_rhandle = NULL; } } @@ -3431,8 +3444,10 @@ static void idx_set_index_selectivity(XTThreadPtr self, XTOpenTablePtr ot, XTInd return; failed_1: - xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, self); - ot->ot_ind_rhandle = NULL; + if (ot->ot_ind_rhandle) { + xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, thread); + ot->ot_ind_rhandle = NULL; + } failed: xt_tab_disable_index(ot->ot_table, XT_INDEX_CORRUPTED); @@ -3440,16 +3455,23 @@ static void idx_set_index_selectivity(XTThreadPtr self, XTOpenTablePtr ot, XTInd return; } -xtPublic void xt_ind_set_index_selectivity(XTThreadPtr self, XTOpenTablePtr ot) +xtPublic void xt_ind_set_index_selectivity(XTOpenTablePtr ot, XTThreadPtr thread) { XTTableHPtr tab = ot->ot_table; XTIndexPtr *ind; u_int i; - - if (!tab->tab_dic.dic_disable_index) { - for (i=0, ind=tab->tab_dic.dic_keys; i<tab->tab_dic.dic_key_count; i++, ind++) - idx_set_index_selectivity(self, ot, *ind); + time_t now; + + now = time(NULL); + xt_lock_mutex_ns(&tab->tab_ind_stat_lock); + if (tab->tab_ind_stat_calc_time < now) { + if (!tab->tab_dic.dic_disable_index) { + for (i=0, ind=tab->tab_dic.dic_keys; i<tab->tab_dic.dic_key_count; i++, ind++) + idx_set_index_selectivity(ot, *ind, thread); + } + tab->tab_ind_stat_calc_time = time(NULL); } + xt_unlock_mutex_ns(&tab->tab_ind_stat_lock); } /* @@ -3740,7 +3762,8 @@ xtPublic void xt_ind_count_deleted_items(XTTableHPtr tab, XTIndexPtr ind, XTIndB static xtBool idx_flush_dirty_list(XTIndexLogPtr il, XTOpenTablePtr ot, u_int *flush_count, XTIndBlockPtr *flush_list) { for (u_int i=0; i<*flush_count; i++) - il->il_write_block(ot, flush_list[i]); + if (!il->il_write_block(ot, flush_list[i])) + return FAILED; *flush_count = 0; return OK; } @@ -3793,7 +3816,7 @@ xtPublic xtBool xt_flush_indices(XTOpenTablePtr ot, off_t *bytes_flushed, xtBool xtIndexNodeID ind_free; xtBool something_to_free = FALSE; xtIndexNodeID last_address, next_address; - xtWord2 curr_flush_seq; + xtWord4 curr_flush_seq; XTIndFreeListPtr list_ptr; u_int dirty_blocks; XTCheckPointTablePtr cp_tab; @@ -3810,8 +3833,9 @@ xtPublic xtBool xt_flush_indices(XTOpenTablePtr ot, off_t *bytes_flushed, xtBool if (!tab->tab_db->db_indlogs.ilp_get_log(&il, ot->ot_thread)) goto failed_3; - il->il_reset(tab->tab_id); - if (!il->il_write_byte(ot, XT_DT_FREE_LIST)) + if (!il->il_reset(ot)) + goto failed_2; + if (!il->il_write_byte(ot, XT_DT_LOG_HEAD)) goto failed_2; if (!il->il_write_word4(ot, tab->tab_id)) goto failed_2; @@ -3849,7 +3873,7 @@ xtPublic xtBool xt_flush_indices(XTOpenTablePtr ot, off_t *bytes_flushed, xtBool wrote_something = TRUE; while (block) { ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_DIRTY); - ASSERT_NS(block->cp_flush_seq == curr_flush_seq); + ASSERT_NS((block->cp_flush_seq == curr_flush_seq) || xt_xn_is_before(block->cp_flush_seq, curr_flush_seq)); if (!ind_add_to_dirty_list(il, ot, &flush_count, flush_list, block)) goto failed; block = block->cb_dirty_next; @@ -4023,7 +4047,7 @@ xtPublic xtBool xt_flush_indices(XTOpenTablePtr ot, off_t *bytes_flushed, xtBool fblock = block; block = block->cb_dirty_next; ASSERT_NS(fblock->cb_state == IDX_CAC_BLOCK_DIRTY); - if (fblock->cp_flush_seq == curr_flush_seq) { + if (fblock->cp_flush_seq == curr_flush_seq || xt_xn_is_before(fblock->cp_flush_seq, curr_flush_seq)) { /* Take the block off the dirty list: */ if (fblock->cb_dirty_next) fblock->cb_dirty_next->cb_dirty_prev = fblock->cb_dirty_prev; @@ -4254,12 +4278,32 @@ void XTIndexLogPool::ilp_release_log(XTIndexLogPtr il) xt_unlock_mutex_ns(&ilp_lock); } -void XTIndexLog::il_reset(xtTableID tab_id) +xtBool XTIndexLog::il_reset(XTOpenTable *ot) { + XTIndLogHeadDRec log_head; + xtTableID tab_id = ot->ot_table->tab_id; + il_tab_id = tab_id; il_log_eof = 0; il_buffer_len = 0; il_buffer_offset = 0; + + /* We must write the header and flush here or the "previous" status (from the + * last flush run) could remain. Failure to write the file completely leave the + * old header in place, and other parts of the file changed. + * This would lead to index corruption. + */ + log_head.ilh_data_type = XT_DT_LOG_HEAD; + XT_SET_DISK_4(log_head.ilh_tab_id_4, tab_id); + XT_SET_DISK_4(log_head.ilh_log_eof_4, 0); + + if (!xt_pwrite_file(il_of, 0, sizeof(XTIndLogHeadDRec), (xtWord1 *) &log_head, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread)) + return FAILED; + + if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread)) + return FAILED; + + return OK; } void XTIndexLog::il_close(xtBool delete_it) diff --git a/storage/pbxt/src/index_xt.h b/storage/pbxt/src/index_xt.h index ed4b9cef6ae..52f8f32dd33 100644 --- a/storage/pbxt/src/index_xt.h +++ b/storage/pbxt/src/index_xt.h @@ -30,6 +30,7 @@ #include <mysql_version.h> #include <my_bitmap.h> #endif +#include <time.h> #include "thread_xt.h" #include "linklist_xt.h" @@ -293,7 +294,6 @@ typedef struct XTIndFreeList { */ typedef struct XTIndex { u_int mi_index_no; /* The index number (used by MySQL). */ - xt_mutex_type mi_flush_lock; /* Lock the index during flushing. */ /* Protected by the mi_rwlock lock: */ XT_INDEX_LOCK_TYPE mi_rwlock; /* This lock protects the structure of the index. @@ -407,7 +407,7 @@ typedef struct XTIndexLog { off_t il_buffer_offset; - void il_reset(xtTableID tab_id); + xtBool il_reset(XTOpenTable *ot); void il_close(xtBool delete_it); void il_release(); @@ -478,7 +478,7 @@ xtBool xt_idx_search_prev(struct XTOpenTable *ot, struct XTIndex *ind, register xtBool xt_idx_next(register struct XTOpenTable *ot, register struct XTIndex *ind, register XTIdxSearchKeyPtr search_key); xtBool xt_idx_prev(register struct XTOpenTable *ot, register struct XTIndex *ind, register XTIdxSearchKeyPtr search_key); xtBool xt_idx_read(struct XTOpenTable *ot, struct XTIndex *ind, xtWord1 *rec_buf); -void xt_ind_set_index_selectivity(XTThreadPtr self, struct XTOpenTable *ot); +void xt_ind_set_index_selectivity(struct XTOpenTable *ot, XTThreadPtr thread); void xt_check_indices(struct XTOpenTable *ot); void xt_load_indices(XTThreadPtr self, struct XTOpenTable *ot); void xt_ind_count_deleted_items(struct XTTable *ot, struct XTIndex *ind, struct XTIndBlock *block); diff --git a/storage/pbxt/src/lock_xt.cc b/storage/pbxt/src/lock_xt.cc index 4513de17c5e..0e9af277c7b 100644 --- a/storage/pbxt/src/lock_xt.cc +++ b/storage/pbxt/src/lock_xt.cc @@ -1444,7 +1444,7 @@ xtPublic void xt_spinxslock_free(struct XTThread *XT_UNUSED(self), XTSpinXSLockP #endif } -xtPublic xtBool xt_spinxslock_xlock(XTSpinXSLockPtr sxs, xtThreadID XT_NDEBUG_UNUSED(thd_id)) +xtPublic xtBool xt_spinxslock_xlock(XTSpinXSLockPtr sxs, xtBool try_lock, xtThreadID XT_NDEBUG_UNUSED(thd_id)) { register xtWord2 set; @@ -1453,6 +1453,8 @@ xtPublic xtBool xt_spinxslock_xlock(XTSpinXSLockPtr sxs, xtThreadID XT_NDEBUG_UN set = xt_atomic_tas2(&sxs->sxs_xlocked, 1); if (!set) break; + if (try_lock) + return FALSE; xt_yield(); } @@ -1460,9 +1462,25 @@ xtPublic xtBool xt_spinxslock_xlock(XTSpinXSLockPtr sxs, xtThreadID XT_NDEBUG_UN sxs->sxs_locker = thd_id; #endif - /* Wait for all the reader to wait! */ - while (sxs->sxs_wait_count < sxs->sxs_rlock_count) - xt_yield(); + /* Wait for all the readers to wait! */ + while (sxs->sxs_wait_count < sxs->sxs_rlock_count) { + sxs->sxs_xwaiter = 1; + xt_yield(); //* + /* This should not be required, because there is only one thread + * accessing this value. However, the lock fails if this + * is not done with an atomic op. + * + * This is because threads on other processors have the + * value in processor cache. So they do not + * notice that the value has been set to zero. + * They think it is still 1 and march through + * the barrier (sxs->sxs_xwaiter < sxs->sxs_xlocked) below. + * + * In the meantime, this X locker has gone on thinking + * all is OK. + */ + xt_atomic_tas2(&sxs->sxs_xwaiter, 0); + } #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_add_owner(&sxs->sxs_lock_info); @@ -1474,12 +1492,12 @@ xtPublic xtBool xt_spinxslock_slock(XTSpinXSLockPtr sxs) { xt_atomic_inc2(&sxs->sxs_rlock_count); - /* Check if there could be an X locker: */ - if (sxs->sxs_xlocked) { - /* I am waiting... */ + /* Wait as long as the locker is not waiting: */ + while (sxs->sxs_xwaiter < sxs->sxs_xlocked) { xt_atomic_inc2(&sxs->sxs_wait_count); - while (sxs->sxs_xlocked) + while (sxs->sxs_xwaiter < sxs->sxs_xlocked) { xt_yield(); + } xt_atomic_dec2(&sxs->sxs_wait_count); } @@ -1493,12 +1511,17 @@ xtPublic xtBool xt_spinxslock_unlock(XTSpinXSLockPtr sxs, xtBool xlocked) { if (xlocked) { #ifdef DEBUG + ASSERT_NS(sxs->sxs_locker && sxs->sxs_xlocked); sxs->sxs_locker = 0; #endif sxs->sxs_xlocked = 0; } - else + else { +#ifdef DEBUG + ASSERT_NS(sxs->sxs_rlock_count > 0); +#endif xt_atomic_dec2(&sxs->sxs_rlock_count); + } #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_release_owner(&sxs->sxs_lock_info); @@ -1698,7 +1721,7 @@ xtPublic void xt_atomicrwlock_free(struct XTThread *, XTAtomicRWLockPtr XT_UNUSE #endif } -xtPublic xtBool xt_atomicrwlock_xlock(XTAtomicRWLockPtr arw, xtThreadID XT_NDEBUG_UNUSED(thr_id)) +xtPublic xtBool xt_atomicrwlock_xlock(XTAtomicRWLockPtr arw, xtBool try_lock, xtThreadID XT_NDEBUG_UNUSED(thr_id)) { register xtWord2 set; @@ -1707,6 +1730,8 @@ xtPublic xtBool xt_atomicrwlock_xlock(XTAtomicRWLockPtr arw, xtThreadID XT_NDEBU set = xt_atomic_tas2(&arw->arw_xlock_set, 1); if (!set) break; + if (try_lock) + return FALSE; xt_yield(); } @@ -1721,7 +1746,7 @@ xtPublic xtBool xt_atomicrwlock_xlock(XTAtomicRWLockPtr arw, xtThreadID XT_NDEBU #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_add_owner(&arw->arw_lock_info); #endif - return OK; + return TRUE; } xtPublic xtBool xt_atomicrwlock_slock(XTAtomicRWLockPtr arw) @@ -1799,7 +1824,7 @@ xtPublic void xt_skewrwlock_free(struct XTThread *, XTSkewRWLockPtr XT_UNUSED(sr #endif } -xtPublic xtBool xt_skewrwlock_xlock(XTSkewRWLockPtr srw, xtThreadID XT_NDEBUG_UNUSED(thr_id)) +xtPublic xtBool xt_skewrwlock_xlock(XTSkewRWLockPtr srw, xtBool try_lock, xtThreadID XT_NDEBUG_UNUSED(thr_id)) { register xtWord2 set; @@ -1808,6 +1833,8 @@ xtPublic xtBool xt_skewrwlock_xlock(XTSkewRWLockPtr srw, xtThreadID XT_NDEBUG_UN set = xt_atomic_tas2(&srw->srw_xlock_set, 1); if (!set) break; + if (try_lock) + return FALSE; xt_yield(); } @@ -1822,7 +1849,7 @@ xtPublic xtBool xt_skewrwlock_xlock(XTSkewRWLockPtr srw, xtThreadID XT_NDEBUG_UN #ifdef XT_THREAD_LOCK_INFO xt_thread_lock_info_add_owner(&srw->srw_lock_info); #endif - return OK; + return TRUE; } xtPublic xtBool xt_skewrwlock_slock(XTSkewRWLockPtr srw) @@ -1869,6 +1896,124 @@ xtPublic xtBool xt_skewrwlock_unlock(XTSkewRWLockPtr srw, xtBool xlocked) /* * ----------------------------------------------------------------------- + * RECURSIVE R/W LOCK (allows X lockers to lock again) + */ + +#ifdef XT_THREAD_LOCK_INFO +void xt_recursivemutex_init(XTThreadPtr self, XTRecursiveMutexPtr rm, const char *name) +{ + rm->rm_locker = NULL; + rm->rm_lock_count = 0; + xt_init_mutex(self, &rm->rm_mutex, name); +} +#else +xtPublic void xt_recursivemutex_init(XTThreadPtr self, XTRecursiveMutexPtr rm) +{ + rm->rm_locker = NULL; + rm->rm_lock_count = 0; + xt_init_mutex(self, &rm->rm_mutex); +} +#endif + +xtPublic void xt_recursivemutex_free(XTRecursiveMutexPtr rm) +{ + xt_free_mutex(&rm->rm_mutex); +#ifdef XT_THREAD_LOCK_INFO + xt_thread_lock_info_free(&rm->rm_lock_info); +#endif +} + +xtPublic void xt_recursivemutex_lock(XTThreadPtr self, XTRecursiveMutexPtr rm) +{ + if (self != rm->rm_locker) { + xt_lock_mutex(self, &rm->rm_mutex); + rm->rm_locker = self; + } + rm->rm_lock_count++; +} + +xtPublic void xt_recursivemutex_unlock(XTThreadPtr self, XTRecursiveMutexPtr rm) +{ + ASSERT(self == rm->rm_locker); + ASSERT(rm->rm_lock_count > 0); + rm->rm_lock_count--; + if (!rm->rm_lock_count) { + rm->rm_locker = NULL; + xt_unlock_mutex(self, &rm->rm_mutex); + } +} + +/* + * ----------------------------------------------------------------------- + * RECURSIVE MUTEX (allows lockers to lock again) + */ + +#ifdef XT_THREAD_LOCK_INFO +void xt_recurrwlock_init(struct XTThread *self, XTRecurRWLockPtr rrw, const char *name) +{ + rrw->rrw_locker = NULL; + rrw->rrw_lock_count = 0; + xt_init_rwlock(self, &rrw->rrw_lock, name); +} +#else +void xt_recurrwlock_init(struct XTThread *self, XTRecurRWLockPtr rrw) +{ + rrw->rrw_locker = NULL; + rrw->rrw_lock_count = 0; + xt_init_rwlock(self, &rrw->rrw_lock); +} +#endif + +void xt_recurrwlock_free(XTRecurRWLockPtr rrw) +{ + xt_free_rwlock(&rrw->rrw_lock); +#ifdef XT_THREAD_LOCK_INFO + xt_thread_lock_info_free(&rrw->rrw_lock_info); +#endif +} + +void xt_recurrwlock_xlock(struct XTThread *self, XTRecurRWLockPtr rrw) +{ + if (self != rrw->rrw_locker) { + xt_xlock_rwlock(self, &rrw->rrw_lock); + rrw->rrw_locker = self; + } + rrw->rrw_lock_count++; +} + +void xt_recurrwlock_slock(struct XTThread *self, XTRecurRWLockPtr rrw) +{ + xt_slock_rwlock(self, &rrw->rrw_lock); +} + +void xt_recurrwlock_slock_ns(XTRecurRWLockPtr rrw) +{ + xt_slock_rwlock_ns(&rrw->rrw_lock); +} + +void xt_recurrwlock_unxlock(struct XTThread *self, XTRecurRWLockPtr rrw) +{ + ASSERT(self == rrw->rrw_locker); + ASSERT(rrw->rrw_lock_count > 0); + rrw->rrw_lock_count--; + if (!rrw->rrw_lock_count) { + rrw->rrw_locker = NULL; + xt_unlock_rwlock(self, &rrw->rrw_lock); + } +} + +void xt_recurrwlock_unslock(struct XTThread *self, XTRecurRWLockPtr rrw) +{ + xt_unlock_rwlock(self, &rrw->rrw_lock); +} + +void xt_recurrwlock_unslock_ns(XTRecurRWLockPtr rrw) +{ + xt_unlock_rwlock_ns(&rrw->rrw_lock); +} + +/* + * ----------------------------------------------------------------------- * UNIT TESTS */ @@ -2031,7 +2176,7 @@ static void *lck_run_writer(XTThreadPtr self) xt_rwmutex_unlock(&data->xs_lock, self->t_id); } else if (data->xs_which_lock == LOCK_SPINXSLOCK) { - xt_spinxslock_xlock(&data->xs_spinrwlock, self->t_id); + xt_spinxslock_xlock(&data->xs_spinrwlock, FALSE, self->t_id); lck_do_job(self, data->xs_which_job, data, FALSE); xt_spinxslock_unlock(&data->xs_spinrwlock, TRUE); } @@ -2041,12 +2186,12 @@ static void *lck_run_writer(XTThreadPtr self) xt_xsmutex_unlock(&data->xs_fastrwlock, self->t_id); } else if (data->xs_which_lock == LOCK_ATOMICRWLOCK) { - xt_atomicrwlock_xlock(&data->xs_atomicrwlock, self->t_id); + xt_atomicrwlock_xlock(&data->xs_atomicrwlock, FALSE, self->t_id); lck_do_job(self, data->xs_which_job, data, FALSE); xt_atomicrwlock_unlock(&data->xs_atomicrwlock, TRUE); } else if (data->xs_which_lock == LOCK_SKEWRWLOCK) { - xt_skewrwlock_xlock(&data->xs_skewrwlock, self->t_id); + xt_skewrwlock_xlock(&data->xs_skewrwlock, FALSE, self->t_id); lck_do_job(self, data->xs_which_job, data, FALSE); xt_skewrwlock_unlock(&data->xs_skewrwlock, TRUE); } diff --git a/storage/pbxt/src/lock_xt.h b/storage/pbxt/src/lock_xt.h index 05ba9af244e..4e5af648c37 100644 --- a/storage/pbxt/src/lock_xt.h +++ b/storage/pbxt/src/lock_xt.h @@ -109,7 +109,8 @@ inline xtWord1 xt_atomic_dec1(volatile xtWord1 *mptr) inline void xt_atomic_inc2(volatile xtWord2 *mptr) { #ifdef XT_ATOMIC_WIN32_X86 - __asm LOCK INC WORD PTR mptr + __asm MOV ECX, mptr + __asm LOCK INC WORD PTR [ECX] #elif defined(XT_ATOMIC_GNUC_X86) asm volatile ("lock; incw %0" : : "m" (*mptr) : "memory"); #elif defined(XT_ATOMIC_GCC_OPS) @@ -125,7 +126,8 @@ inline void xt_atomic_inc2(volatile xtWord2 *mptr) inline void xt_atomic_dec2(volatile xtWord2 *mptr) { #ifdef XT_ATOMIC_WIN32_X86 - __asm LOCK DEC WORD PTR mptr + __asm MOV ECX, mptr + __asm LOCK DEC WORD PTR [ECX] #elif defined(XT_ATOMIC_GNUC_X86) asm volatile ("lock; decw %0" : : "m" (*mptr) : "memory"); #elif defined(XT_ATOMIC_GCC_OPS) @@ -427,6 +429,7 @@ inline void xt_fastlock_unlock(XTFastLockPtr fal, struct XTThread *XT_UNUSED(thr typedef struct XTSpinXSLock { volatile xtWord2 sxs_xlocked; + volatile xtWord2 sxs_xwaiter; volatile xtWord2 sxs_rlock_count; volatile xtWord2 sxs_wait_count; /* The number of readers waiting for the xlocker. */ #ifdef DEBUG @@ -446,7 +449,7 @@ void xt_spinxslock_init(struct XTThread *self, XTSpinXSLockPtr sxs, const char * void xt_spinxslock_init(struct XTThread *self, XTSpinXSLockPtr sxs); #endif void xt_spinxslock_free(struct XTThread *self, XTSpinXSLockPtr sxs); -xtBool xt_spinxslock_xlock(XTSpinXSLockPtr sxs, xtThreadID thd_id); +xtBool xt_spinxslock_xlock(XTSpinXSLockPtr sxs, xtBool try_lock, xtThreadID thd_id); xtBool xt_spinxslock_slock(XTSpinXSLockPtr sxs); xtBool xt_spinxslock_unlock(XTSpinXSLockPtr sxs, xtBool xlocked); @@ -500,7 +503,7 @@ void xt_atomicrwlock_init(struct XTThread *self, XTAtomicRWLockPtr xsl, const ch void xt_atomicrwlock_init(struct XTThread *self, XTAtomicRWLockPtr xsl); #endif void xt_atomicrwlock_free(struct XTThread *self, XTAtomicRWLockPtr xsl); -xtBool xt_atomicrwlock_xlock(XTAtomicRWLockPtr xsl, xtThreadID thr_id); +xtBool xt_atomicrwlock_xlock(XTAtomicRWLockPtr xsl, xtBool try_lock, xtThreadID thr_id); xtBool xt_atomicrwlock_slock(XTAtomicRWLockPtr xsl); xtBool xt_atomicrwlock_unlock(XTAtomicRWLockPtr xsl, xtBool xlocked); @@ -525,7 +528,7 @@ void xt_skewrwlock_init(struct XTThread *self, XTSkewRWLockPtr xsl, const char * void xt_skewrwlock_init(struct XTThread *self, XTSkewRWLockPtr xsl); #endif void xt_skewrwlock_free(struct XTThread *self, XTSkewRWLockPtr xsl); -xtBool xt_skewrwlock_xlock(XTSkewRWLockPtr xsl, xtThreadID thr_id); +xtBool xt_skewrwlock_xlock(XTSkewRWLockPtr xsl, xtBool try_lock, xtThreadID thr_id); xtBool xt_skewrwlock_slock(XTSkewRWLockPtr xsl); xtBool xt_skewrwlock_unlock(XTSkewRWLockPtr xsl, xtBool xlocked); @@ -713,4 +716,57 @@ void xt_exit_row_lock_list(XTRowLockListPtr rl); #define XT_HAVE_LOCK 2 #define XT_WAITING 3 +/* + * ----------------------------------------------------------------------- + * RECURSIVE MUTEX (allows lockers to lock again) + */ + +typedef struct XTRecursiveMutex { + struct XTThread *rm_locker; + u_int rm_lock_count; + xt_mutex_type rm_mutex; + +#ifdef XT_THREAD_LOCK_INFO + XTThreadLockInfoRec rm_lock_info; + const char *rm_name; +#endif +} XTRecursiveMutexRec, *XTRecursiveMutexPtr; + +#ifdef XT_THREAD_LOCK_INFO +#define xt_recursivemutex_init_with_autoname(a,b) xt_recursivemutex_init(a,b,LOCKLIST_ARG_SUFFIX(b)) +void xt_recursivemutex_init(struct XTThread *self, XTRecursiveMutexPtr rm, const char *name); +#else +#define xt_recursivemutex_init_with_autoname(a,b) xt_recursivemutex_init(a,b) +void xt_recursivemutex_init(struct XTThread *self, XTRecursiveMutexPtr rm); +#endif +void xt_recursivemutex_free(XTRecursiveMutexPtr rm); +void xt_recursivemutex_lock(struct XTThread *self, XTRecursiveMutexPtr rm); +void xt_recursivemutex_unlock(struct XTThread *self, XTRecursiveMutexPtr rm); + +typedef struct XTRecurRWLock { + struct XTThread *rrw_locker; + u_int rrw_lock_count; + xt_rwlock_type rrw_lock; + +#ifdef XT_THREAD_LOCK_INFO + XTThreadLockInfoRec rrw_lock_info; + const char *rrw_name; +#endif +} XTRecurRWLockRec, *XTRecurRWLockPtr; + +#ifdef XT_THREAD_LOCK_INFO +#define xt_recurrwlock_init_with_autoname(a,b) xt_recurrwlock_init(a,b,LOCKLIST_ARG_SUFFIX(b)) +void xt_recurrwlock_init(struct XTThread *self, XTRecurRWLockPtr rrw, const char *name); +#else +#define xt_recurrwlock_init_with_autoname(a,b) xt_recurrwlock_init(a,b) +void xt_recurrwlock_init(struct XTThread *self, XTRecurRWLockPtr rrw); +#endif +void xt_recurrwlock_free(XTRecurRWLockPtr rrw); +void xt_recurrwlock_xlock(struct XTThread *self, XTRecurRWLockPtr rrw); +void xt_recurrwlock_slock(struct XTThread *self, XTRecurRWLockPtr rrw); +void xt_recurrwlock_slock_ns(XTRecurRWLockPtr rrw); +void xt_recurrwlock_unxlock(struct XTThread *self, XTRecurRWLockPtr rrw); +void xt_recurrwlock_unslock(struct XTThread *self, XTRecurRWLockPtr rrw); +void xt_recurrwlock_unslock_ns(XTRecurRWLockPtr rrw); + #endif diff --git a/storage/pbxt/src/locklist_xt.cc b/storage/pbxt/src/locklist_xt.cc index cbb004a70ca..9f79442560b 100644 --- a/storage/pbxt/src/locklist_xt.cc +++ b/storage/pbxt/src/locklist_xt.cc @@ -180,6 +180,10 @@ void xt_trace_thread_locks(XTThread *self) lock_type = "XTAtomicRWLock"; lock_name = li->li_atomic_rwlock->arw_name; break; + case XTThreadLockInfo::SKEW_RW_LOCK: + lock_type = "XTSkewRWLock"; + lock_name = li->li_skew_rwlock->srw_name; + break; } xt_ttracef(self, " #lock#%d: type: %s name: %s \n", count, lock_type, lock_name); diff --git a/storage/pbxt/src/myxt_xt.cc b/storage/pbxt/src/myxt_xt.cc index 2b5d59e72fb..51490fc00f5 100644 --- a/storage/pbxt/src/myxt_xt.cc +++ b/storage/pbxt/src/myxt_xt.cc @@ -54,6 +54,7 @@ extern pthread_key_t THR_Session; #include "database_xt.h" #include "cache_xt.h" #include "datalog_xt.h" +#include "memory_xt.h" static void myxt_bitmap_init(XTThreadPtr self, MX_BITMAP *map, u_int n_bits); static void myxt_bitmap_free(XTThreadPtr self, MX_BITMAP *map); @@ -255,6 +256,11 @@ xtPublic u_int myxt_create_key_from_row(XTIndexPtr ind, xtWord1 *key, xtWord1 *r xtWord1 *end; xtWord1 *start; +#ifdef HAVE_valgrind + if (ind->mi_fix_key) + memset((byte*) key, 0,(size_t) (ind->mi_key_size) ); +#endif + start = key; for (u_int i=0; i<ind->mi_seg_count; i++, keyseg++) { @@ -531,7 +537,7 @@ xtPublic u_int myxt_create_foreign_key_from_row(XTIndexPtr ind, xtWord1 *key, xt key += length; } - return fkey_ind->mi_fix_key ? fkey_ind->mi_key_size : (u_int) (key - start); /* Return keylength */ + return (u_int) (key - start); } /* I may be overcautious here, but can I assume that @@ -2132,10 +2138,10 @@ static void my_deref_index_data(struct XTThread *self, XTIndexPtr mi) { enter_(); /* The dirty list of cache pages should be empty here! */ - ASSERT(!mi->mi_dirty_list); + /* This is not the case if we were not able to flush data. E.g. when running out of disk space */ + //ASSERT(!mi->mi_dirty_list); ASSERT(!mi->mi_free_list); - xt_free_mutex(&mi->mi_flush_lock); xt_spinlock_free(self, &mi->mi_dirty_lock); XT_INDEX_FREE_LOCK(self, mi); myxt_bitmap_free(self, &mi->mi_col_map); @@ -2174,7 +2180,6 @@ static XTIndexPtr my_create_index(XTThreadPtr self, TABLE *table_arg, u_int idx, pushsr_(ind, my_deref_index_data, (XTIndexPtr) xt_calloc(self, MX_OFFSETOF(XTIndexRec, mi_seg) + sizeof(XTIndexSegRec) * index->key_parts)); XT_INDEX_INIT_LOCK(self, ind); - xt_init_mutex_with_autoname(self, &ind->mi_flush_lock); xt_spinlock_init_with_autoname(self, &ind->mi_dirty_lock); ind->mi_index_no = idx; ind->mi_flags = (index->flags & (HA_NOSAME | HA_NULL_ARE_EQUAL | HA_UNIQUE_CHECK)); @@ -2556,8 +2561,12 @@ xtPublic void myxt_setup_dictionary(XTThreadPtr self, XTDictionaryPtr dic) ave_row_size += 3 + ave_data_size; /* This is the length of the record required for all indexes: */ - if (field_count + 1 == dic->dic_ind_cols_req) - dic->dic_ind_rec_len = max_data_size; + /* This was calculated incorrectly. Not a serius bug because it + * is only used in the case of fixed length row, and in this + * case the dic_ind_rec_len is set correctly below. + */ + if (field_count == dic->dic_ind_cols_req) + dic->dic_ind_rec_len = max_row_size; } dic->dic_min_row_size = min_row_size; @@ -2624,6 +2633,20 @@ xtPublic void myxt_setup_dictionary(XTThreadPtr self, XTDictionaryPtr dic) } } + /* Ensure that handle data record size is big enough to + * include the extended record reference, in the case of + * variable length rows + */ + if (!dic_rec_fixed) { + if (dic_rec_size < offsetof(XTTabRecExtDRec, re_data)) + dic_rec_size = offsetof(XTTabRecExtDRec, re_data); + } +#ifdef DEBUG + else { + ASSERT_NS(dic_rec_size > offsetof(XTTabRecFix, rf_data)); + } +#endif + if (!dic->dic_rec_size) { dic->dic_rec_size = dic_rec_size; dic->dic_rec_fixed = dic_rec_fixed; @@ -2861,6 +2884,7 @@ static void ha_create_dd_index(XTThreadPtr self, XTDDIndex *ind, KEY *key) for (key_part = key->key_part; key_part != key_part_end; key_part++) { if (!(cref = new XTDDColumnRef())) xt_throw_errno(XT_CONTEXT, XT_ENOMEM); + cref->init(self); ind->co_cols.append(self, cref); cref->cr_col_name = xt_dup_string(self, (char *) key_part->field->field_name); } diff --git a/storage/pbxt/src/pthread_xt.cc b/storage/pbxt/src/pthread_xt.cc index 2dfb8a64af2..e7f0632e9ae 100755 --- a/storage/pbxt/src/pthread_xt.cc +++ b/storage/pbxt/src/pthread_xt.cc @@ -39,25 +39,25 @@ #ifdef XT_WIN -void xt_p_init_threading(void) +xtPublic void xt_p_init_threading(void) { } -int xt_p_set_normal_priority(pthread_t thr) +xtPublic int xt_p_set_normal_priority(pthread_t thr) { if (!SetThreadPriority (thr, THREAD_PRIORITY_NORMAL)) return GetLastError(); return 0; } -int xt_p_set_low_priority(pthread_t thr) +xtPublic int xt_p_set_low_priority(pthread_t thr) { if (!SetThreadPriority (thr, THREAD_PRIORITY_LOWEST)) return GetLastError(); return 0; } -int xt_p_set_high_priority(pthread_t thr) +xtPublic int xt_p_set_high_priority(pthread_t thr) { if (!SetThreadPriority (thr, THREAD_PRIORITY_HIGHEST)) return GetLastError(); @@ -67,9 +67,9 @@ int xt_p_set_high_priority(pthread_t thr) #define XT_RWLOCK_MAGIC 0x78AC390E #ifdef XT_THREAD_LOCK_INFO -int xt_p_mutex_init(xt_mutex_type *mutex, const pthread_mutexattr_t *attr, const char *n) +xtPublic int xt_p_mutex_init(xt_mutex_type *mutex, const pthread_mutexattr_t *attr, const char *n) #else -int xt_p_mutex_init(xt_mutex_type *mutex, const pthread_mutexattr_t *attr) +xtPublic int xt_p_mutex_init(xt_mutex_type *mutex, const pthread_mutexattr_t *attr) #endif { InitializeCriticalSection(&mutex->mt_cs); @@ -80,7 +80,7 @@ int xt_p_mutex_init(xt_mutex_type *mutex, const pthread_mutexattr_t *attr) return 0; } -int xt_p_mutex_destroy(xt_mutex_type *mutex) +xtPublic int xt_p_mutex_destroy(xt_mutex_type *mutex) { DeleteCriticalSection(&mutex->mt_cs); #ifdef XT_THREAD_LOCK_INFO @@ -89,7 +89,7 @@ int xt_p_mutex_destroy(xt_mutex_type *mutex) return 0; } -int xt_p_mutex_lock(xt_mutex_type *mx) +xtPublic int xt_p_mutex_lock(xt_mutex_type *mx) { EnterCriticalSection(&mx->mt_cs); #ifdef XT_THREAD_LOCK_INFO @@ -98,7 +98,7 @@ int xt_p_mutex_lock(xt_mutex_type *mx) return 0; } -int xt_p_mutex_unlock(xt_mutex_type *mx) +xtPublic int xt_p_mutex_unlock(xt_mutex_type *mx) { LeaveCriticalSection(&mx->mt_cs); #ifdef XT_THREAD_LOCK_INFO @@ -107,7 +107,7 @@ int xt_p_mutex_unlock(xt_mutex_type *mx) return 0; } -int xt_p_mutex_trylock(xt_mutex_type *mutex) +xtPublic int xt_p_mutex_trylock(xt_mutex_type *mutex) { #if(_WIN32_WINNT >= 0x0400) /* NOTE: MySQL bug! was using?! @@ -130,9 +130,9 @@ int xt_p_mutex_trylock(xt_mutex_type *mutex) } #ifdef XT_THREAD_LOCK_INFO -int xt_p_rwlock_init(xt_rwlock_type *rwl, const pthread_condattr_t *attr, const char *n) +xtPublic int xt_p_rwlock_init(xt_rwlock_type *rwl, const pthread_condattr_t *attr, const char *n) #else -int xt_p_rwlock_init(xt_rwlock_type *rwl, const pthread_condattr_t *attr) +xtPublic int xt_p_rwlock_init(xt_rwlock_type *rwl, const pthread_condattr_t *attr) #endif { int result; @@ -173,7 +173,7 @@ int xt_p_rwlock_init(xt_rwlock_type *rwl, const pthread_condattr_t *attr) return result; } -int xt_p_rwlock_destroy(xt_rwlock_type *rwl) +xtPublic int xt_p_rwlock_destroy(xt_rwlock_type *rwl) { int result = 0, result1 = 0, result2 = 0; @@ -225,7 +225,7 @@ int xt_p_rwlock_destroy(xt_rwlock_type *rwl) } -int xt_p_rwlock_rdlock(xt_rwlock_type *rwl) +xtPublic int xt_p_rwlock_rdlock(xt_rwlock_type *rwl) { int result; @@ -262,7 +262,7 @@ int xt_p_rwlock_rdlock(xt_rwlock_type *rwl) return (xt_p_mutex_unlock (&(rwl->rw_ex_lock))); } -int xt_p_rwlock_wrlock(xt_rwlock_type *rwl) +xtPublic int xt_p_rwlock_wrlock(xt_rwlock_type *rwl) { int result; @@ -309,7 +309,54 @@ int xt_p_rwlock_wrlock(xt_rwlock_type *rwl) return result; } -int xt_p_rwlock_unlock(xt_rwlock_type *rwl) +xtPublic xtBool xt_p_rwlock_try_wrlock(xt_rwlock_type *rwl) +{ + int result; + + if (rwl == NULL) + return FALSE; + + if (rwl->rw_magic != XT_RWLOCK_MAGIC) + return FALSE; + + if ((result = xt_p_mutex_trylock(&rwl->rw_ex_lock)) != 0) + return FALSE; + + if ((result = xt_p_mutex_lock(&rwl->rw_sh_lock)) != 0) { + (void) xt_p_mutex_unlock(&rwl->rw_ex_lock); + return FALSE; + } + + if (rwl->rw_ex_count == 0) { + if (rwl->rw_sh_complete_count > 0) { + rwl->rw_sh_count -= rwl->rw_sh_complete_count; + rwl->rw_sh_complete_count = 0; + } + + if (rwl->rw_sh_count > 0) { + rwl->rw_sh_complete_count = -rwl->rw_sh_count; + + do { + result = pthread_cond_wait (&rwl->rw_sh_cond, &rwl->rw_sh_lock.mt_cs); + } + while (result == 0 && rwl->rw_sh_complete_count < 0); + + if (result == 0) + rwl->rw_sh_count = 0; + } + } + + if (result == 0) + rwl->rw_ex_count++; + +#ifdef XT_THREAD_LOCK_INFO + xt_thread_lock_info_add_owner(&rwl->rw_lock_info); +#endif + + return TRUE; +} + +xtPublic int xt_p_rwlock_unlock(xt_rwlock_type *rwl) { int result, result1; @@ -342,12 +389,12 @@ int xt_p_rwlock_unlock(xt_rwlock_type *rwl) return ((result != 0) ? result : result1); } -int xt_p_cond_wait(xt_cond_type *cond, xt_mutex_type *mutex) +xtPublic int xt_p_cond_wait(xt_cond_type *cond, xt_mutex_type *mutex) { return xt_p_cond_timedwait(cond, mutex, NULL); } -int xt_p_cond_timedwait(xt_cond_type *cond, xt_mutex_type *mt, struct timespec *abstime) +xtPublic int xt_p_cond_timedwait(xt_cond_type *cond, xt_mutex_type *mt, struct timespec *abstime) { pthread_mutex_t *mutex = &mt->mt_cs; int result; @@ -393,7 +440,7 @@ int xt_p_cond_timedwait(xt_cond_type *cond, xt_mutex_type *mt, struct timespec * return result == WAIT_TIMEOUT ? ETIMEDOUT : 0; } -int xt_p_join(pthread_t thread, void **value) +xtPublic int xt_p_join(pthread_t thread, void **value) { DWORD exitcode; @@ -676,6 +723,23 @@ xtPublic int xt_p_rwlock_wrlock(xt_rwlock_type *rwlock) return r; } +xtPublic xtBool xt_p_rwlock_try_wrlock(xt_rwlock_type *rwlock) +{ + XTThreadPtr self = xt_get_self(); + int r; + + ASSERT_NS(rwlock->rw_init == 67890); + r = pthread_rwlock_trywrlock(&rwlock->rw_plock); + if (r == 0) { + ASSERT_NS(!rwlock->rw_locker); + rwlock->rw_locker = self; +#ifdef XT_THREAD_LOCK_INFO + xt_thread_lock_info_add_owner(&rwlock->rw_lock_info); +#endif + } + return r == 0; +} + xtPublic int xt_p_rwlock_unlock(xt_rwlock_type *rwlock) { XTThreadPtr self = xt_get_self(); diff --git a/storage/pbxt/src/pthread_xt.h b/storage/pbxt/src/pthread_xt.h index d8ef1a85d41..dccc5779aad 100755 --- a/storage/pbxt/src/pthread_xt.h +++ b/storage/pbxt/src/pthread_xt.h @@ -101,13 +101,14 @@ int xt_p_rwlock_init(xt_rwlock_type *rwlock, const pthread_condattr_t *attr, con #else int xt_p_rwlock_init(xt_rwlock_type *rwlock, const pthread_condattr_t *attr); #endif -int xt_p_rwlock_destroy(xt_rwlock_type *rwlock); -int xt_p_rwlock_rdlock(xt_rwlock_type *mx); -int xt_p_rwlock_wrlock(xt_rwlock_type *mx); -int xt_p_rwlock_unlock(xt_rwlock_type *mx); +int xt_p_rwlock_destroy(xt_rwlock_type *rwlock); +int xt_p_rwlock_rdlock(xt_rwlock_type *mx); +int xt_p_rwlock_wrlock(xt_rwlock_type *mx); +xtBool xt_p_rwlock_try_wrlock(xt_rwlock_type *rwl); +int xt_p_rwlock_unlock(xt_rwlock_type *mx); -int xt_p_cond_wait(xt_cond_type *cond, xt_mutex_type *mutex); -int xt_p_cond_timedwait(xt_cond_type *cond, xt_mutex_type *mutex, struct timespec *abstime); +int xt_p_cond_wait(xt_cond_type *cond, xt_mutex_type *mutex); +int xt_p_cond_timedwait(xt_cond_type *cond, xt_mutex_type *mutex, struct timespec *abstime); int xt_p_join(pthread_t thread, void **value); @@ -125,6 +126,7 @@ int xt_p_join(pthread_t thread, void **value); #define xt_slock_rwlock_ns xt_p_rwlock_rdlock #define xt_xlock_rwlock_ns xt_p_rwlock_wrlock +#define xt_xlock_try_rwlock_ns xt_p_rwlock_try_wrlock #define xt_unlock_rwlock_ns xt_p_rwlock_unlock #ifdef XT_THREAD_LOCK_INFO @@ -225,9 +227,10 @@ typedef struct xt_rwlock_struct { #endif } xt_rwlock_type; -int xt_p_rwlock_rdlock(xt_rwlock_type *mx); -int xt_p_rwlock_wrlock(xt_rwlock_type *mx); -int xt_p_rwlock_unlock(xt_rwlock_type *mx); +int xt_p_rwlock_rdlock(xt_rwlock_type *mx); +int xt_p_rwlock_wrlock(xt_rwlock_type *mx); +xtBool xt_p_rwlock_try_wrlock(xt_rwlock_type *mx); +int xt_p_rwlock_unlock(xt_rwlock_type *mx); int xt_p_mutex_lock(xt_mutex_type *mx, u_int line, const char *file); int xt_p_mutex_unlock(xt_mutex_type *mx); @@ -251,37 +254,39 @@ int xt_p_cond_timedwait(xt_cond_type *cond, xt_mutex_type *mutex, const struct t } #endif -#define xt_slock_rwlock_ns xt_p_rwlock_rdlock -#define xt_xlock_rwlock_ns xt_p_rwlock_wrlock -#define xt_unlock_rwlock_ns xt_p_rwlock_unlock +#define xt_slock_rwlock_ns xt_p_rwlock_rdlock +#define xt_xlock_rwlock_ns xt_p_rwlock_wrlock +#define xt_xlock_try_rwlock_ns xt_p_rwlock_try_wrlock +#define xt_unlock_rwlock_ns xt_p_rwlock_unlock -#define xt_lock_mutex_ns(x) xt_p_mutex_lock(x, __LINE__, __FILE__) -#define xt_unlock_mutex_ns xt_p_mutex_unlock -#define xt_mutex_trylock xt_p_mutex_trylock +#define xt_lock_mutex_ns(x) xt_p_mutex_lock(x, __LINE__, __FILE__) +#define xt_unlock_mutex_ns xt_p_mutex_unlock +#define xt_mutex_trylock xt_p_mutex_trylock #else // DEBUG_LOCKING -#define xt_rwlock_struct _opaque_pthread_rwlock_t -#define xt_mutex_struct _opaque_pthread_mutex_t +#define xt_rwlock_struct _opaque_pthread_rwlock_t +#define xt_mutex_struct _opaque_pthread_mutex_t -#define xt_rwlock_type pthread_rwlock_t -#define xt_mutex_type pthread_mutex_t +#define xt_rwlock_type pthread_rwlock_t +#define xt_mutex_type pthread_mutex_t -#define xt_slock_rwlock_ns pthread_rwlock_rdlock -#define xt_xlock_rwlock_ns pthread_rwlock_wrlock -#define xt_unlock_rwlock_ns pthread_rwlock_unlock +#define xt_slock_rwlock_ns pthread_rwlock_rdlock +#define xt_xlock_rwlock_ns pthread_rwlock_wrlock +#define xt_xlock_try_rwlock_ns(x) (pthread_rwlock_trywrlock(x) == 0) +#define xt_unlock_rwlock_ns pthread_rwlock_unlock -#define xt_lock_mutex_ns pthread_mutex_lock -#define xt_unlock_mutex_ns pthread_mutex_unlock -#define xt_mutex_trylock pthread_mutex_trylock +#define xt_lock_mutex_ns pthread_mutex_lock +#define xt_unlock_mutex_ns pthread_mutex_unlock +#define xt_mutex_trylock pthread_mutex_trylock -#define xt_p_mutex_trylock pthread_mutex_trylock -#define xt_p_mutex_destroy pthread_mutex_destroy -#define xt_p_mutex_init pthread_mutex_init -#define xt_p_rwlock_destroy pthread_rwlock_destroy -#define xt_p_rwlock_init pthread_rwlock_init -#define xt_p_cond_wait pthread_cond_wait -#define xt_p_cond_timedwait pthread_cond_timedwait +#define xt_p_mutex_trylock pthread_mutex_trylock +#define xt_p_mutex_destroy pthread_mutex_destroy +#define xt_p_mutex_init pthread_mutex_init +#define xt_p_rwlock_destroy pthread_rwlock_destroy +#define xt_p_rwlock_init pthread_rwlock_init +#define xt_p_cond_wait pthread_cond_wait +#define xt_p_cond_timedwait pthread_cond_timedwait #endif // DEBUG_LOCKING diff --git a/storage/pbxt/src/restart_xt.cc b/storage/pbxt/src/restart_xt.cc index 472c5ad0478..b0c8f2854ae 100644 --- a/storage/pbxt/src/restart_xt.cc +++ b/storage/pbxt/src/restart_xt.cc @@ -1315,7 +1315,7 @@ static void xres_apply_operations(XTThreadPtr self, XTWriterStatePtr ws, xtBool tab->tab_head_op_seq = op->or_op_seq; if (tab->tab_wr_wake_freeer) { if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, tab->tab_wake_freeer_op)) - xt_wr_wake_freeer(self); + xt_wr_wake_freeer(self, ws->ws_db); } i++; } @@ -1498,7 +1498,7 @@ xtPublic void xt_xres_apply_in_order(XTThreadPtr self, XTWriterStatePtr ws, xtLo tab->tab_head_op_seq = op_seq; if (tab->tab_wr_wake_freeer) { if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, tab->tab_wake_freeer_op)) - xt_wr_wake_freeer(self); + xt_wr_wake_freeer(self, ws->ws_db); } /* Apply any operations in the list that now follow on... @@ -1575,10 +1575,12 @@ static int xres_comp_flush_tabs(XTThreadPtr XT_UNUSED(self), register const void static void xres_init_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp) { xt_init_mutex_with_autoname(self, &cp->cp_state_lock); + cp->cp_inited = TRUE; } static void xres_free_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp) { + cp->cp_inited = FALSE; xt_free_mutex(&cp->cp_state_lock); if (cp->cp_table_ids) { xt_free_sortedlist(self, cp->cp_table_ids); @@ -1616,6 +1618,7 @@ xtPublic void xt_xres_init(XTThreadPtr self, XTDatabaseHPtr db) xt_init_mutex_with_autoname(self, &db->db_cp_lock); xt_init_cond(self, &db->db_cp_cond); + xt_init_mutex_with_autoname(self, &db->db_fl_lock); xres_init_checkpoint_state(self, &db->db_cp_state); db->db_restart.xres_init(self, db, &db->db_wr_log_id, &db->db_wr_log_offset, &max_log_id); @@ -1633,6 +1636,7 @@ xtPublic void xt_xres_exit(XTThreadPtr self, XTDatabaseHPtr db) xres_free_checkpoint_state(self, &db->db_cp_state); xt_free_mutex(&db->db_cp_lock); xt_free_cond(&db->db_cp_cond); + xt_free_mutex(&db->db_fl_lock); } /* ---------------------------------------------------------------------- @@ -2182,7 +2186,7 @@ xtBool XTXactRestart::xres_restart(XTThreadPtr self, xtLogID *log_id, xtLogOffse xtBool XTXactRestart::xres_is_checkpoint_pending(xtLogID curr_log_id, xtLogOffset curr_log_offset) { - return xt_bytes_since_last_checkpoint(xres_db, curr_log_id, curr_log_offset) >= xt_db_checkpoint_frequency / 2; + return xt_bytes_since_last_checkpoint(xres_db, curr_log_id, curr_log_offset) >= xt_db_checkpoint_frequency; } /* @@ -2531,10 +2535,10 @@ static void xres_cp_main(XTThreadPtr self) XTDatabaseHPtr db = self->st_database; u_int curr_writer_total; time_t now; + xtXactID sweep_count; xt_set_low_priority(self); - while (!self->t_quit) { /* Wait 2 seconds: */ curr_writer_total = db->db_xn_total_writer_count; @@ -2549,9 +2553,13 @@ static void xres_cp_main(XTThreadPtr self) if (self->t_quit) break; - if (curr_writer_total == db->db_xn_total_writer_count) + sweep_count = db->db_xn_curr_id + 1 - db->db_xn_to_clean_id; + if (curr_writer_total == db->db_xn_total_writer_count && + !sweep_count && + db->db_wr_idle == XT_THREAD_IDLE) { /* No activity in 2 seconds: */ xres_cp_checkpoint(self, db, curr_writer_total, FALSE); + } else { /* There server is busy, check if we need to * write a checkpoint anyway... @@ -2672,6 +2680,10 @@ xtPublic xtBool xt_begin_checkpoint(XTDatabaseHPtr db, xtBool have_table_lock, X XTOperationPtr op; XTCheckPointTableRec cpt; XTSortedListPtr tables = NULL; + + /* during startup we can get an error before the checkpointer is inited */ + if (!cp->cp_inited) + return FAILED; /* First check if a checkpoint is already running: */ xt_lock_mutex_ns(&cp->cp_state_lock); @@ -3314,7 +3326,7 @@ static void *xn_xres_run_recovery_thread(XTThreadPtr self) * #7 0x000c0db2 in THD::~THD at sql_class.cc:934 * #8 0x003b025b in myxt_destroy_thread at myxt_xt.cc:2999 * #9 0x003b66b5 in xn_xres_run_recovery_thread at restart_xt.cc:3196 - * #10 0x003cbfbb in thr_main_pbxt at thread_xt.cc:1020 + * #10 0x003cbfbb in xt_thread_main at thread_xt.cc:1020 * myxt_destroy_thread(mysql_thread, TRUE); */ @@ -3350,3 +3362,123 @@ xtPublic void xt_xres_terminate_recovery(XTThreadPtr self) xt_wait_for_thread(tid, TRUE); } } + +/* ---------------------------------------------------------------------- + * L O G F L U S H P R O C E S S + */ + +static void *xres_fl_run_thread(XTThreadPtr self) +{ + XTDatabaseHPtr db = (XTDatabaseHPtr) self->t_data; + int count; + void *mysql_thread; + xtWord8 to_flush; + + if (!(mysql_thread = myxt_create_thread())) + xt_throw(self); + + while (!self->t_quit) { + try_(a) { + /* + * The garbage collector requires that the database + * is in use because. + */ + xt_use_database(self, db, XT_FOR_CHECKPOINTER); + + /* This action is both safe and required (see details elsewhere) */ + xt_heap_release(self, self->st_database); + + xt_set_low_priority(self); + + to_flush = xt_trace_clock() + XT_XLOG_FLUSH_FREQ * 1000; + for (;;) { + /* Wait 1 second: */ + while (!self->t_quit && xt_trace_clock() < to_flush) + xt_sleep_milli_second(10); + + if (self->t_quit) + break; + + if (!db->db_xlog.xlog_flush(self)) + xt_throw(self); + + to_flush += XT_XLOG_FLUSH_FREQ * 1000; + } + } + catch_(a) { + /* This error is "normal"! */ + if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY && + !(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT && + self->t_exception.e_sys_err == SIGTERM)) + xt_log_and_clear_exception(self); + } + cont_(a); + + /* Avoid releasing the database (done above) */ + self->st_database = NULL; + xt_unuse_database(self, self); + + /* After an exception, pause before trying again... */ + /* Number of seconds */ + count = 60; + while (!self->t_quit && count > 0) { + sleep(1); + count--; + } + } + + /* + * {MYSQL-THREAD-KILL} + myxt_destroy_thread(mysql_thread, TRUE); + */ + return NULL; +} + +static void xres_fl_free_thread(XTThreadPtr self, void *data) +{ + XTDatabaseHPtr db = (XTDatabaseHPtr) data; + + if (db->db_fl_thread) { + xt_lock_mutex(self, &db->db_fl_lock); + pushr_(xt_unlock_mutex, &db->db_fl_lock); + db->db_fl_thread = NULL; + freer_(); // xt_unlock_mutex(&db->db_fl_lock) + } +} + +xtPublic void xt_start_flusher(XTThreadPtr self, XTDatabaseHPtr db) +{ + char name[PATH_MAX]; + + sprintf(name, "FL-%s", xt_last_directory_of_path(db->db_main_path)); + xt_remove_dir_char(name); + db->db_fl_thread = xt_create_daemon(self, name); + xt_set_thread_data(db->db_fl_thread, db, xres_fl_free_thread); + xt_run_thread(self, db->db_fl_thread, xres_fl_run_thread); +} + +xtPublic void xt_stop_flusher(XTThreadPtr self, XTDatabaseHPtr db) +{ + XTThreadPtr thr_fl; + + if (db->db_fl_thread) { + xt_lock_mutex(self, &db->db_fl_lock); + pushr_(xt_unlock_mutex, &db->db_fl_lock); + + /* This pointer is safe as long as you have the transaction lock. */ + if ((thr_fl = db->db_fl_thread)) { + xtThreadID tid = thr_fl->t_id; + + /* Make sure the thread quits when woken up. */ + xt_terminate_thread(self, thr_fl); + + freer_(); // xt_unlock_mutex(&db->db_cp_lock) + + xt_wait_for_thread(tid, FALSE); + db->db_fl_thread = NULL; + } + else + freer_(); // xt_unlock_mutex(&db->db_cp_lock) + } +} + diff --git a/storage/pbxt/src/restart_xt.h b/storage/pbxt/src/restart_xt.h index 73128c4ded8..614fd74a39d 100644 --- a/storage/pbxt/src/restart_xt.h +++ b/storage/pbxt/src/restart_xt.h @@ -92,6 +92,7 @@ private: } XTXactRestartRec, *XTXactRestartPtr; typedef struct XTCheckPointState { + xtBool cp_inited; /* TRUE if structure was inited */ xt_mutex_type cp_state_lock; /* Lock and the entire checkpoint state. */ xtBool cp_running; /* TRUE if a checkpoint is running. */ xtLogID cp_log_id; @@ -136,6 +137,9 @@ void xt_dump_xlogs(struct XTDatabase *db, xtLogID start_log); void xt_xres_start_database_recovery(XTThreadPtr self); void xt_xres_terminate_recovery(XTThreadPtr self); +void xt_start_flusher(struct XTThread *self, struct XTDatabase *db); +void xt_stop_flusher(struct XTThread *self, struct XTDatabase *db); + #define XT_RECOVER_PENDING 0 #define XT_RECOVER_DONE 1 #define XT_RECOVER_SWEPT 2 diff --git a/storage/pbxt/src/strutil_xt.cc b/storage/pbxt/src/strutil_xt.cc index 5c3856de100..02132fbb06b 100644 --- a/storage/pbxt/src/strutil_xt.cc +++ b/storage/pbxt/src/strutil_xt.cc @@ -380,7 +380,7 @@ xtPublic void xt_int8_to_byte_size(xtInt8 value, char *string) /* Version number must also be set in configure.in! */ xtPublic c_char *xt_get_version(void) { - return "1.0.09g RC"; + return "1.0.11 Pre-GA"; } /* Copy and URL decode! */ diff --git a/storage/pbxt/src/tabcache_xt.cc b/storage/pbxt/src/tabcache_xt.cc index 9897525ad69..92958f2da49 100644 --- a/storage/pbxt/src/tabcache_xt.cc +++ b/storage/pbxt/src/tabcache_xt.cc @@ -46,8 +46,10 @@ static void tabc_fr_wait_for_cache(XTThreadPtr self, u_int msecs); xtPublic void xt_tc_set_cache_size(size_t cache_size) { xt_tab_cache.tcm_cache_size = cache_size; - xt_tab_cache.tcm_low_level = cache_size / 4 * 3; // Current 75% - xt_tab_cache.tcm_high_level = cache_size / 100 * 95; // Current 95% + /* Multiplying by this number can overflow a 4 byte value! */ + xt_tab_cache.tcm_low_level = (size_t) ((xtWord8) cache_size * (xtWord8) 70 / (xtWord8) 100); // Current 70% + xt_tab_cache.tcm_high_level = (size_t) ((xtWord8) cache_size * 95 / (xtWord8) 100); // Current 95% + xt_tab_cache.tcm_mid_level = (size_t) ((xtWord8) cache_size * 85 / (xtWord8) 100); // Current 85% } /* @@ -84,25 +86,30 @@ xtPublic void xt_tc_init(XTThreadPtr self, size_t cache_size) xtPublic void xt_tc_exit(XTThreadPtr self) { + XTTabCacheSegPtr seg; + for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) { - if (xt_tab_cache.tcm_segment[i].tcs_hash_table) { - if (xt_tab_cache.tcm_segment[i].tcs_cache_in_use) { - XTTabCachePagePtr page, tmp_page; - - for (size_t j=0; j<xt_tab_cache.tcm_hash_size; j++) { - page = xt_tab_cache.tcm_segment[i].tcs_hash_table[j]; - while (page) { - tmp_page = page; - page = page->tcp_next; - xt_free(self, tmp_page); - } + seg = &xt_tab_cache.tcm_segment[i]; + if (seg->tcs_hash_table) { + XTTabCachePagePtr page, tmp_page; + + for (size_t j=0; j<xt_tab_cache.tcm_hash_size; j++) { + page = seg->tcs_hash_table[j]; + while (page) { + tmp_page = page; + page = page->tcp_next; + ASSERT_NS(seg->tcs_cache_in_use >= offsetof(XTTabCachePageRec, tcp_data) + tmp_page->tcp_data_size); + seg->tcs_cache_in_use -= (offsetof(XTTabCachePageRec, tcp_data) + tmp_page->tcp_data_size); + ASSERT_NS(seg->tcs_cache_in_use == 0 || seg->tcs_cache_in_use >= 25000); + xt_free(self, tmp_page); } } - xt_free(self, xt_tab_cache.tcm_segment[i].tcs_hash_table); - xt_tab_cache.tcm_segment[i].tcs_hash_table = NULL; - TAB_CAC_FREE_LOCK(self, &xt_tab_cache.tcm_segment[i].tcs_lock); + xt_free(self, seg->tcs_hash_table); + seg->tcs_hash_table = NULL; + TAB_CAC_FREE_LOCK(self, &seg->tcs_lock); } + ASSERT_NS(seg->tcs_cache_in_use == 0); } xt_free_mutex(&xt_tab_cache.tcm_lock); @@ -554,24 +561,24 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCache } page = page->tcp_next; } + + size_t page_size = offsetof(XTTabCachePageRec, tcp_data) + this->tci_page_size; + TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id); /* Page not found, allocate a new page: */ - size_t page_size = offsetof(XTTabCachePageRec, tcp_data) + this->tci_page_size; if (!(new_page = (XTTabCachePagePtr) xt_malloc_ns(page_size))) return FAILED; - /* Increment cache used. */ - seg->tcs_cache_in_use += page_size; /* Check the level of the cache: */ size_t cache_used = 0; for (int i=0; i<XT_TC_SEGMENT_COUNT; i++) cache_used += dcg->tcm_segment[i].tcs_cache_in_use; - if (cache_used > dcg->tcm_cache_high) + if (cache_used + page_size > dcg->tcm_cache_high) dcg->tcm_cache_high = cache_used; - if (cache_used > dcg->tcm_cache_size) { + if (cache_used + page_size > dcg->tcm_cache_size) { XTThreadPtr self; time_t now; @@ -638,7 +645,7 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCache for (int i=0; i<XT_TC_SEGMENT_COUNT; i++) cache_used += dcg->tcm_segment[i].tcs_cache_in_use; - if (cache_used <= dcg->tcm_high_level) + if (cache_used + page_size <= dcg->tcm_high_level) break; /* * If there is too little cache we can get stuck here. @@ -663,7 +670,7 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCache while (time(NULL) < now + 5); xt_unlock_mutex_ns(&dcg->tcm_freeer_lock); } - else if (cache_used > dcg->tcm_high_level) { + else if (cache_used + page_size > dcg->tcm_high_level) { /* Wake up the freeer because the cache level, * is higher than the high level. */ @@ -693,6 +700,9 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCache } #ifdef XT_MEMSET_UNUSED_SPACE + else + red_size = 0; + /* Removing this is an optimization. It should not be required * to clear the unused space in the page. */ @@ -727,6 +737,15 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCache page->tcp_next = seg->tcs_hash_table[hash_idx]; seg->tcs_hash_table[hash_idx] = page; + /* GOTCHA! This increment was done just after the malloc! + * So it was not protected by the segment lock! + * The result was that this count was no longer reliable, + * This resulted in the amount of cache being used becoming less, and\ + * less, because increments were lost over time! + */ + /* Increment cache used. */ + seg->tcs_cache_in_use += page_size; + done_ok: *ret_seg = seg; *ret_page = page; @@ -761,7 +780,7 @@ xtBool XTTableSeq::ts_log_no_op(XTThreadPtr thread, xtTableID tab_id, xtOpSeqNo * some will be missing, so the writer will not * be able to contniue. */ - return xt_xlog_log_data(thread, sizeof(XTactNoOpEntryDRec), (XTXactLogBufferDPtr) &ent_rec, FALSE); + return xt_xlog_log_data(thread, sizeof(XTactNoOpEntryDRec), (XTXactLogBufferDPtr) &ent_rec, XT_XLOG_NO_WRITE_NO_FLUSH); } #ifdef XT_NOT_INLINE @@ -828,13 +847,23 @@ xtBool XTTableSeq::xt_op_is_before(register xtOpSeqNo now, register xtOpSeqNo th /* * Used by the writer to wake the freeer. */ -xtPublic void xt_wr_wake_freeer(XTThreadPtr self) +xtPublic void xt_wr_wake_freeer(XTThreadPtr self, XTDatabaseHPtr db) { + /* BUG FIX: Was using tcm_freeer_cond. + * This is incorrect. When the freeer waits for the + * writter, it uses the writer's condition! + */ + xt_lock_mutex_ns(&db->db_wr_lock); + if (!xt_broadcast_cond_ns(&db->db_wr_cond)) + xt_log_and_clear_exception_ns(); + xt_unlock_mutex_ns(&db->db_wr_lock); +/* xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock); pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock); if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond)) xt_log_and_clear_exception_ns(); freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock) +*/ } /* Wait for a transaction to quit: */ @@ -1070,7 +1099,9 @@ static size_t tabc_free_page(XTThreadPtr self, TCResourcePtr tc) /* Free the page: */ size_t freed_space = offsetof(XTTabCachePageRec, tcp_data) + page->tcp_data_size; + ASSERT_NS(seg->tcs_cache_in_use >= freed_space); seg->tcs_cache_in_use -= freed_space; + ASSERT_NS(seg->tcs_cache_in_use == 0 || seg->tcs_cache_in_use >= 25000); xt_free_ns(page); TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id); @@ -1083,6 +1114,7 @@ static void tabc_fr_main(XTThreadPtr self) { register XTTabCacheMemPtr dcg = &xt_tab_cache; TCResourceRec tc = { 0 }; + int i; xt_set_low_priority(self); dcg->tcm_freeer_busy = TRUE; @@ -1095,14 +1127,20 @@ static void tabc_fr_main(XTThreadPtr self) while (!self->t_quit) { /* Total up the cache memory used: */ cache_used = 0; - for (int i=0; i<XT_TC_SEGMENT_COUNT; i++) + for (i=0; i<XT_TC_SEGMENT_COUNT; i++) cache_used += dcg->tcm_segment[i].tcs_cache_in_use; - if (cache_used > dcg->tcm_cache_high) { + + if (cache_used > dcg->tcm_cache_high) dcg->tcm_cache_high = cache_used; - } /* Check if the cache usage is over 95%: */ - if (self->t_quit || cache_used < dcg->tcm_high_level) + if (self->t_quit) + break; + + /* If threads are waiting then we are more aggressive about freeing + * cache. + */ + if (cache_used < (dcg->tcm_threads_waiting ? dcg->tcm_mid_level : dcg->tcm_high_level)) break; /* Reduce cache to the 75% level: */ @@ -1137,7 +1175,23 @@ static void tabc_fr_main(XTThreadPtr self) */ xt_db_approximate_time = time(NULL); dcg->tcm_freeer_busy = FALSE; - tabc_fr_wait_for_cache(self, 500); + /* No idea, why, but I am getting an uneccesarry pause here. + * I run DBT2 with low record cache. + * + * Every now and then there is a pause where the freeer is here, + * and all user threads are waiting for the freeer. + * + * So adding the tcm_threads_waiting condition. + */ + if (dcg->tcm_threads_waiting) { + cache_used = 0; + for (i=0; i<XT_TC_SEGMENT_COUNT; i++) + cache_used += dcg->tcm_segment[i].tcs_cache_in_use; + if (cache_used < dcg->tcm_mid_level) + tabc_fr_wait_for_cache(self, 500); + } + else + tabc_fr_wait_for_cache(self, 500); //tabc_fr_wait_for_cache(self, 30*1000); dcg->tcm_freeer_busy = TRUE; xt_db_approximate_time = time(NULL); @@ -1174,7 +1228,7 @@ static void *tabc_fr_run_thread(XTThreadPtr self) count = 2*60; #endif while (!self->t_quit && count > 0) { - xt_db_approximate_time = xt_trace_clock(); + xt_db_approximate_time = time(NULL); sleep(1); count--; } diff --git a/storage/pbxt/src/tabcache_xt.h b/storage/pbxt/src/tabcache_xt.h index fae8ad9be66..5dcd39050d4 100644 --- a/storage/pbxt/src/tabcache_xt.h +++ b/storage/pbxt/src/tabcache_xt.h @@ -29,6 +29,7 @@ struct XTTable; struct XTOpenTable; struct XTTabCache; +struct XTDatabase; #include "thread_xt.h" #include "filesys_xt.h" @@ -226,6 +227,7 @@ typedef struct XTTabCacheMem { size_t tcm_cache_high; /* The high water level of cache allocation. */ size_t tcm_low_level; /* This is the level to which the freeer will free, once it starts working. */ size_t tcm_high_level; /* This is the level at which the freeer will start to work (to avoid waiting)! */ + size_t tcm_mid_level; /* At this level the freeer will not sleep if there are threads waiting. */ /* The free'er thread: */ struct XTThread *tcm_freeer_thread; /* The freeer thread . */ @@ -283,6 +285,6 @@ void xt_check_table_cache(struct XTTable *tab); void xt_quit_freeer(XTThreadPtr self); void xt_stop_freeer(XTThreadPtr self); void xt_start_freeer(XTThreadPtr self); -void xt_wr_wake_freeer(XTThreadPtr self); +void xt_wr_wake_freeer(XTThreadPtr self, struct XTDatabase *db); #endif diff --git a/storage/pbxt/src/table_xt.cc b/storage/pbxt/src/table_xt.cc index 1aef77ef253..b01f4404ce3 100644 --- a/storage/pbxt/src/table_xt.cc +++ b/storage/pbxt/src/table_xt.cc @@ -61,6 +61,11 @@ #define CHECK_TABLE_STATS +/* The problem is that this can take a long time + * if the table is very large! + */ +//#define CHECK_TABLE_READ_DATA_LOG + #ifdef TRACE_TABLE_IDS //#define PRINTF xt_ftracef #define PRINTF xt_trace @@ -217,6 +222,7 @@ static void tab_finalize(XTThreadPtr self, void *x) xt_spinlock_free(self, &tab->tab_ainc_lock); xt_free_mutex(&tab->tab_rec_flush_lock); xt_free_mutex(&tab->tab_ind_flush_lock); + xt_free_mutex(&tab->tab_ind_stat_lock); xt_free_mutex(&tab->tab_dic_field_lock); xt_free_mutex(&tab->tab_row_lock); xt_free_mutex(&tab->tab_ind_lock); @@ -487,6 +493,12 @@ xtPublic void xt_tab_init_db(XTThreadPtr self, XTDatabaseHPtr db) } freer_(); // xt_describe_tables_exit(&desc) + /* + * When we open all tables, we ignore problems with foreign keys. + * This must be done or we will not be able to load tables that + * were created with foreign key checks off. + */ + self->st_ignore_fkeys = 1; /* * The purpose of this code is to ensure that all tables are opened and cached, * which is actually only required if tables have foreign key references. @@ -499,14 +511,30 @@ xtPublic void xt_tab_init_db(XTThreadPtr self, XTDatabaseHPtr db) * * Cannot open tables in the loop above because db->db_table_by_id which is built * above is used by xt_use_table_no_lock() + * + * {TABLE-STATS} + * NOTE: The code also lead to the statistics failing to work because + * the tables were already open when the handler was opened. + * Previously we only caclulated statistics when a handler was opened + * and the underlying table was also opened. */ xt_enum_tables_init(&edx); while ((te_ptr = xt_enum_tables_next(self, db, &edx))) { xt_strcpy(PATH_MAX, pbuf, te_ptr->te_tab_path->tp_path); xt_add_dir_char(PATH_MAX, pbuf); xt_strcat(PATH_MAX, pbuf, te_ptr->te_tab_name); - xt_heap_release(self, xt_use_table_no_lock(self, db, (XTPathStrPtr)pbuf, FALSE, FALSE, NULL, NULL)); + try_(a) { + xt_heap_release(self, xt_use_table_no_lock(self, db, (XTPathStrPtr) pbuf, FALSE, FALSE, NULL)); + } + catch_(a) { + /* ignore errors, because we are just loading all + * the tables that we can... + */ + xt_log_and_clear_warning(self); + } + cont_(a); } + self->st_ignore_fkeys = 0; popr_(); // Discard xt_tab_exit_db(db) exit_(); @@ -651,7 +679,7 @@ xtPublic void xt_check_tables(XTThreadPtr self) xt_strcpy(PATH_MAX, path, te_ptr->te_tab_path->tp_path); xt_add_dir_char(PATH_MAX, path); xt_strcat(PATH_MAX, path, te_ptr->te_tab_name); - tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE, NULL); + tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE); tab_check_table(self, tab); xt_heap_release(self, tab); tab = NULL; @@ -1009,6 +1037,7 @@ xtPublic xtBool xt_tab_write_min_auto_inc(XTOpenTablePtr ot) /* a helper function to remove table from the open tables hash on exception * used in tab_new_handle() below */ + #ifdef NO_LONGER_REQ static void xt_del_from_db_tables_ht(XTThreadPtr self, XTTableHPtr tab) { XTTableEntryPtr te_ptr; @@ -1024,6 +1053,7 @@ static void xt_del_from_db_tables_ht(XTThreadPtr self, XTTableHPtr tab) if ((te_ptr = (XTTableEntryPtr) xt_sl_find(self, db->db_table_by_id, &tab_id))) te_ptr->te_table = NULL; } +#endif /* * Create a new table handle (i.e. open a table). @@ -1067,6 +1097,7 @@ static int tab_new_handle(XTThreadPtr self, XTTableHPtr *r_tab, XTDatabaseHPtr d xt_spinlock_init_with_autoname(self, &tab->tab_ainc_lock); xt_init_mutex_with_autoname(self, &tab->tab_rec_flush_lock); xt_init_mutex_with_autoname(self, &tab->tab_ind_flush_lock); + xt_init_mutex_with_autoname(self, &tab->tab_ind_stat_lock); xt_init_mutex_with_autoname(self, &tab->tab_dic_field_lock); xt_init_mutex_with_autoname(self, &tab->tab_row_lock); xt_init_mutex_with_autoname(self, &tab->tab_ind_lock); @@ -1148,9 +1179,16 @@ static int tab_new_handle(XTThreadPtr self, XTTableHPtr *r_tab, XTDatabaseHPtr d * will work if we have cyclic foreign key references. */ if (tab->tab_dic.dic_table) { - pushr_(xt_del_from_db_tables_ht, tab); - tab->tab_dic.dic_table->attachReferences(self, db); - popr_(); + try_(a) { + tab->tab_dic.dic_table->attachReferences(self, db); + } + catch_(a) { + /* Errors are thrown when: set foreign_key_checks = 1 */ + /* Undo everything done above: */ + xt_ht_del(self, db->db_tables, tab->tab_name); + xt_throw(self); + } + cont_(a); } *r_tab = tab; @@ -1162,7 +1200,7 @@ static int tab_new_handle(XTThreadPtr self, XTTableHPtr *r_tab, XTDatabaseHPtr d * Get a reference to a table in the current database. The table reference is valid, * as long as the thread is using the database!!! */ -xtPublic XTTableHPtr xt_use_table_no_lock(XTThreadPtr self, XTDatabaseHPtr db, XTPathStrPtr name, xtBool no_load, xtBool missing_ok, XTDictionaryPtr dic, xtBool *opened) +xtPublic XTTableHPtr xt_use_table_no_lock(XTThreadPtr self, XTDatabaseHPtr db, XTPathStrPtr name, xtBool no_load, xtBool missing_ok, XTDictionaryPtr dic) { XTTableHPtr tab; @@ -1181,9 +1219,6 @@ xtPublic XTTableHPtr xt_use_table_no_lock(XTThreadPtr self, XTDatabaseHPtr db, X if (tab_new_handle(self, &tab, db, tab_id, name, FALSE, dic) == XT_TAB_NO_DICTIONARY) xt_throw_taberr(XT_CONTEXT, XT_ERR_NO_DICTIONARY, name); - - if (opened) - *opened = TRUE; } if (tab) @@ -1297,7 +1332,7 @@ xtPublic void xt_create_table(XTThreadPtr self, XTPathStrPtr name, XTDictionaryP XTSortedListInfoRec li_undo; #ifdef TRACE_CREATE_TABLES - fprintf(stderr, "CREATE %s\n", name->ps_path); + printf("CREATE %s\n", name->ps_path); #endif enter_(); if (strlen(xt_last_name_of_path(name->ps_path)) > XT_TABLE_NAME_SIZE-1) @@ -1528,7 +1563,7 @@ xtPublic void xt_create_table(XTThreadPtr self, XTPathStrPtr name, XTDictionaryP * as well. */ if (!old_tab_id) { - tab = xt_use_table_no_lock(self, db, name, FALSE, FALSE, NULL, NULL); + tab = xt_use_table_no_lock(self, db, name, FALSE, FALSE, NULL); xt_heap_release(self, tab); } } @@ -1556,7 +1591,7 @@ xtPublic void xt_create_table(XTThreadPtr self, XTPathStrPtr name, XTDictionaryP /* Same purpose as above {LOAD-FOR-FKS} (although this should work, * beacuse this is a TRUNCATE TABLE. */ - tab = xt_use_table_no_lock(self, db, name, FALSE, FALSE, NULL, NULL); + tab = xt_use_table_no_lock(self, db, name, FALSE, FALSE, NULL); xt_heap_release(self, tab); } catch_(b) { @@ -1599,7 +1634,7 @@ xtPublic void xt_create_table(XTThreadPtr self, XTPathStrPtr name, XTDictionaryP /* this code is not needed anymore as we open tables referred by FKs as necessary during checks xt_ht_lock(self, db->db_tables); pushr_(xt_ht_unlock, db->db_tables); - tab = xt_use_table_no_lock(self, db, name, FALSE, FALSE, NULL, NULL); + tab = xt_use_table_no_lock(self, db, name, FALSE, FALSE, NULL); freer_(); // xt_ht_unlock(db->db_tables) xt_heap_release(self, tab); * CHANGED see {LOAD-FOR-FKS} above. @@ -1619,7 +1654,7 @@ xtPublic void xt_drop_table(XTThreadPtr self, XTPathStrPtr tab_name, xtBool drop enter_(); #ifdef TRACE_CREATE_TABLES - fprintf(stderr, "DROP %s\n", tab_name->ps_path); + printf("DROP %s\n", tab_name->ps_path); #endif table_pool = tab_lock_table(self, tab_name, FALSE, TRUE, TRUE, &tab); @@ -1758,10 +1793,12 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot) XTTableHPtr tab = ot->ot_table; xtRecordID prec_id; XTTabRecExtDPtr rec_buf = (XTTabRecExtDPtr) ot->ot_row_rbuffer; +#ifdef CHECK_TABLE_READ_DATA_LOG XTactExtRecEntryDRec ext_rec; size_t log_size; xtLogID log_id; xtLogOffset log_offset; +#endif xtRecordID rec_id; xtRecordID prev_rec_id; xtXactID xn_id; @@ -1775,9 +1812,10 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot) size_t rec_size; size_t row_size; u_llong ext_data_len = 0; + u_llong ext_rec_count = 0; #if defined(DUMP_CHECK_TABLE) || defined(CHECK_TABLE_STATS) - fprintf(stderr, "\nCHECK TABLE: %s\n", tab->tab_name->ps_path); + printf("\nCHECK TABLE: %s\n", tab->tab_name->ps_path); #endif xt_lock_mutex(self, &tab->tab_db->db_co_ext_lock); @@ -1787,38 +1825,38 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot) pushr_(xt_unlock_mutex, &tab->tab_rec_lock); #ifdef CHECK_TABLE_STATS - fprintf(stderr, "Record buffer size = %lu\n", (u_long) tab->tab_dic.dic_mysql_buf_size); - fprintf(stderr, "Fixed length rec. len. = %lu\n", (u_long) tab->tab_dic.dic_mysql_rec_size); - fprintf(stderr, "Handle data record size = %lu\n", (u_long) tab->tab_dic.dic_rec_size); - fprintf(stderr, "Min/max header size = %d/%d\n", (int) offsetof(XTTabRecFix, rf_data), tab->tab_dic.dic_rec_fixed ? (int) offsetof(XTTabRecFix, rf_data) : (int) offsetof(XTTabRecExtDRec, re_data)); - fprintf(stderr, "Min/avg/max record size = %llu/%llu/%llu\n", (u_llong) tab->tab_dic.dic_min_row_size, (u_llong) tab->tab_dic.dic_ave_row_size, (u_llong) tab->tab_dic.dic_max_row_size); + printf("Record buffer size = %lu\n", (u_long) tab->tab_dic.dic_mysql_buf_size); + printf("Fixed length rec. len. = %lu\n", (u_long) tab->tab_dic.dic_mysql_rec_size); + printf("Handle data record size = %lu\n", (u_long) tab->tab_dic.dic_rec_size); + printf("Min/max header size = %d/%d\n", (int) offsetof(XTTabRecFix, rf_data), tab->tab_dic.dic_rec_fixed ? (int) offsetof(XTTabRecFix, rf_data) : (int) offsetof(XTTabRecExtDRec, re_data)); + printf("Min/avg/max record size = %llu/%llu/%llu\n", (u_llong) tab->tab_dic.dic_min_row_size, (u_llong) tab->tab_dic.dic_ave_row_size, (u_llong) tab->tab_dic.dic_max_row_size); if (tab->tab_dic.dic_def_ave_row_size) - fprintf(stderr, "Avg row len set for tab = %lu\n", (u_long) tab->tab_dic.dic_def_ave_row_size); + printf("Avg row len set for tab = %lu\n", (u_long) tab->tab_dic.dic_def_ave_row_size); else - fprintf(stderr, "Avg row len set for tab = not specified\n"); - fprintf(stderr, "Rows fixed length = %s\n", tab->tab_dic.dic_rec_fixed ? "YES" : "NO"); + printf("Avg row len set for tab = not specified\n"); + printf("Rows fixed length = %s\n", tab->tab_dic.dic_rec_fixed ? "YES" : "NO"); if (tab->tab_dic.dic_tab_flags & XT_TAB_FLAGS_TEMP_TAB) - fprintf(stderr, "Table type = TEMP\n"); + printf("Table type = TEMP\n"); if (tab->tab_dic.dic_def_ave_row_size) - fprintf(stderr, "Maximum fixed size = %lu\n", (u_long) XT_TAB_MAX_FIX_REC_LENGTH_SPEC); + printf("Maximum fixed size = %lu\n", (u_long) XT_TAB_MAX_FIX_REC_LENGTH_SPEC); else - fprintf(stderr, "Maximum fixed size = %lu\n", (u_long) XT_TAB_MAX_FIX_REC_LENGTH); - fprintf(stderr, "Minimum variable size = %lu\n", (u_long) XT_TAB_MIN_VAR_REC_LENGTH); - fprintf(stderr, "Minimum auto-increment = %llu\n", (u_llong) tab->tab_dic.dic_min_auto_inc); - fprintf(stderr, "Number of columns = %lu\n", (u_long) tab->tab_dic.dic_no_of_cols); - fprintf(stderr, "Number of fixed columns = %lu\n", (u_long) tab->tab_dic.dic_fix_col_count); - fprintf(stderr, "Columns req. for index = %lu\n", (u_long) tab->tab_dic.dic_ind_cols_req); + printf("Maximum fixed size = %lu\n", (u_long) XT_TAB_MAX_FIX_REC_LENGTH); + printf("Minimum variable size = %lu\n", (u_long) XT_TAB_MIN_VAR_REC_LENGTH); + printf("Minimum auto-increment = %llu\n", (u_llong) tab->tab_dic.dic_min_auto_inc); + printf("Number of columns = %lu\n", (u_long) tab->tab_dic.dic_no_of_cols); + printf("Number of fixed columns = %lu\n", (u_long) tab->tab_dic.dic_fix_col_count); + printf("Columns req. for index = %lu\n", (u_long) tab->tab_dic.dic_ind_cols_req); if (tab->tab_dic.dic_ind_rec_len) - fprintf(stderr, "Rec len req. for index = %llu\n", (u_llong) tab->tab_dic.dic_ind_rec_len); - fprintf(stderr, "Columns req. for blobs = %lu\n", (u_long) tab->tab_dic.dic_blob_cols_req); - fprintf(stderr, "Number of blob columns = %lu\n", (u_long) tab->tab_dic.dic_blob_count); - fprintf(stderr, "Number of indices = %lu\n", (u_long) tab->tab_dic.dic_key_count); + printf("Rec len req. for index = %llu\n", (u_llong) tab->tab_dic.dic_ind_rec_len); + printf("Columns req. for blobs = %lu\n", (u_long) tab->tab_dic.dic_blob_cols_req); + printf("Number of blob columns = %lu\n", (u_long) tab->tab_dic.dic_blob_count); + printf("Number of indices = %lu\n", (u_long) tab->tab_dic.dic_key_count); #endif #ifdef DUMP_CHECK_TABLE - fprintf(stderr, "Records:-\n"); - fprintf(stderr, "Free list: %llu (%llu)\n", (u_llong) tab->tab_rec_free_id, (u_llong) tab->tab_rec_fnum); - fprintf(stderr, "EOF: %llu\n", (u_llong) tab->tab_rec_eof_id); + printf("Records:-\n"); + printf("Free list: %llu (%llu)\n", (u_llong) tab->tab_rec_free_id, (u_llong) tab->tab_rec_fnum); + printf("EOF: %llu\n", (u_llong) tab->tab_rec_eof_id); #endif rec_size = XT_REC_EXT_HEADER_SIZE; @@ -1830,24 +1868,24 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot) xt_throw(self); #ifdef DUMP_CHECK_TABLE - fprintf(stderr, "%-4llu ", (u_llong) rec_id); + printf("%-4llu ", (u_llong) rec_id); #endif switch (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_MASK) { case XT_TAB_STATUS_FREED: #ifdef DUMP_CHECK_TABLE - fprintf(stderr, "======== "); + printf("======== "); #endif free_rec_count++; break; case XT_TAB_STATUS_DELETE: #ifdef DUMP_CHECK_TABLE - fprintf(stderr, "delete "); + printf("delete "); #endif delete_rec_count++; break; case XT_TAB_STATUS_FIXED: #ifdef DUMP_CHECK_TABLE - fprintf(stderr, "record-F "); + printf("record-F "); #endif alloc_rec_count++; row_size = myxt_store_row_length(ot, (char *) ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE); @@ -1859,7 +1897,7 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot) break; case XT_TAB_STATUS_VARIABLE: #ifdef DUMP_CHECK_TABLE - fprintf(stderr, "record-V "); + printf("record-V "); #endif alloc_rec_count++; row_size = myxt_load_row_length(ot, tab->tab_dic.dic_rec_size, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, NULL); @@ -1871,9 +1909,10 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot) break; case XT_TAB_STATUS_EXT_DLOG: #ifdef DUMP_CHECK_TABLE - fprintf(stderr, "record-X "); + printf("record-X "); #endif alloc_rec_count++; + ext_rec_count++; ext_data_len += XT_GET_DISK_4(rec_buf->re_log_dat_siz_4); row_size = XT_GET_DISK_4(rec_buf->re_log_dat_siz_4) + ot->ot_rec_size - XT_REC_EXT_HEADER_SIZE; alloc_rec_bytes += row_size; @@ -1885,9 +1924,9 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot) } #ifdef DUMP_CHECK_TABLE if (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_CLEANED_BIT) - fprintf(stderr, "C"); + printf("C"); else - fprintf(stderr, " "); + printf(" "); #endif prev_rec_id = XT_GET_DISK_4(rec_buf->tr_prev_rec_id_4); xn_id = XT_GET_DISK_4(rec_buf->tr_xact_id_4); @@ -1895,14 +1934,15 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot) switch (rec_buf->tr_rec_type_1 & XT_TAB_STATUS_MASK) { case XT_TAB_STATUS_FREED: #ifdef DUMP_CHECK_TABLE - fprintf(stderr, " prev=%-3llu (xact=%-3llu row=%lu)\n", (u_llong) prev_rec_id, (u_llong) xn_id, (u_long) row_id); + printf(" prev=%-3llu (xact=%-3llu row=%lu)\n", (u_llong) prev_rec_id, (u_llong) xn_id, (u_long) row_id); #endif break; case XT_TAB_STATUS_EXT_DLOG: #ifdef DUMP_CHECK_TABLE - fprintf(stderr, " prev=%-3llu xact=%-3llu row=%lu Xlog=%lu Xoff=%llu Xsiz=%lu\n", (u_llong) prev_rec_id, (u_llong) xn_id, (u_long) row_id, (u_long) XT_GET_DISK_2(rec_buf->re_log_id_2), (u_llong) XT_GET_DISK_6(rec_buf->re_log_offs_6), (u_long) XT_GET_DISK_4(rec_buf->re_log_dat_siz_4)); + printf(" prev=%-3llu xact=%-3llu row=%lu Xlog=%lu Xoff=%llu Xsiz=%lu\n", (u_llong) prev_rec_id, (u_llong) xn_id, (u_long) row_id, (u_long) XT_GET_DISK_2(rec_buf->re_log_id_2), (u_llong) XT_GET_DISK_6(rec_buf->re_log_offs_6), (u_long) XT_GET_DISK_4(rec_buf->re_log_dat_siz_4)); #endif +#ifdef CHECK_TABLE_READ_DATA_LOG log_size = XT_GET_DISK_4(rec_buf->re_log_dat_siz_4); XT_GET_LOG_REF(log_id, log_offset, rec_buf); if (!self->st_dlog_buf.dlb_read_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data), (xtWord1 *) &ext_rec, self)) @@ -1919,10 +1959,11 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot) xt_logf(XT_INFO, "Table %s: record %llu, extended record %lu:%llu not valid\n", tab->tab_name, (u_llong) rec_id, (u_long) log_id, (u_llong) log_offset); } } +#endif break; default: #ifdef DUMP_CHECK_TABLE - fprintf(stderr, " prev=%-3llu xact=%-3llu row=%lu\n", (u_llong) prev_rec_id, (u_llong) xn_id, (u_long) row_id); + printf(" prev=%-3llu xact=%-3llu row=%lu\n", (u_llong) prev_rec_id, (u_llong) xn_id, (u_long) row_id); #endif break; } @@ -1930,17 +1971,19 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot) } #ifdef CHECK_TABLE_STATS - if (!tab->tab_dic.dic_rec_fixed) - fprintf(stderr, "Extendend data length = %llu\n", ext_data_len); + if (!tab->tab_dic.dic_rec_fixed) { + printf("Extended data length = %llu\n", ext_data_len); + printf("Extended record count = %llu\n", ext_rec_count); + } if (alloc_rec_count) { - fprintf(stderr, "Minumum comp. rec. len. = %llu\n", (u_llong) min_comp_rec_len); - fprintf(stderr, "Average comp. rec. len. = %llu\n", (u_llong) ((double) alloc_rec_bytes / (double) alloc_rec_count + (double) 0.5)); - fprintf(stderr, "Maximum comp. rec. len. = %llu\n", (u_llong) max_comp_rec_len); + printf("Minumum comp. rec. len. = %llu\n", (u_llong) min_comp_rec_len); + printf("Average comp. rec. len. = %llu\n", (u_llong) ((double) alloc_rec_bytes / (double) alloc_rec_count + (double) 0.5)); + printf("Maximum comp. rec. len. = %llu\n", (u_llong) max_comp_rec_len); } - fprintf(stderr, "Free record count = %llu\n", (u_llong) free_rec_count); - fprintf(stderr, "Deleted record count = %llu\n", (u_llong) delete_rec_count); - fprintf(stderr, "Allocated record count = %llu\n", (u_llong) alloc_rec_count); + printf("Free record count = %llu\n", (u_llong) free_rec_count); + printf("Deleted record count = %llu\n", (u_llong) delete_rec_count); + printf("Allocated record count = %llu\n", (u_llong) alloc_rec_count); #endif if (tab->tab_rec_fnum != free_rec_count) xt_logf(XT_INFO, "Table %s: incorrect number of free blocks, %llu, should be: %llu\n", tab->tab_name, (u_llong) free_rec_count, (u_llong) tab->tab_rec_fnum); @@ -1978,9 +2021,9 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot) pushr_(xt_unlock_mutex, &tab->tab_row_lock); #ifdef DUMP_CHECK_TABLE - fprintf(stderr, "Rows:-\n"); - fprintf(stderr, "Free list: %llu (%llu)\n", (u_llong) tab->tab_row_free_id, (u_llong) tab->tab_row_fnum); - fprintf(stderr, "EOF: %llu\n", (u_llong) tab->tab_row_eof_id); + printf("Rows:-\n"); + printf("Free list: %llu (%llu)\n", (u_llong) tab->tab_row_free_id, (u_llong) tab->tab_row_fnum); + printf("EOF: %llu\n", (u_llong) tab->tab_row_eof_id); #endif rec_id = 1; @@ -1988,13 +2031,13 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot) if (!tab->tab_rows.xt_tc_read_4(ot->ot_row_file, rec_id, &ref_id, self)) xt_throw(self); #ifdef DUMP_CHECK_TABLE - fprintf(stderr, "%-3llu ", (u_llong) rec_id); + printf("%-3llu ", (u_llong) rec_id); #endif #ifdef DUMP_CHECK_TABLE if (ref_id == 0) - fprintf(stderr, "====== 0\n"); + printf("====== 0\n"); else - fprintf(stderr, "in use %llu\n", (u_llong) ref_id); + printf("in use %llu\n", (u_llong) ref_id); #endif rec_id++; } @@ -2026,7 +2069,7 @@ xtPublic void xt_rename_table(XTThreadPtr self, XTPathStrPtr old_name, XTPathStr memset(&dic, 0, sizeof(dic)); #ifdef TRACE_CREATE_TABLES - fprintf(stderr, "RENAME %s --> %s\n", old_name->ps_path, new_name->ps_path); + printf("RENAME %s --> %s\n", old_name->ps_path, new_name->ps_path); #endif if (strlen(xt_last_name_of_path(new_name->ps_path)) > XT_TABLE_NAME_SIZE-1) xt_throw_taberr(XT_CONTEXT, XT_ERR_NAME_TOO_LONG, new_name); @@ -2100,7 +2143,7 @@ xtPublic void xt_rename_table(XTThreadPtr self, XTPathStrPtr old_name, XTPathStr popr_(); // Discard tab_free_table_path(te_new_path); popr_(); // Discard xt_free(te_new_name); - tab = xt_use_table_no_lock(self, db, new_name, FALSE, FALSE, &dic, NULL); + tab = xt_use_table_no_lock(self, db, new_name, FALSE, FALSE, &dic); /* All renamed tables are considered repaired! */ xt_tab_table_repaired(tab); xt_heap_release(self, tab); @@ -2110,14 +2153,14 @@ xtPublic void xt_rename_table(XTThreadPtr self, XTPathStrPtr old_name, XTPathStr freer_(); // xt_db_unlock_table_pool(table_pool) } -xtPublic XTTableHPtr xt_use_table(XTThreadPtr self, XTPathStrPtr name, xtBool no_load, xtBool missing_ok, xtBool *opened) +xtPublic XTTableHPtr xt_use_table(XTThreadPtr self, XTPathStrPtr name, xtBool no_load, xtBool missing_ok) { XTTableHPtr tab; XTDatabaseHPtr db = self->st_database; xt_ht_lock(self, db->db_tables); pushr_(xt_ht_unlock, db->db_tables); - tab = xt_use_table_no_lock(self, db, name, no_load, missing_ok, NULL, opened); + tab = xt_use_table_no_lock(self, db, name, no_load, missing_ok, NULL); freer_(); return tab; } @@ -2221,7 +2264,7 @@ xtPublic xtBool xt_flush_record_row(XTOpenTablePtr ot, off_t *bytes_flushed, xtB xt_tab_store_header(ot, &rec_head); #ifdef TRACE_FLUSH - fprintf(stderr, "FLUSH rec/row %d %s\n", (int) tab->tab_bytes_to_flush, tab->tab_name->ps_path); + printf("FLUSH rec/row %d %s\n", (int) tab->tab_bytes_to_flush, tab->tab_name->ps_path); fflush(stdout); #endif /* Write the table header: */ @@ -2276,7 +2319,7 @@ xtPublic xtBool xt_flush_record_row(XTOpenTablePtr ot, off_t *bytes_flushed, xtB xt_unlock_mutex_ns(&cp->cp_state_lock); #ifdef TRACE_FLUSH - fprintf(stderr, "FLUSH --end-- %s\n", tab->tab_name->ps_path); + printf("FLUSH --end-- %s\n", tab->tab_name->ps_path); fflush(stdout); #endif xt_unlock_mutex_ns(&tab->tab_rec_flush_lock); @@ -2858,7 +2901,7 @@ static int tab_visible(register XTOpenTablePtr ot, XTTabRecHeadDPtr rec_head, xt /* This is a record written by this transaction. */ if (thread->st_is_update) { /* Check that it was not written by the current update statement: */ - if (XT_STAT_ID_MASK(thread->st_update_id) == rec_head->tr_stat_id_1) { + if (XT_STAT_ID_MASK(ot->ot_update_id) == rec_head->tr_stat_id_1) { #ifdef TRACE_VARIATIONS if (len <= 450) len += sprintf(t_buf+len, "MY UPDATE IN THIS STATEMENT T%d\n", (int) xn_id); @@ -3139,7 +3182,7 @@ static int tab_visible(register XTOpenTablePtr ot, XTTabRecHeadDPtr rec_head, xt } if (lw.lw_curr_lock != XT_NO_LOCK) { #ifdef TRACE_VARIATIONS - xt_ttracef(thread, "T%d WAIT FOR LOCK(%D) T%d\n", (int) thread->st_xact_data->xd_start_xn_id, (int) lock_type, (int) xn_id); + xt_ttracef(thread, "T%d WAIT FOR LOCK(%s) T%d\n", (int) thread->st_xact_data->xd_start_xn_id, (int) lw.lw_curr_lock == XT_TEMP_LOCK ? "temp" : "perm", (int) xn_id); #endif if (!xt_xn_wait_for_xact(thread, NULL, &lw)) { #ifdef DEBUG_LOCK_QUEUE @@ -3950,6 +3993,21 @@ static void tab_free_row_on_fail(XTOpenTablePtr ot, XTTableHPtr tab, xtRowID row tab_restore_exception(&e); } +static xtBool tab_write_ext_record(XTOpenTablePtr ot, XTTabRecInfoPtr rec_info, xtTableID tab_id, xtRecordID rec_id, xtLogID log_id, xtLogOffset log_offset, XTThreadPtr thread) +{ + xtWord1 tmp_buffer[offsetof(XTactExtRecEntryDRec, er_data)]; + xtBool ok; + + memcpy(tmp_buffer, rec_info->ri_log_buf, sizeof(tmp_buffer)); + rec_info->ri_log_buf->er_status_1 = XT_LOG_ENT_EXT_REC_OK; + XT_SET_DISK_4(rec_info->ri_log_buf->er_data_size_4, rec_info->ri_log_data_size); + XT_SET_DISK_4(rec_info->ri_log_buf->er_tab_id_4, tab_id); + XT_SET_DISK_4(rec_info->ri_log_buf->er_rec_id_4, rec_id); + ok = thread->st_dlog_buf.dlb_append_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info->ri_log_data_size, (xtWord1 *) rec_info->ri_log_buf, thread); + memcpy(rec_info->ri_log_buf, tmp_buffer, sizeof(tmp_buffer)); + return ok; +} + static xtBool tab_add_record(XTOpenTablePtr ot, XTTabRecInfoPtr rec_info, u_int status) { register XTTableHPtr tab = ot->ot_table; @@ -4007,26 +4065,19 @@ static xtBool tab_add_record(XTOpenTablePtr ot, XTTabRecInfoPtr rec_info, u_int */ read = ((rec_id - 1) % tab->tab_recs.tci_rows_per_page) != 0; - if (!tab->tab_recs.xt_tc_write(ot->ot_rec_file, rec_id, 0, rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf, &op_seq, read, ot->ot_thread)) { + if (!tab->tab_recs.xt_tc_write(ot->ot_rec_file, rec_id, 0, rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf, &op_seq, read, thread)) { xt_unlock_mutex_ns(&tab->tab_rec_lock); return FAILED; } } xt_unlock_mutex_ns(&tab->tab_rec_lock); - if (!xt_xlog_modify_table(tab->tab_id, status, op_seq, next_rec_id, rec_id, rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf, ot->ot_thread)) + if (!xt_xlog_modify_table(tab->tab_id, status, op_seq, next_rec_id, rec_id, rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf, thread)) return FAILED; if (rec_info->ri_ext_rec) { - /* Write the log buffer overflow: */ - rec_info->ri_log_buf->er_status_1 = XT_LOG_ENT_EXT_REC_OK; - XT_SET_DISK_4(rec_info->ri_log_buf->er_data_size_4, rec_info->ri_log_data_size); - XT_SET_DISK_4(rec_info->ri_log_buf->er_tab_id_4, tab->tab_id); - XT_SET_DISK_4(rec_info->ri_log_buf->er_rec_id_4, rec_id); - if (!thread->st_dlog_buf.dlb_append_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info->ri_log_data_size, (xtWord1 *) rec_info->ri_log_buf, ot->ot_thread)) { - /* Failed to write the overflow, free the record allocated above: */ + if (!tab_write_ext_record(ot, rec_info, tab->tab_id, rec_id, log_id, log_offset, thread)) return FAILED; - } } XT_DISABLED_TRACE(("new rec tx=%d val=%d\n", (int) thread->st_xact_data->xd_start_xn_id, (int) rec_id)); @@ -4062,8 +4113,11 @@ static void tab_delete_record_on_fail(XTOpenTablePtr ot, xtRowID row_id, xtRecor } } + /* This is not required because the extended record will be free + * later when the record is freed! if (row_ptr->tr_rec_type_1 == XT_TAB_STATUS_EXT_DLOG || row_ptr->tr_rec_type_1 == XT_TAB_STATUS_EXT_CLEAN) tab_free_ext_record_on_fail(ot, rec_id, (XTTabRecExtDPtr) row_ptr, log_err); + */ rec_info.ri_fix_rec_buf = (XTTabRecFixDPtr) ot->ot_row_wbuffer; rec_info.ri_rec_buf_size = offsetof(XTTabRecFixDRec, rf_data); @@ -4369,7 +4423,7 @@ xtPublic xtBool xt_tab_new_record(XTOpenTablePtr ot, xtWord1 *rec_buf) if (!(row_id = tab_new_row(ot, tab))) goto failed_0; - rec_info.ri_fix_rec_buf->tr_stat_id_1 = self->st_update_id; + rec_info.ri_fix_rec_buf->tr_stat_id_1 = ot->ot_update_id; XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_row_id_4, row_id); XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_prev_rec_id_4, 0); XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_xact_id_4, self->st_xact_data->xd_start_xn_id); @@ -4458,7 +4512,7 @@ static xtBool tab_overwrite_record_on_fail(XTOpenTablePtr ot, XTTabRecInfoPtr re * order to do this we'd need to read the before-image of the * record before modifying it. */ - if (!ot->ot_thread->t_exception.e_xt_err) + if (!thread->t_exception.e_xt_err) xt_register_xterr(XT_REG_CONTEXT, XT_ERR_NO_BEFORE_IMAGE); return FAILED; } @@ -4471,7 +4525,7 @@ static xtBool tab_overwrite_record_on_fail(XTOpenTablePtr ot, XTTabRecInfoPtr re if (rec_info->ri_ext_rec) { /* Determine where the overflow will go... */ - if (!thread->st_dlog_buf.dlb_get_log_offset(&log_id, &log_offset, rec_info->ri_log_data_size + offsetof(XTactExtRecEntryDRec, er_data), ot->ot_thread)) + if (!thread->st_dlog_buf.dlb_get_log_offset(&log_id, &log_offset, rec_info->ri_log_data_size + offsetof(XTactExtRecEntryDRec, er_data), thread)) return FAILED; XT_SET_LOG_REF(rec_info->ri_ext_rec, log_id, log_offset); } @@ -4481,11 +4535,7 @@ static xtBool tab_overwrite_record_on_fail(XTOpenTablePtr ot, XTTabRecInfoPtr re if (rec_info->ri_ext_rec) { /* Write the log buffer overflow: */ - rec_info->ri_log_buf->er_status_1 = XT_LOG_ENT_EXT_REC_OK; - XT_SET_DISK_4(rec_info->ri_log_buf->er_data_size_4, rec_info->ri_log_data_size); - XT_SET_DISK_4(rec_info->ri_log_buf->er_tab_id_4, tab->tab_id); - XT_SET_DISK_4(rec_info->ri_log_buf->er_rec_id_4, rec_id); - if (!thread->st_dlog_buf.dlb_append_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info->ri_log_data_size, (xtWord1 *) rec_info->ri_log_buf, ot->ot_thread)) + if (!tab_write_ext_record(ot, rec_info, tab->tab_id, rec_id, log_id, log_offset, thread)) return FAILED; } @@ -4529,12 +4579,12 @@ static xtBool tab_overwrite_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtWor if (rec_info.ri_ext_rec) { /* Determine where the overflow will go... */ - if (!self->st_dlog_buf.dlb_get_log_offset(&log_id, &log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info.ri_log_data_size, ot->ot_thread)) + if (!self->st_dlog_buf.dlb_get_log_offset(&log_id, &log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info.ri_log_data_size, self)) goto failed_0; XT_SET_LOG_REF(rec_info.ri_ext_rec, log_id, log_offset); } - rec_info.ri_fix_rec_buf->tr_stat_id_1 = self->st_update_id; + rec_info.ri_fix_rec_buf->tr_stat_id_1 = ot->ot_update_id; XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_row_id_4, row_id); XT_COPY_DISK_4(rec_info.ri_fix_rec_buf->tr_prev_rec_id_4, prev_rec_head.tr_prev_rec_id_4); XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_xact_id_4, self->st_xact_data->xd_start_xn_id); @@ -4555,11 +4605,7 @@ static xtBool tab_overwrite_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtWor if (rec_info.ri_ext_rec) { /* Write the log buffer overflow: */ - rec_info.ri_log_buf->er_status_1 = XT_LOG_ENT_EXT_REC_OK; - XT_SET_DISK_4(rec_info.ri_log_buf->er_data_size_4, rec_info.ri_log_data_size); - XT_SET_DISK_4(rec_info.ri_log_buf->er_tab_id_4, tab->tab_id); - XT_SET_DISK_4(rec_info.ri_log_buf->er_rec_id_4, rec_id); - if (!self->st_dlog_buf.dlb_append_log(log_id, log_offset, offsetof(XTactExtRecEntryDRec, er_data) + rec_info.ri_log_data_size, (xtWord1 *) rec_info.ri_log_buf, ot->ot_thread)) + if (!tab_write_ext_record(ot, &rec_info, tab->tab_id, rec_id, log_id, log_offset, self)) goto failed_1; } @@ -4678,7 +4724,7 @@ xtPublic xtBool xt_tab_update_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtW if (!myxt_store_row(ot, &rec_info, (char *) after_buf)) goto failed_0; - rec_info.ri_fix_rec_buf->tr_stat_id_1 = self->st_update_id; + rec_info.ri_fix_rec_buf->tr_stat_id_1 = ot->ot_update_id; XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_row_id_4, row_id); XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_prev_rec_id_4, ot->ot_curr_rec_id); XT_SET_DISK_4(rec_info.ri_fix_rec_buf->tr_xact_id_4, self->st_xact_data->xd_start_xn_id); @@ -4688,7 +4734,7 @@ xtPublic xtBool xt_tab_update_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtW goto failed_0; /* Link the new variation into the list: */ - XT_TAB_ROW_WRITE_LOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], ot->ot_thread); + XT_TAB_ROW_WRITE_LOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self); if (!xt_tab_get_row(ot, row_id, &curr_var_rec_id)) goto failed_1; @@ -4714,7 +4760,7 @@ xtPublic xtBool xt_tab_update_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtW goto failed_1; XT_DISABLED_TRACE(("set upd tx=%d row=%d rec=%d\n", (int) self->st_xact_data->xd_start_xn_id, (int) row_id, (int) rec_info.ri_rec_id)); - XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], ot->ot_thread); + XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self); /* Add the index references: */ for (idx_cnt=0, ind=tab->tab_dic.dic_keys; idx_cnt<tab->tab_dic.dic_key_count; idx_cnt++, ind++) { @@ -4729,7 +4775,7 @@ xtPublic xtBool xt_tab_update_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtW goto failed_2; } - ot->ot_thread->st_statistics.st_row_update++; + self->st_statistics.st_row_update++; return OK; failed_2: @@ -4737,7 +4783,7 @@ xtPublic xtBool xt_tab_update_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtW goto failed_0; failed_1: - XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], ot->ot_thread); + XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], self); failed_0: return FAILED; diff --git a/storage/pbxt/src/table_xt.h b/storage/pbxt/src/table_xt.h index f7d7e1c6526..83f2168dd6e 100644 --- a/storage/pbxt/src/table_xt.h +++ b/storage/pbxt/src/table_xt.h @@ -364,6 +364,9 @@ typedef struct XTTable : public XTHeap { xtWord4 tab_rec_fnum; /* The count of the number of free rows on the free list. */ xt_mutex_type tab_rec_lock; /* Lock for the free list. */ + xt_mutex_type tab_ind_stat_lock; /* Aquired when calculating index statistics. */ + time_t tab_ind_stat_calc_time; /* Zero means the index stats have not be calculated, otherwize this is a time. */ + xt_mutex_type tab_ind_flush_lock; /* Required while the index file is being flushed. */ xtLogID tab_ind_rec_log_id; /* The point before which index entries have been written. */ xtLogOffset tab_ind_rec_log_offset; /* The log offset of the write point. */ @@ -372,7 +375,7 @@ typedef struct XTTable : public XTHeap { xtIndexNodeID tab_ind_free; /* The start of the free page list of the index. */ XTIndFreeListPtr tab_ind_free_list; /* A cache of the free list (if exists, don't go to disk!) */ xt_mutex_type tab_ind_lock; /* Lock for reading and writing the index free list. */ - xtWord2 tab_ind_flush_seq; + xtWord4 tab_ind_flush_seq; } XTTableHRec, *XTTableHPtr; /* Heap pointer */ /* Used for an in-memory list of the tables, ordered by ID. */ @@ -403,6 +406,8 @@ typedef struct XTOpenTable { size_t ot_rec_size; /* Cached from table for quick access. */ char ot_error_key[XT_IDENTIFIER_NAME_SIZE]; + struct XTOpenTable *ot_prev_update; /* The UPDATE statement stack! {UPDATE-STACK} */ + u_int ot_update_id; /* The update statement ID. */ xtBool ot_for_update; /* True if reading FOR UPDATE. */ xtBool ot_is_modify; /* True if UPDATE or DELETE. */ xtRowID ot_temp_row_lock; /* The temporary row lock set on this table. */ @@ -507,11 +512,11 @@ void xt_check_tables(struct XTThread *self); char *xt_tab_file_to_name(size_t size, char *tab_name, char *file_name); void xt_create_table(struct XTThread *self, XTPathStrPtr name, XTDictionaryPtr dic); -XTTableHPtr xt_use_table(struct XTThread *self, XTPathStrPtr name, xtBool no_load, xtBool missing_ok, xtBool *opened); +XTTableHPtr xt_use_table(struct XTThread *self, XTPathStrPtr name, xtBool no_load, xtBool missing_ok); void xt_sync_flush_table(struct XTThread *self, XTOpenTablePtr ot); xtBool xt_flush_record_row(XTOpenTablePtr ot, off_t *bytes_flushed, xtBool have_table_loc); void xt_flush_table(struct XTThread *self, XTOpenTablePtr ot); -XTTableHPtr xt_use_table_no_lock(XTThreadPtr self, struct XTDatabase *db, XTPathStrPtr name, xtBool no_load, xtBool missing_ok, XTDictionaryPtr dic, xtBool *opened); +XTTableHPtr xt_use_table_no_lock(XTThreadPtr self, struct XTDatabase *db, XTPathStrPtr name, xtBool no_load, xtBool missing_ok, XTDictionaryPtr dic); int xt_use_table_by_id(struct XTThread *self, XTTableHPtr *tab, struct XTDatabase *db, xtTableID tab_id); XTOpenTablePtr xt_open_table(XTTableHPtr tab); void xt_close_table(XTOpenTablePtr ot, xtBool flush, xtBool have_table_lock); diff --git a/storage/pbxt/src/thread_xt.cc b/storage/pbxt/src/thread_xt.cc index c31e23e69df..ac42896d22f 100644 --- a/storage/pbxt/src/thread_xt.cc +++ b/storage/pbxt/src/thread_xt.cc @@ -96,7 +96,7 @@ xtPublic xtBool xt_init_logging(void) { int err; - log_file = stderr; + log_file = stdout; log_level = XT_LOG_TRACE; err = xt_p_mutex_init_with_autoname(&log_mutex, NULL); if (err) { @@ -413,7 +413,8 @@ static void thr_save_error_va(XTExceptionPtr e, XTThreadPtr self, xtBool throw_i vsnprintf(e->e_err_msg, XT_ERR_MSG_SIZE, fmt, ap); /* Make the first character of the message upper case: */ - if (isalpha(e->e_err_msg[0]) && islower(e->e_err_msg[0])) + /* This did not work for foreign languages! */ + if (e->e_err_msg[0] >= 'a' && e->e_err_msg[0] <= 'z') e->e_err_msg[0] = (char) toupper(e->e_err_msg[0]); if (func && *func && *func != '-') @@ -793,11 +794,9 @@ xtPublic void xt_register_tabcolerr(c_char *func, c_char *file, u_int line, int xt_2nd_last_name_of_path(sizeof(buffer), buffer, tab_item->ps_path); xt_strcat(sizeof(buffer), buffer, "."); - xt_strcpy(sizeof(buffer), buffer, xt_last_name_of_path(tab_item->ps_path)); - xt_strcat(sizeof(buffer), buffer, "."); - xt_strcat(sizeof(buffer), buffer, item2); + xt_strcat(sizeof(buffer), buffer, xt_last_name_of_path(tab_item->ps_path)); - xt_register_ixterr(func, file, line, xt_err, buffer); + xt_register_i2xterr(func, file, line, xt_err, buffer, item2); } xtPublic void xt_register_taberr(c_char *func, c_char *file, u_int line, int xt_err, XTPathStrPtr tab_item) @@ -806,7 +805,7 @@ xtPublic void xt_register_taberr(c_char *func, c_char *file, u_int line, int xt_ xt_2nd_last_name_of_path(sizeof(buffer), buffer, tab_item->ps_path); xt_strcat(sizeof(buffer), buffer, "."); - xt_strcpy(sizeof(buffer), buffer, xt_last_name_of_path(tab_item->ps_path)); + xt_strcat(sizeof(buffer), buffer, xt_last_name_of_path(tab_item->ps_path)); xt_register_ixterr(func, file, line, xt_err, buffer); } @@ -1013,7 +1012,7 @@ static xtBool thr_setup_signals(void) typedef void *(*ThreadMainFunc)(XTThreadPtr self); -extern "C" void *thr_main_pbxt(void *data) +extern "C" void *xt_thread_main(void *data) { ThreadDataPtr td = (ThreadDataPtr) data; XTThreadPtr self = td->td_thr; @@ -1168,6 +1167,14 @@ xtPublic XTThreadPtr xt_init_threading(u_int max_threads) /* Align the number of threads: */ xt_thr_maximum_threads = xt_align_size(max_threads, XT_XS_LOCK_ALIGN); +#ifdef XT_TRACK_CONNECTIONS + if (xt_thr_maximum_threads > XT_TRACK_MAX_CONNS) { + xt_log_error(XT_NS_CONTEXT, XT_LOG_FATAL, XT_ERR_TOO_MANY_THREADS, 0, + "XT_TRACK_CONNECTIONS is enabled and xt_thr_maximum_threads > XT_TRACK_MAX_CONNS"); + goto failed; + } +#endif + #ifdef HANDLE_SIGNALS if (!thr_setup_signals()) return NULL; @@ -1503,10 +1510,10 @@ xtPublic pthread_t xt_run_thread(XTThreadPtr self, XTThreadPtr child, void *(*st pthread_attr_t attr = { 0, 0, 0 }; attr.priority = THREAD_PRIORITY_NORMAL; - err = pthread_create(&child_thread, &attr, thr_main_pbxt, &data); + err = pthread_create(&child_thread, &attr, xt_thread_main, &data); } #else - err = pthread_create(&child_thread, NULL, thr_main_pbxt, &data); + err = pthread_create(&child_thread, NULL, xt_thread_main, &data); #endif if (err) { xt_free_thread(child); diff --git a/storage/pbxt/src/thread_xt.h b/storage/pbxt/src/thread_xt.h index 7fcf6105a59..a07f7b7ae01 100644 --- a/storage/pbxt/src/thread_xt.h +++ b/storage/pbxt/src/thread_xt.h @@ -132,6 +132,7 @@ struct XTSortedList; struct XTXactLog; struct XTXactData; struct XTDatabase; +struct XTOpenTable; typedef void (*XTThreadFreeFunc)(struct XTThread *self, void *data); @@ -307,8 +308,7 @@ typedef struct XTThread { xtThreadID *st_thread_list; /* Used to prevent a record from being updated twice in one statement. */ - xtBool st_is_update; /* TRUE if this is an UPDATE statement. */ - u_int st_update_id; /* The update statement ID. */ + struct XTOpenTable *st_is_update; /* TRUE if this is an UPDATE statement. {UPDATE-STACK} */ XTRowLockListRec st_lock_list; /* The thread row lock list (drop locks on transaction end). */ XTStatisticsRec st_statistics; /* Accumulated statistics for this thread. */ @@ -536,7 +536,10 @@ extern struct XTThread **xt_thr_array; * Function prototypes */ -extern "C" void *thr_main_pbxt(void *data); +/* OpenSolaris has thr_main in /usr/include/thread.h (name conflict) + * Thanks for the tip Monty! + */ +extern "C" void *xt_thread_main(void *data); void xt_get_now(char *buffer, size_t len); xtBool xt_init_logging(void); diff --git a/storage/pbxt/src/trace_xt.cc b/storage/pbxt/src/trace_xt.cc index fed4eb59036..709ff71addc 100644 --- a/storage/pbxt/src/trace_xt.cc +++ b/storage/pbxt/src/trace_xt.cc @@ -36,6 +36,7 @@ #ifdef DEBUG //#define PRINT_TRACE //#define RESET_AFTER_DUMP +//#define DUMP_TO_STDOUT #endif static xtBool trace_initialized = FALSE; @@ -109,10 +110,10 @@ xtPublic void xt_print_trace(void) xt_lock_mutex_ns(&trace_mutex); if (trace_log_end > trace_log_offset+1) { trace_log_buffer[trace_log_end] = 0; - fprintf(stderr, "%s", trace_log_buffer + trace_log_offset + 1); + printf("%s", trace_log_buffer + trace_log_offset + 1); } trace_log_buffer[trace_log_offset] = 0; - fprintf(stderr, "%s", trace_log_buffer); + printf("%s", trace_log_buffer); trace_log_offset = 0; trace_log_end = 0; xt_unlock_mutex_ns(&trace_mutex); @@ -121,9 +122,18 @@ xtPublic void xt_print_trace(void) xtPublic void xt_dump_trace(void) { - FILE *fp; - if (trace_log_offset) { +#ifdef DUMP_TO_STDOUT + if (trace_log_end > trace_log_offset+1) { + trace_log_buffer[trace_log_end] = 0; + printf("%s", trace_log_buffer + trace_log_offset + 1); + } + trace_log_buffer[trace_log_offset] = 0; + printf("%s", trace_log_buffer); + printf("\n"); +#else + FILE *fp; + fp = fopen("pbxt.log", "w"); xt_lock_mutex_ns(&trace_mutex); @@ -136,6 +146,7 @@ xtPublic void xt_dump_trace(void) fprintf(fp, "%s", trace_log_buffer); fclose(fp); } +#endif #ifdef RESET_AFTER_DUMP trace_log_offset = 0; @@ -379,9 +390,9 @@ xtPublic void xt_dump_conn_tracking(void) ptr = conn_info; for (int i=0; i<XT_TRACK_MAX_CONNS; i++) { if (ptr->ci_curr_xact_id || ptr->ci_prev_xact_id) { - fprintf(stderr, "%3d curr=%d prev=%d prev-time=%ld\n", (int) ptr->cu_t_id, (int) ptr->ci_curr_xact_id, (int) ptr->ci_prev_xact_id, (long) ptr->ci_prev_xact_time); + printf("%3d curr=%d prev=%d prev-time=%ld\n", (int) ptr->cu_t_id, (int) ptr->ci_curr_xact_id, (int) ptr->ci_prev_xact_id, (long) ptr->ci_prev_xact_time); if (i+1<XT_TRACK_MAX_CONNS) { - fprintf(stderr, " diff=%d\n", (int) (ptr+1)->ci_curr_xact_id - (int) ptr->ci_curr_xact_id); + printf(" diff=%d\n", (int) (ptr+1)->ci_curr_xact_id - (int) ptr->ci_curr_xact_id); } } ptr++; diff --git a/storage/pbxt/src/trace_xt.h b/storage/pbxt/src/trace_xt.h index 9b00a5a05c7..34459a94dca 100644 --- a/storage/pbxt/src/trace_xt.h +++ b/storage/pbxt/src/trace_xt.h @@ -51,7 +51,9 @@ void xt_ftracef(char *fmt, ...); * CONNECTION TRACKING */ +#ifdef DEBUG #define XT_TRACK_CONNECTIONS +#endif #ifdef XT_TRACK_CONNECTIONS #define XT_TRACK_MAX_CONNS 500 diff --git a/storage/pbxt/src/xaction_xt.cc b/storage/pbxt/src/xaction_xt.cc index 86be55a6bcf..7281eafd8db 100644 --- a/storage/pbxt/src/xaction_xt.cc +++ b/storage/pbxt/src/xaction_xt.cc @@ -1326,7 +1326,7 @@ static xtBool xn_end_xact(XTThreadPtr thread, u_int status) } /* Write and flush the transaction log: */ - if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, TRUE)) { + if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit)) { ok = FALSE; status = XT_LOG_ENT_ABORT; /* Make sure this is done, if we failed to log @@ -1440,16 +1440,23 @@ static xtBool xn_end_xact(XTThreadPtr thread, u_int status) /* Don't get too far ahead of the sweeper! */ if (writer) { #ifdef XT_WAIT_FOR_CLEANUP - xtXactID wait_xn_id; - - /* This is the transaction that was committed 3 transactions ago: */ - wait_xn_id = thread->st_prev_xact[thread->st_last_xact]; - thread->st_prev_xact[thread->st_last_xact] = xn_id; - /* This works because XT_MAX_XACT_BEHIND == 2! */ - ASSERT_NS((thread->st_last_xact + 1) % XT_MAX_XACT_BEHIND == (thread->st_last_xact ^ 1)); - thread->st_last_xact ^= 1; - while (xt_xn_is_before(db->db_xn_to_clean_id, wait_xn_id) && (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND)) { - xt_critical_wait(); + if (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND) { + /* Set a maximum wait time (1/100s) */ + xtWord8 then = xt_trace_clock() + (xtWord8) 100000; + xtXactID wait_xn_id; + + /* This is the transaction that was committed 3 transactions ago: */ + wait_xn_id = thread->st_prev_xact[thread->st_last_xact]; + thread->st_prev_xact[thread->st_last_xact] = xn_id; + /* This works because XT_MAX_XACT_BEHIND == 2! */ + ASSERT_NS((thread->st_last_xact + 1) % XT_MAX_XACT_BEHIND == (thread->st_last_xact ^ 1)); + thread->st_last_xact ^= 1; + + while (xt_xn_is_before(db->db_xn_to_clean_id, wait_xn_id) && (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND)) { + if (xt_trace_clock() >= then) + break; + xt_critical_wait(); + } } #else if ((db->db_sw_faster & XT_SW_TOO_FAR_BEHIND) != 0) { @@ -1486,7 +1493,7 @@ xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id) entry.xt_status_1 = XT_LOG_ENT_NEW_TAB; entry.xt_checksum_1 = XT_CHECKSUM_1(tab_id); XT_SET_DISK_4(entry.xt_tab_id_4, tab_id); - return xt_xlog_log_data(self, sizeof(XTXactNewTabEntryDRec), (XTXactLogBufferDPtr) &entry, TRUE); + return xt_xlog_log_data(self, sizeof(XTXactNewTabEntryDRec), (XTXactLogBufferDPtr) &entry, XT_XLOG_WRITE_AND_FLUSH); } xtPublic int xt_xn_status(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID XT_UNUSED(rec_id)) @@ -2266,6 +2273,7 @@ static xtBool xn_sw_cleanup_variation(XTThreadPtr self, XNSweeperStatePtr ss, XT xt_log_and_clear_exception(self); break; } + prev_rec_id = next_rec_id; next_rec_id = XT_GET_DISK_4(prev_rec_head.tr_prev_rec_id_4); } @@ -2461,7 +2469,7 @@ static xtBool xn_sw_cleanup_xact(XTThreadPtr self, XNSweeperStatePtr ss, XTXactD cu.xc_checksum_1 = XT_CHECKSUM_1(XT_CHECKSUM4_XACT(xact->xd_start_xn_id)); XT_SET_DISK_4(cu.xc_xact_id_4, xact->xd_start_xn_id); - if (!xt_xlog_log_data(self, sizeof(XTXactCleanupEntryDRec), (XTXactLogBufferDPtr) &cu, FALSE)) + if (!xt_xlog_log_data(self, sizeof(XTXactCleanupEntryDRec), (XTXactLogBufferDPtr) &cu, XT_XLOG_NO_WRITE_NO_FLUSH)) return FAILED; ss->ss_flush_pending = TRUE; @@ -2656,7 +2664,9 @@ static void xn_sw_main(XTThreadPtr self) * we flush the log. */ if (now >= idle_start + 2) { - if (!xt_xlog_flush_log(db, self)) + /* Don't do this if flusher is active! */ + if (!db->db_fl_thread && + !xt_xlog_flush_log(db, self)) xt_throw(self); ss->ss_flush_pending = FALSE; } diff --git a/storage/pbxt/src/xaction_xt.h b/storage/pbxt/src/xaction_xt.h index ac742088ebb..e679a0f38f0 100644 --- a/storage/pbxt/src/xaction_xt.h +++ b/storage/pbxt/src/xaction_xt.h @@ -153,14 +153,14 @@ typedef struct XTXactData { #define XT_XACT_INIT_LOCK(s, i) xt_spinxslock_init_with_autoname(s, i) #define XT_XACT_FREE_LOCK(s, i) xt_spinxslock_free(s, i) #define XT_XACT_READ_LOCK(i, s) xt_spinxslock_slock(i) -#define XT_XACT_WRITE_LOCK(i, s) xt_spinxslock_xlock(i, (s)->t_id) +#define XT_XACT_WRITE_LOCK(i, s) xt_spinxslock_xlock(i, FALSE, (s)->t_id) #define XT_XACT_UNLOCK(i, s, b) xt_spinxslock_unlock(i, b) #else #define XT_XACT_LOCK_TYPE XTSkewRWLockRec #define XT_XACT_INIT_LOCK(s, i) xt_skewrwlock_init_with_autoname(s, i) #define XT_XACT_FREE_LOCK(s, i) xt_skewrwlock_free(s, i) #define XT_XACT_READ_LOCK(i, s) xt_skewrwlock_slock(i) -#define XT_XACT_WRITE_LOCK(i, s) xt_skewrwlock_xlock(i, (s)->t_id) +#define XT_XACT_WRITE_LOCK(i, s) xt_skewrwlock_xlock(i, FALSE, (s)->t_id) #define XT_XACT_UNLOCK(i, s, b) xt_skewrwlock_unlock(i, b) #endif diff --git a/storage/pbxt/src/xactlog_xt.cc b/storage/pbxt/src/xactlog_xt.cc index f50dbc802bb..addc14ff5d8 100644 --- a/storage/pbxt/src/xactlog_xt.cc +++ b/storage/pbxt/src/xactlog_xt.cc @@ -719,14 +719,14 @@ void XTDatabaseLog::xlog_exit(XTThreadPtr self) } } -#define WR_NO_SPACE 1 -#define WR_FLUSH 2 +#define WR_NO_SPACE 1 /* Write because there is no space, or some other reason */ +#define WR_FLUSH 2 /* Normal commit, write and flush */ xtBool XTDatabaseLog::xlog_flush(XTThreadPtr thread) { if (!xlog_flush_pending()) return OK; - return xlog_append(thread, 0, NULL, 0, NULL, TRUE, NULL, NULL); + return xlog_append(thread, 0, NULL, 0, NULL, XT_XLOG_WRITE_AND_FLUSH, NULL, NULL); } xtBool XTDatabaseLog::xlog_flush_pending() @@ -754,7 +754,7 @@ xtBool XTDatabaseLog::xlog_flush_pending() * This function returns the log ID and offset of * the data write position. */ -xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *data1, size_t size2, xtWord1 *data2, xtBool commit, xtLogID *log_id, xtLogOffset *log_offset) +xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *data1, size_t size2, xtWord1 *data2, int flush_log_at_trx_commit, xtLogID *log_id, xtLogOffset *log_offset) { int write_reason = 0; xtLogID req_flush_log_id; @@ -763,19 +763,20 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat xtWord8 flush_time; xtWord2 sum; + /* The first size value must be set, of the second is set! */ + ASSERT_NS(size1 || !size2); + if (!size1) { /* Just flush the buffer... */ xt_lck_slock(&xl_buffer_lock); - write_reason = WR_FLUSH; + write_reason = flush_log_at_trx_commit == XT_XLOG_WRITE_AND_FLUSH ? WR_FLUSH : WR_NO_SPACE; req_flush_log_id = xl_append_log_id; req_flush_log_offset = xl_append_log_offset + xl_append_buf_pos; xt_spinlock_unlock(&xl_buffer_lock); goto write_log_to_file; } - else { - req_flush_log_id = 0; - req_flush_log_offset = 0; - } + req_flush_log_id = 0; + req_flush_log_offset = 0; /* * This is a dirty read, which will send us to the @@ -866,11 +867,17 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat return OK; } } - else { + else if (size1) { /* It may be that there is now space in the append buffer: */ if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers) goto copy_to_log_buffer; } + else { + /* We are just writing the buffer! */ + ASSERT_NS(write_reason == WR_NO_SPACE); + if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0) + return OK; + } if (xt_trace_clock() >= then) { xt_lock_mutex_ns(&xl_write_lock); @@ -922,12 +929,18 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0); return OK; } - goto write_log_to_file; } - - /* It may be that there is now space in the append buffer: */ - if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers) - goto copy_to_log_buffer; + else if (size1) { + /* It may be that there is now space in the append buffer: */ + if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers) + goto copy_to_log_buffer; + } + else { + /* We are just writing the buffer! */ + ASSERT_NS(write_reason == WR_NO_SPACE); + if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0) + return OK; + } goto write_log_to_file; } @@ -952,7 +965,7 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat return OK; } /* Not flushed, but what about written? */ - if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : 0)) <= 0) { + if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0) { /* The write position is after or equal to the required flush * position. This means that all we have to do is flush * to satisfy the writers condition. @@ -960,7 +973,7 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat xtBool ok = TRUE; if (xl_log_id != xl_write_log_id) - ok = xlog_open_log(xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : 0), thread); + ok = xlog_open_log(xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start), thread); if (ok) { if (xl_db->db_co_busy) { @@ -979,7 +992,7 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat xt_lock_mutex_ns(&xl_db->db_wr_lock); xl_flush_log_id = xl_write_log_id; - xl_flush_log_offset = xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : 0); + xl_flush_log_offset = xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start); /* * We have written data to the log, wake the writer to commit * the data to the database. @@ -1000,8 +1013,11 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat return ok; } } - else { - /* If there is space in the buffer, then we can go on + else if (size1) { + /* If the amounf of data to be written is 0, then we are just required + * to write the transaction buffer. + * + * If there is space in the buffer, then we can go on * to copy our data into the buffer: */ if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers) { @@ -1016,6 +1032,21 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat goto copy_to_log_buffer; } } + else { + /* We are just writing the buffer! */ + ASSERT_NS(write_reason == WR_NO_SPACE); + if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0) { +#ifdef XT_XLOG_WAIT_SPINS + xt_writing = 0; + if (xt_waiting) + xt_cond_wakeall(&xl_write_cond); +#else + xt_writing = FALSE; + xt_cond_wakeall(&xl_write_cond); +#endif + return OK; + } + } rewrite: /* If the current write buffer has been written, then @@ -1109,7 +1140,8 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat part_size = 512 - part_size; xl_write_buffer[xl_write_buf_pos] = XT_LOG_ENT_END_OF_LOG; #ifdef HAVE_valgrind - memset(xl_write_buffer + xl_write_buf_pos + 1, 0x66, part_size); + if (part_size > 1) + memset(xl_write_buffer + xl_write_buf_pos + 1, 0x66, part_size - 1); #endif if (!xt_pwrite_file(xl_log_file, xl_write_log_offset, xl_write_buf_pos+part_size, xl_write_buffer, &thread->st_statistics.st_xlog, thread)) goto write_failed; @@ -1197,9 +1229,13 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat xt_writing = FALSE; xt_cond_wakeall(&xl_write_cond); #endif + + if (size1 == 0) + return OK; } copy_to_log_buffer: + ASSERT_NS(size1); xt_spinlock_lock(&xl_buffer_lock); /* Now we have to check again. The check above was a dirty read! */ @@ -1291,11 +1327,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat if (log_offset) *log_offset = xl_append_log_offset + xl_append_buf_pos; xl_append_buf_pos += size1 + size2; - if (commit) { - write_reason = WR_FLUSH; + if (flush_log_at_trx_commit != XT_XLOG_NO_WRITE_NO_FLUSH) { + write_reason = flush_log_at_trx_commit == XT_XLOG_WRITE_AND_FLUSH ? WR_FLUSH : WR_NO_SPACE; req_flush_log_id = xl_append_log_id; req_flush_log_offset = xl_append_log_offset + xl_append_buf_pos; xt_spinlock_unlock(&xl_buffer_lock); + /* We have written the data already! */ + size1 = 0; + size2 = 0; goto write_log_to_file; } @@ -1485,9 +1524,9 @@ xtPublic xtBool xt_xlog_flush_log(struct XTDatabase *db, XTThreadPtr thread) return db->db_xlog.xlog_flush(thread); } -xtPublic xtBool xt_xlog_log_data(XTThreadPtr thread, size_t size, XTXactLogBufferDPtr log_entry, xtBool commit) +xtPublic xtBool xt_xlog_log_data(XTThreadPtr thread, size_t size, XTXactLogBufferDPtr log_entry, int flush_log_at_trx_commit) { - return thread->st_database->db_xlog.xlog_append(thread, size, (xtWord1 *) log_entry, 0, NULL, commit, NULL, NULL); + return thread->st_database->db_xlog.xlog_append(thread, size, (xtWord1 *) log_entry, 0, NULL, flush_log_at_trx_commit, NULL, NULL); } /* Allocate a record from the free list. */ @@ -1498,7 +1537,7 @@ xtPublic xtBool xt_xlog_modify_table(xtTableID tab_id, u_int status, xtOpSeqNo o xtWord4 sum = 0; int check_size = 1; XTXactDataPtr xact = NULL; - xtBool commit = FALSE; + int flush_log_at_trx_commit = XT_XLOG_NO_WRITE_NO_FLUSH; switch (status) { case XT_LOG_ENT_REC_MODIFIED: @@ -1613,7 +1652,7 @@ xtPublic xtBool xt_xlog_modify_table(xtTableID tab_id, u_int status, xtOpSeqNo o XT_SET_DISK_4(log_entry.xp.xp_xact_id_4, op_seq); log_entry.xp.xp_xa_len_1 = (xtWord1) size; len = offsetof(XTXactPrepareEntryDRec, xp_xa_data); - commit = TRUE; + flush_log_at_trx_commit = xt_db_flush_log_at_trx_commit; break; default: ASSERT_NS(FALSE); @@ -1652,9 +1691,9 @@ xtPublic xtBool xt_xlog_modify_table(xtTableID tab_id, u_int status, xtOpSeqNo o xt_print_log_record(0, 0, &log_entry); #endif if (xact) - return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, commit, &xact->xd_begin_log, &xact->xd_begin_offset); + return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, flush_log_at_trx_commit, &xact->xd_begin_log, &xact->xd_begin_offset); - return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, commit, NULL, NULL); + return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, flush_log_at_trx_commit, NULL, NULL); } /* diff --git a/storage/pbxt/src/xactlog_xt.h b/storage/pbxt/src/xactlog_xt.h index db1cf944c1b..f72b810d66b 100644 --- a/storage/pbxt/src/xactlog_xt.h +++ b/storage/pbxt/src/xactlog_xt.h @@ -75,6 +75,10 @@ struct XTDatabase; #define XT_DELETE_LOGS 1 #define XT_KEEP_LOGS 2 +#define XT_XLOG_NO_WRITE_NO_FLUSH 0 +#define XT_XLOG_WRITE_AND_FLUSH 1 +#define XT_XLOG_WRITE_AND_NO_FLUSH 2 + /* LOG CACHE ---------------------------------------------------- */ typedef struct XTXLogBlock { @@ -443,7 +447,7 @@ typedef struct XTDatabaseLog { void xlog_name(size_t size, char *path, xtLogID log_id); int xlog_delete_log(xtLogID del_log_id, struct XTThread *thread); - xtBool xlog_append(struct XTThread *thread, size_t size1, xtWord1 *data1, size_t size2, xtWord1 *data2, xtBool commit, xtLogID *log_id, xtLogOffset *log_offset); + xtBool xlog_append(struct XTThread *thread, size_t size1, xtWord1 *data1, size_t size2, xtWord1 *data2, int flush_log_at_trx_commit, xtLogID *log_id, xtLogOffset *log_offset); xtBool xlog_flush(struct XTThread *thread); xtBool xlog_flush_pending(); @@ -464,7 +468,7 @@ private: } XTDatabaseLogRec, *XTDatabaseLogPtr; xtBool xt_xlog_flush_log(struct XTDatabase *db, struct XTThread *thread); -xtBool xt_xlog_log_data(struct XTThread *thread, size_t len, XTXactLogBufferDPtr log_entry, xtBool commit); +xtBool xt_xlog_log_data(struct XTThread *thread, size_t len, XTXactLogBufferDPtr log_entry, int flush_log_at_trx_commit); xtBool xt_xlog_modify_table(xtTableID tab_id, u_int status, xtOpSeqNo op_seq, xtRecordID free_list, xtRecordID address, size_t size, xtWord1 *data, struct XTThread *thread); void xt_xlog_init(struct XTThread *self, size_t cache_size); diff --git a/storage/pbxt/src/xt_config.h b/storage/pbxt/src/xt_config.h index f3749071087..cb3009ecb79 100644 --- a/storage/pbxt/src/xt_config.h +++ b/storage/pbxt/src/xt_config.h @@ -128,4 +128,8 @@ const int max_connections = 500; #endif #endif +#if defined(DBUG_ON) && !defined(DBUG_OFF) && !defined(DEBUG) +#define DEBUG +#endif // DBUG_ON + #endif diff --git a/storage/pbxt/src/xt_defs.h b/storage/pbxt/src/xt_defs.h index 0e1d11d52bd..98ebe0957a5 100644 --- a/storage/pbxt/src/xt_defs.h +++ b/storage/pbxt/src/xt_defs.h @@ -221,7 +221,7 @@ typedef struct XTPathStr { */ #ifdef DEBUG -//#define XT_USE_GLOBAL_DEBUG_SIZES +#define XT_USE_GLOBAL_DEBUG_SIZES #endif /* @@ -392,6 +392,11 @@ typedef struct XTPathStr { //#define XT_NO_ATOMICS #endif +/* When pbxt_flush_log_at_trx_commit != 1, the transaction log is flushed + * at regular intervals. Set the interval here. + */ +#define XT_XLOG_FLUSH_FREQ 1000 + /* ---------------------------------------------------------------------- * GLOBAL CONSTANTS */ @@ -457,21 +462,24 @@ typedef struct XTPathStr { #ifdef XT_USE_GLOBAL_DEBUG_SIZES //#undef XT_ROW_RWLOCKS -//#define XT_ROW_RWLOCKS 2 +//#define XT_ROW_RWLOCKS 2 //#undef XT_TAB_MIN_VAR_REC_LENGTH -//#define XT_TAB_MIN_VAR_REC_LENGTH 20 +//#define XT_TAB_MIN_VAR_REC_LENGTH 20 //#undef XT_ROW_LOCK_COUNT -//#define XT_ROW_LOCK_COUNT (XT_ROW_RWLOCKS * 2) +//#define XT_ROW_LOCK_COUNT (XT_ROW_RWLOCKS * 2) //#undef XT_INDEX_PAGE_SHIFTS -//#define XT_INDEX_PAGE_SHIFTS 8 // 256 +//#define XT_INDEX_PAGE_SHIFTS 8 // 256 //#undef XT_BLOCK_SIZE_FOR_DIRECT_IO -//#define XT_BLOCK_SIZE_FOR_DIRECT_IO 256 +//#define XT_BLOCK_SIZE_FOR_DIRECT_IO 256 //#undef XT_INDEX_WRITE_BUFFER_SIZE -//#define XT_INDEX_WRITE_BUFFER_SIZE (40 * 1024) +//#define XT_INDEX_WRITE_BUFFER_SIZE (40 * 1024) + +//#undef XT_XLOG_FLUSH_FREQ +//#define XT_XLOG_FLUSH_FREQ (30 * 1000) #endif |