diff options
author | Luke Chen <luke.chen@mongodb.com> | 2021-03-17 11:33:06 +1100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-03-17 00:45:57 +0000 |
commit | 4f4af8fac1f4019ceb1696a92845303b5f170f2a (patch) | |
tree | eb11009f5234cbc6c4317c3cf027f012b3a2ea76 /src/third_party | |
parent | 44936147c571d6f42821ba81b154e0746c3d7a14 (diff) | |
download | mongo-4f4af8fac1f4019ceb1696a92845303b5f170f2a.tar.gz |
Import wiredtiger: 8d6061cfc8910dd0e591d8025e8b2649c063e71e from branch mongodb-4.4
ref: 68bc106b4a..8d6061cfc8
for: 4.4.5
WT-6066 Re-enable endianness tests on evergreen
WT-6525 New parameter required in __wt_hs_insert_updates to indicate successful write in history
WT-6709 Remove timestamp queues that used to store read/durable timestamps
WT-6854 Implement test harness validation
WT-6855 Initial implementation of runtime monitor
WT-6935 Update connection base write generation value from all file checkpoints in metadata
WT-7072 Add a column store example
WT-7108 Update column store documentation to include ex_col_store example
WT-7120 Add variable-length column store to RTS for updates in the data store with history
WT-7150 Trailing uninit mem in schema project
WT-7177 Create a shared storage extension that implements a local storage solution
WT-7223 WT_CALL_FUNCTION should not print out a message
WT-7301 Revert configuration changes in poc_test
Diffstat (limited to 'src/third_party')
50 files changed, 3731 insertions, 809 deletions
diff --git a/src/third_party/wiredtiger/build_posix/Make.subdirs b/src/third_party/wiredtiger/build_posix/Make.subdirs index d0707f82f7e..cab55cd27fb 100644 --- a/src/third_party/wiredtiger/build_posix/Make.subdirs +++ b/src/third_party/wiredtiger/build_posix/Make.subdirs @@ -16,6 +16,7 @@ ext/compressors/zstd ZSTD ext/encryptors/nop ext/encryptors/rotn ext/extractors/csv +ext/storage_sources/local_store POSIX_HOST ext/test/fail_fs ext/test/local_store . diff --git a/src/third_party/wiredtiger/dist/s_string.ok b/src/third_party/wiredtiger/dist/s_string.ok index 2f87769493a..890f9fa2417 100644 --- a/src/third_party/wiredtiger/dist/s_string.ok +++ b/src/third_party/wiredtiger/dist/s_string.ok @@ -15,6 +15,7 @@ ARG ARGS ASAN ASM +AUS Addr Ailamaki Alakuijala @@ -158,6 +159,7 @@ FreeBSD's FreeLibrary Fsync Fuerst +GBR GCC GIDs GLIBC @@ -288,6 +290,7 @@ NOVALUE NOWAIT NUL NUM +NZD NetBSD NoAddr Noll @@ -341,6 +344,7 @@ RNG RPC RTS RUNDIR +RUS RWLOCK RXB Radu @@ -572,6 +576,7 @@ ccr cd ce ceh +celsius centric cfg cfko @@ -696,6 +701,7 @@ desc designator dest destSize +destructor dev dh dhandle @@ -762,6 +768,7 @@ existp extern extlist fadvise +fahrenheit fallocate fallthrough fblocks @@ -848,6 +855,7 @@ goutf gt handleops handlep +hardcoded hashval havesize hdr @@ -942,6 +950,7 @@ keycmp keyid keylen keyv +kmsid kv kvraw kvs @@ -971,6 +980,7 @@ llll llu llvm loadtext +loc localTime localkey localtime @@ -1015,6 +1025,7 @@ maxCLevel maxcpu maxdbs maxdiff +mb mbll mbss mem @@ -1053,6 +1064,7 @@ mutexes mux mytable mytxn +nTemp namespace namespaces nbits diff --git a/src/third_party/wiredtiger/dist/s_void b/src/third_party/wiredtiger/dist/s_void index 70a938da4b8..cba5984220e 100755 --- a/src/third_party/wiredtiger/dist/s_void +++ b/src/third_party/wiredtiger/dist/s_void @@ -104,6 +104,11 @@ func_ok() -e '/int index_compare_primary$/d' \ -e '/int index_compare_u$/d' \ -e '/int index_extractor_u$/d' \ + -e '/int local_err$/d' \ + -e '/int local_file_lock$/d' \ + -e '/int 
local_file_sync$/d' \ + -e '/int local_location_handle_close$/d' \ + -e '/int local_location_list_free$/d' \ -e '/int log_print_err$/d' \ -e '/int lz4_error$/d' \ -e '/int lz4_pre_size$/d' \ diff --git a/src/third_party/wiredtiger/dist/stat_data.py b/src/third_party/wiredtiger/dist/stat_data.py index 7a25fa521a4..a2e48995d81 100644 --- a/src/third_party/wiredtiger/dist/stat_data.py +++ b/src/third_party/wiredtiger/dist/stat_data.py @@ -535,11 +535,6 @@ connection_stats = [ TxnStat('txn_checkpoint_time_recent', 'transaction checkpoint most recent time (msecs)', 'no_clear,no_scale'), TxnStat('txn_checkpoint_time_total', 'transaction checkpoint total time (msecs)', 'no_clear,no_scale'), TxnStat('txn_commit', 'transactions committed'), - TxnStat('txn_durable_queue_empty', 'durable timestamp queue insert to empty'), - TxnStat('txn_durable_queue_head', 'durable timestamp queue inserts to head'), - TxnStat('txn_durable_queue_inserts', 'durable timestamp queue inserts total'), - TxnStat('txn_durable_queue_len', 'durable timestamp queue length'), - TxnStat('txn_durable_queue_walked', 'durable timestamp queue entries walked'), TxnStat('txn_fail_cache', 'transaction failures due to history store'), TxnStat('txn_pinned_checkpoint_range', 'transaction range of IDs currently pinned by a checkpoint', 'no_clear,no_scale'), TxnStat('txn_pinned_range', 'transaction range of IDs currently pinned', 'no_clear,no_scale'), @@ -553,11 +548,6 @@ connection_stats = [ TxnStat('txn_prepare_rollback', 'prepared transactions rolled back'), TxnStat('txn_prepared_updates_count', 'Number of prepared updates'), TxnStat('txn_query_ts', 'query timestamp calls'), - TxnStat('txn_read_queue_empty', 'read timestamp queue insert to empty'), - TxnStat('txn_read_queue_head', 'read timestamp queue inserts to head'), - TxnStat('txn_read_queue_inserts', 'read timestamp queue inserts total'), - TxnStat('txn_read_queue_len', 'read timestamp queue length'), - TxnStat('txn_read_queue_walked', 'read timestamp queue 
entries walked'), TxnStat('txn_rollback', 'transactions rolled back'), TxnStat('txn_rts', 'rollback to stable calls'), TxnStat('txn_rts_pages_visited', 'rollback to stable pages visited'), @@ -572,6 +562,7 @@ connection_stats = [ TxnStat('txn_set_ts_stable_upd', 'set timestamp stable updates'), TxnStat('txn_sync', 'transaction sync calls'), TxnStat('txn_timestamp_oldest_active_read', 'transaction read timestamp of the oldest active reader', 'no_clear,no_scale'), + TxnStat('txn_walk_sessions', 'transaction walk of concurrent sessions'), ########################################## # Yield statistics diff --git a/src/third_party/wiredtiger/dist/test_data.py b/src/third_party/wiredtiger/dist/test_data.py index d4724813331..15d3d2e7150 100644 --- a/src/third_party/wiredtiger/dist/test_data.py +++ b/src/third_party/wiredtiger/dist/test_data.py @@ -49,7 +49,7 @@ key_config=[ value_config = [ Config('value_size', 0, r''' - The size of the values to be created''', min=0, max=10000), + The size of the values to be created''', min=0, max=1000000000), ] scale_config = [ @@ -59,24 +59,53 @@ scale_config = [ The number of keys to be operated on per colection''', min=0, max=1000000), ] +throttle_config = [ + Config('rate_per_second',1,r''' + The number of times an operation should be performed per second''', min=1,max=1000), +] + +stat_config = [ + Config('enabled', 'false', r''' + Whether or not this statistic is relevant to the workload''', + type='boolean'), +] + +limit_stat = stat_config + [ + Config('limit', 0, r''' + The limit value a statistic is allowed to reach''') +] + load_config = key_config + value_config + scale_config workload_config = [ + Config('enable_tracking', 'true', r''' + Enables tracking to perform validation''', type='boolean'), Config('duration_seconds', 0, r''' - The duration that the workload run phase will last''',min=0, max=1000000), + The duration that the workload run phase will last''', min=0, max=1000000), Config('read_threads', 0, r''' The number 
of threads performing read operations''', min=0, max=100), Config('insert_threads', 0, r''' - The number of threads performing insert operations''',min=0, max=20), + The number of threads performing insert operations''', min=0, max=20), Config('insert_config',0, r''' The definition of the record being inserted''', subconfig=load_config), Config('update_threads', 0, r''' - The number of threads performing update operations''',min=0, max=20), + The number of threads performing update operations''', min=0, max=20), Config('update_config',0,r''', The definition of the record being updated''', subconfig=load_config) ] +test_config = [ + Config('cache_size_mb', 0, r''' + The cache size that wiredtiger will be configured to run with''', min=0, max=100000000000) +] + +runtime_monitor_config = throttle_config +[ + Config('stat_cache_size', '', ''' + The maximum cache percentage that can be hit while running.''', + type='category', subconfig=limit_stat) +] + methods = { -'poc_test' : Method(load_config + workload_config), +'poc_test' : Method(load_config + workload_config + runtime_monitor_config + test_config), } diff --git a/src/third_party/wiredtiger/examples/c/Makefile.am b/src/third_party/wiredtiger/examples/c/Makefile.am index fb1519de91e..664e0f33ce7 100644 --- a/src/third_party/wiredtiger/examples/c/Makefile.am +++ b/src/third_party/wiredtiger/examples/c/Makefile.am @@ -9,6 +9,7 @@ noinst_PROGRAMS = \ ex_backup \ ex_backup_block \ ex_call_center \ + ex_col_store \ ex_config_parse \ ex_cursor \ ex_data_source \ diff --git a/src/third_party/wiredtiger/examples/c/ex_col_store.c b/src/third_party/wiredtiger/examples/c/ex_col_store.c new file mode 100644 index 00000000000..a1d90ba3698 --- /dev/null +++ b/src/third_party/wiredtiger/examples/c/ex_col_store.c @@ -0,0 +1,529 @@ +/*- + * Public Domain 2014-2020 MongoDB, Inc. + * Public Domain 2008-2014 WiredTiger, Inc. + * + * This is free and unencumbered software released into the public domain. 
+ * + * Anyone is free to copy, modify, publish, use, compile, sell, or + * distribute this software, either in source code form or as a compiled + * binary, for any purpose, commercial or non-commercial, and by any + * means. + * + * In jurisdictions that recognize copyright laws, the author or authors + * of this software dedicate any and all copyright interest in the + * software to the public domain. We make this dedication for the benefit + * of the public at large and to the detriment of our heirs and + * successors. We intend this dedication to be an overt act of + * relinquishment in perpetuity of all present and future rights to this + * software under copyright law. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR + * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + * + * ex_col_store.c + * This is an example application that demonstrates + * how to perform operations with column store. + */ + +#include <test_util.h> +#include <assert.h> + +#define NUM_ENTRIES 100 +#define TABLE_NAME "table:weather" +#define NUM_REC 5 +#define NUM_COUNTRIES 7 + +static const char *home; + +/*! [col-store decl] */ +typedef struct { + uint16_t hour; + uint16_t pressure; + uint16_t loc_lat; + uint16_t loc_long; + uint8_t temp; + uint8_t humidity; + uint8_t wind; + uint8_t feels_like_temp; + char day[5]; + char country[5]; +} WEATHER; + +/*! 
[col-store decl] */ + +static void update_celsius_to_fahrenheit(WT_SESSION *session); +static void print_all_columns(WT_SESSION *session); +static void generate_data(WEATHER *w_array); +static void remove_country(WT_SESSION *session); +static void average_data(WT_SESSION *session, char *country_average); +static int find_min_and_max_temp( + WT_SESSION *session, uint16_t start_time, uint16_t end_time, int *min_temp, int *max_temp); + +static void +print_all_columns(WT_SESSION *session) +{ + WT_CURSOR *cursor; + WT_DECL_RET; + uint64_t recno; + uint16_t hour, loc_lat, loc_long, pressure; + uint8_t feels_like_temp, humidity, temp, wind; + const char *country, *day; + + error_check(session->open_cursor(session, TABLE_NAME, NULL, NULL, &cursor)); + while ((ret = cursor->next(cursor)) == 0) { + error_check(cursor->get_key(cursor, &recno)); + error_check(cursor->get_value(cursor, &hour, &pressure, &loc_lat, &loc_long, &temp, + &humidity, &wind, &feels_like_temp, &day, &country)); + + printf( + "{\n" + " ID: %" PRIu64 + "\n" + " day: %s\n" + " hour: %" PRIu16 + "\n" + " temp: %" PRIu8 + "\n" + " humidity: %" PRIu8 + "\n" + " pressure: %" PRIu16 + "\n" + " wind: %" PRIu8 + "\n" + " feels like: %" PRIu8 + "\n" + " lat: %" PRIu16 + "\n" + " long: %" PRIu16 + "\n" + " country: %s\n" + "}\n\n", + recno, day, hour, temp, humidity, pressure, wind, feels_like_temp, loc_lat, loc_long, + country); + } + scan_end_check(ret == WT_NOTFOUND); + error_check(cursor->close(cursor)); +} + +static void +update_celsius_to_fahrenheit(WT_SESSION *session) +{ + WT_CURSOR *cursor; + WT_DECL_RET; + uint8_t temp, temp_in_fahrenheit; + + printf("Converting temperature from celsius to fahrenheit.\n"); + + /*! [col-store temperature] */ + error_check(session->open_cursor(session, "colgroup:weather:temperature", NULL, NULL, &cursor)); + while ((ret = cursor->next(cursor)) == 0) { + error_check(cursor->get_value(cursor, &temp)); + + /* + * Update the value from celsius to fahrenheit. 
Discarding decimals and keeping data simple + * by type casting to uint8_t. + */ + temp_in_fahrenheit = (uint8_t)((1.8 * temp) + 32.0); + + cursor->set_value(cursor, temp_in_fahrenheit); + error_check(cursor->update(cursor)); + } + scan_end_check(ret == WT_NOTFOUND); + error_check(cursor->close(cursor)); + /*! [col-store temperature] */ +} + +static void +remove_country(WT_SESSION *session) +{ + WT_CURSOR *cursor; + WT_DECL_RET; + uint64_t recno; + uint16_t loc_lat, loc_long; + const char *country; + + printf("Removing all data for country AUS.\n"); + error_check(session->open_cursor(session, "colgroup:weather:location", NULL, NULL, &cursor)); + /* + * All Australian data is being removed, to test if deletion works. + */ + while ((ret = cursor->next(cursor)) == 0) { + error_check(cursor->get_key(cursor, &recno)); + error_check(cursor->get_value(cursor, &loc_lat, &loc_long, &country)); + if (strcmp("AUS", country) == 0) { + error_check(cursor->remove(cursor)); + } + } + scan_end_check(ret == WT_NOTFOUND); + error_check(cursor->close(cursor)); +} + +static void +generate_data(WEATHER *w_array) +{ + WEATHER w; + int country, day; + + srand((unsigned int)getpid()); + + for (int i = 0; i < NUM_ENTRIES; i++) { + day = rand() % 7; + switch (day) { + case 0: + strcpy(w.day, "MON"); + break; + case 1: + strcpy(w.day, "TUE"); + break; + case 2: + strcpy(w.day, "WED"); + break; + case 3: + strcpy(w.day, "THU"); + break; + case 4: + strcpy(w.day, "FRI"); + break; + case 5: + strcpy(w.day, "SAT"); + break; + case 6: + strcpy(w.day, "SUN"); + break; + default: + assert(false); + } + /* 24-hour-time 0-2400. */ + w.hour = rand() % 2401; + /* Temperature range: 0-50C. */ + w.temp = rand() % 51; + /* Feels like temperature range 0-50C */ + w.feels_like_temp = rand() % 51; + /* Humidity range: 0-100%. */ + w.humidity = rand() % 101; + /* Pressure range: 900-1100pa */ + w.pressure = (rand() % (1100 + 1 - 900)) + 900; + /* Wind range: 0-200 km/hr. 
*/ + w.wind = rand() % 201; + /* latitude: 0-180 degrees. */ + w.loc_lat = rand() % 181; + /* longitude: 0-90 degrees. */ + w.loc_long = rand() % 91; + + country = rand() % 7; + switch (country) { + case 0: + strcpy(w.country, "AUS"); + break; + case 1: + strcpy(w.country, "GBR"); + break; + case 2: + strcpy(w.country, "USA"); + break; + case 3: + strcpy(w.country, "NZD"); + break; + case 4: + strcpy(w.country, "IND"); + break; + case 5: + strcpy(w.country, "CHI"); + break; + case 6: + strcpy(w.country, "RUS"); + break; + default: + assert(false); + } + + w_array[i] = w; + } +} + +/* + * find_min_and_max_temp -- + * The function returns 0 when a valid min/max temperature can be calculated given the time + * range. If no records are found it will return WT_NOTFOUND, otherwise the program will crash + * if an internal error is encountered. + */ +static int +find_min_and_max_temp( + WT_SESSION *session, uint16_t start_time, uint16_t end_time, int *min_temp, int *max_temp) +{ + WT_CURSOR *end_time_cursor, *join_cursor, *start_time_cursor; + WT_DECL_RET; + uint64_t recno; + int exact; + uint16_t hour; + uint8_t temp; + + /*! [col-store join] */ + + /* Open cursors needed by the join. */ + error_check( + session->open_cursor(session, "join:table:weather(hour,temp)", NULL, NULL, &join_cursor)); + error_check( + session->open_cursor(session, "index:weather:hour", NULL, NULL, &start_time_cursor)); + error_check(session->open_cursor(session, "index:weather:hour", NULL, NULL, &end_time_cursor)); + + /* + * Select values WHERE (hour >= start AND hour <= end). Find the starting record closest to + * desired start time. 
+ */ + start_time_cursor->set_key(start_time_cursor, start_time); + error_check(start_time_cursor->search_near(start_time_cursor, &exact)); + if (exact == -1) { + ret = start_time_cursor->next(start_time_cursor); + if (ret == WT_NOTFOUND) + return ret; + else + error_check(ret); + } + + error_check(session->join(session, join_cursor, start_time_cursor, "compare=ge")); + + /* Find the ending record closest to desired end time. */ + end_time_cursor->set_key(end_time_cursor, end_time); + error_check(end_time_cursor->search_near(end_time_cursor, &exact)); + if (exact == 1) { + ret = end_time_cursor->prev(end_time_cursor); + if (ret == WT_NOTFOUND) + return ret; + else + error_check(ret); + } + + error_check(session->join(session, join_cursor, end_time_cursor, "compare=le")); + + /* Initialize minimum temperature and maximum temperature to temperature of the first record. */ + ret = join_cursor->next(join_cursor); + if (ret == WT_NOTFOUND) + return ret; + else + error_check(ret); + + error_check(join_cursor->get_key(join_cursor, &recno)); + error_check(join_cursor->get_value(join_cursor, &hour, &temp)); + *min_temp = temp; + *max_temp = temp; + + /* Iterating through found records between start and end time to find the min & max temps. */ + while ((ret = join_cursor->next(join_cursor)) == 0) { + error_check(join_cursor->get_value(join_cursor, &hour, &temp)); + + *min_temp = WT_MIN(*min_temp, temp); + *max_temp = WT_MAX(*max_temp, temp); + } + + /*! [col-store join] */ + + /* + * If WT_NOTFOUND is hit at this point, it is because we have traversed through all temperature + * records, hence we return 0 to the calling function to signal success. Otherwise an internal + * error was hit. + */ + if (ret != WT_NOTFOUND) + error_check(ret); + + return (0); +} + +/* + * average_data -- + * Obtains the average data across all fields given a specific location. 
+ */ +void +average_data(WT_SESSION *session, char *country_average) +{ + WT_CURSOR *loc_cursor; + WT_DECL_RET; + unsigned int count; + /* rec_arr holds the sum of the records in order to obtain the averages. */ + unsigned int rec_arr[NUM_REC]; + uint16_t hour, loc_lat, loc_long, pressure; + uint8_t feels_like_temp, humidity, temp, wind; + const char *country, *day; + + /* Open a cursor to search for the location. */ + error_check(session->open_cursor(session, "index:weather:country", NULL, NULL, &loc_cursor)); + loc_cursor->set_key(loc_cursor, country_average); + ret = loc_cursor->search(loc_cursor); + + /* + * Error handling in the case RUS is not found. In this case as it's a hardcoded location, + * if there aren't any matching locations, no average data is obtained and we proceed with the + * test instead of aborting. If an unexpected error occurs, exit. + */ + if (ret == WT_NOTFOUND) + return; + else if (ret != 0) + exit(EXIT_FAILURE); + + /* Populate the array with the totals of each of the columns. */ + count = 0; + memset(rec_arr, 0, sizeof(rec_arr)); + while (ret == 0) { + error_check(loc_cursor->get_value(loc_cursor, &hour, &pressure, &loc_lat, &loc_long, &temp, + &humidity, &wind, &feels_like_temp, &day, &country)); + + if (strcmp(country, country_average) != 0) { + ret = loc_cursor->next(loc_cursor); + continue; + } + + count++; + /* Increment the values of the rec_arr with the temp_arr values. */ + rec_arr[0] += temp; + rec_arr[1] += humidity; + rec_arr[2] += pressure; + rec_arr[3] += wind; + rec_arr[4] += feels_like_temp; + + ret = loc_cursor->next(loc_cursor); + } + + scan_end_check(ret == WT_NOTFOUND); + error_check(loc_cursor->close(loc_cursor)); + + /* Get the average values by dividing with the total number of records. 
*/ + for (int i = 0; i < NUM_REC; i++) + rec_arr[i] = rec_arr[i] / count; + + /* List the average records */ + printf( + "Average records for location %s : \nTemp: %u" + ", Humidity: %u" + ", Pressure: %u" + ", Wind: %u" + ", Feels like: %u" + "\n", + country_average, rec_arr[0], rec_arr[1], rec_arr[2], rec_arr[3], rec_arr[4]); +} + +/*! [col-store main] */ +int +main(int argc, char *argv[]) +{ + WT_CONNECTION *conn; + WT_CURSOR *cursor; + WT_SESSION *session; + WEATHER weather_data[NUM_ENTRIES]; + char countries[][NUM_COUNTRIES - 1] = {"AUS", "GBR", "USA", "NZD", "IND", "CHI", "RUS"}; + int max_temp_result, min_temp_result, ret; + uint16_t ending_time, starting_time; + + home = example_setup(argc, argv); + + /* Establishing a connection. */ + error_check(wiredtiger_open(home, NULL, "create,statistics=(fast)", &conn)); + + /* Establishing a session. */ + error_check(conn->open_session(conn, NULL, NULL, &session)); + + /*! [col-store create columns] */ + /* Create a table with columns and colgroups. */ + error_check(session->create(session, TABLE_NAME, + "key_format=r,value_format=" WT_UNCHECKED_STRING( + HHHHBBBB5S5S) ",columns=(id,hour,pressure,loc_lat," + "loc_long,temp,humidity," + "wind,feels_like_temp,day,country),colgroups=(day_time,temperature," + "humidity_pressure," + "wind,feels_like_temp,location)")); + + /* Create the colgroups */ + error_check(session->create(session, "colgroup:weather:day_time", "columns=(hour,day)")); + error_check(session->create(session, "colgroup:weather:temperature", "columns=(temp)")); + /*! 
[col-store create columns] */ + error_check(session->create( + session, "colgroup:weather:humidity_pressure", "columns=(pressure,humidity)")); + error_check(session->create(session, "colgroup:weather:wind", "columns=(wind)")); + error_check( + session->create(session, "colgroup:weather:feels_like_temp", "columns=(feels_like_temp)")); + error_check( + session->create(session, "colgroup:weather:location", "columns=(loc_lat,loc_long,country)")); + + /* Generating random data to populate the weather table. */ + generate_data(weather_data); + + /* Open a cursor on the table to insert the data. */ + error_check(session->open_cursor(session, TABLE_NAME, NULL, "append", &cursor)); + for (int i = 0; i < NUM_ENTRIES; i++) { + cursor->set_value(cursor, weather_data[i].hour, weather_data[i].pressure, + weather_data[i].loc_lat, weather_data[i].loc_long, weather_data[i].temp, + weather_data[i].humidity, weather_data[i].wind, weather_data[i].feels_like_temp, + weather_data[i].day, weather_data[i].country); + error_check(cursor->insert(cursor)); + } + /* Close cursor. */ + error_check(cursor->close(cursor)); + + /* Prints all the data in the database. */ + print_all_columns(session); + + /* Create indexes for searching */ + error_check(session->create(session, "index:weather:hour", "columns=(hour)")); + error_check(session->create(session, "index:weather:country", "columns=(country)")); + + /* + * Start and end points for time range for finding min/max temperature, in 24 hour format. + * Example uses 10am - 8pm but can change the values for desired start and end times. + */ + starting_time = 1000; + ending_time = 2000; + min_temp_result = 0; + max_temp_result = 0; + ret = find_min_and_max_temp( + session, starting_time, ending_time, &min_temp_result, &max_temp_result); + + /* If the min/max temperature is not found due to some error, there is no result to print. 
*/ + if (ret == 0) { + printf("The minimum temperature between %" PRIu16 " and %" PRIu16 " is %d.\n", + starting_time, ending_time, min_temp_result); + printf("The maximum temperature between %" PRIu16 " and %" PRIu16 " is %d.\n", + starting_time, ending_time, max_temp_result); + } + + /* Update the temperature from Celsius to Fahrenheit. */ + update_celsius_to_fahrenheit(session); + + /* + * Start and end points for time range for finding min/max temperature, in 24 hour format. + * Example uses 10am - 8pm but can change the values for desired start and end times. + */ + starting_time = 1000; + ending_time = 2000; + min_temp_result = 0; + max_temp_result = 0; + ret = find_min_and_max_temp( + session, starting_time, ending_time, &min_temp_result, &max_temp_result); + + /* If the min/max temperature is not found due to some error, there is no result to print. */ + if (ret == 0) { + printf("The minimum temperature between %" PRIu16 " and %" PRIu16 " is %d.\n", + starting_time, ending_time, min_temp_result); + printf("The maximum temperature between %" PRIu16 " and %" PRIu16 " is %d.\n", + starting_time, ending_time, max_temp_result); + } + + printf("Average for all countries:\n"); + for (int i = 0; i < NUM_COUNTRIES; i++) + average_data(session, countries[i]); + + remove_country(session); + + printf("Average for all countries:\n"); + for (int i = 0; i < NUM_COUNTRIES; i++) + average_data(session, countries[i]); + + /* Close the connection. */ + error_check(conn->close(conn, NULL)); + return (EXIT_SUCCESS); +} + +/*! 
[col-store main] */ diff --git a/src/third_party/wiredtiger/ext/storage_sources/local_store/Makefile.am b/src/third_party/wiredtiger/ext/storage_sources/local_store/Makefile.am new file mode 100644 index 00000000000..3debb3f8a64 --- /dev/null +++ b/src/third_party/wiredtiger/ext/storage_sources/local_store/Makefile.am @@ -0,0 +1,9 @@ +AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include + +noinst_LTLIBRARIES = libwiredtiger_local_store.la +libwiredtiger_local_store_la_SOURCES = local_store.c + +# libtool hack: noinst_LTLIBRARIES turns off building shared libraries as well +# as installation, it will only build static libraries. As far as I can tell, +# the "approved" libtool way to turn them back on is by adding -rpath. +libwiredtiger_local_store_la_LDFLAGS = -avoid-version -module -rpath /nowhere diff --git a/src/third_party/wiredtiger/ext/storage_sources/local_store/local_store.c b/src/third_party/wiredtiger/ext/storage_sources/local_store/local_store.c new file mode 100644 index 00000000000..86e8dad06d5 --- /dev/null +++ b/src/third_party/wiredtiger/ext/storage_sources/local_store/local_store.c @@ -0,0 +1,1269 @@ +/*- + * Public Domain 2014-2020 MongoDB, Inc. + * Public Domain 2008-2014 WiredTiger, Inc. + * + * This is free and unencumbered software released into the public domain. + * + * Anyone is free to copy, modify, publish, use, compile, sell, or + * distribute this software, either in source code form or as a compiled + * binary, for any purpose, commercial or non-commercial, and by any + * means. + * + * In jurisdictions that recognize copyright laws, the author or authors + * of this software dedicate any and all copyright interest in the + * software to the public domain. We make this dedication for the benefit + * of the public at large and to the detriment of our heirs and + * successors. We intend this dedication to be an overt act of + * relinquishment in perpetuity of all present and future rights to this + * software under copyright law. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR + * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +#include <dirent.h> +#include <errno.h> +#include <fcntl.h> +#include <pthread.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <sys/stat.h> +#include <sys/types.h> + +#include <wiredtiger.h> +#include <wiredtiger_ext.h> +#include "queue.h" + +/* + * This storage source implementation is used for demonstration and testing. All objects are stored + * as local files. + */ + +#ifdef __GNUC__ +#if __GNUC__ > 7 || (__GNUC__ == 7 && __GNUC_MINOR__ > 0) +/* + * !!! + * GCC with -Wformat-truncation complains about calls to snprintf in this file. + * There's nothing wrong, this makes the warning go away. + */ +#pragma GCC diagnostic ignored "-Wformat-truncation" +#endif +#endif + +/* Local storage source structure. */ +typedef struct { + WT_STORAGE_SOURCE storage_source; /* Must come first */ + + WT_EXTENSION_API *wt_api; /* Extension API */ + + /* + * Locks are used to protect the file handle queue and flush queue. + */ + pthread_rwlock_t file_handle_lock; + pthread_rwlock_t flush_lock; + + /* + * Configuration values are set at startup. + */ + uint32_t delay_ms; /* Average length of delay when simulated */ + uint32_t force_delay; /* Force a simulated network delay every N operations */ + uint32_t force_error; /* Force a simulated network error every N operations */ + uint32_t verbose; /* Verbose level */ + + /* + * Statistics are collected but not yet exposed. 
+ */ + uint64_t fh_ops; /* Non-read/write operations in file handles */ + uint64_t object_flushes; /* (What would be) writes to the cloud */ + uint64_t op_count; /* Number of operations done on local */ + uint64_t read_ops; + uint64_t write_ops; + + /* Queue of file handles */ + TAILQ_HEAD(local_file_handle_qh, local_file_handle) fileq; + TAILQ_HEAD(local_flush_qh, local_flush_item) flushq; + +} LOCAL_STORAGE; + +/* + * Indicates a object that has not yet been flushed. + */ +typedef struct local_flush_item { + char *src_path; /* File name to copy from, object name derived from this */ + char *marker_path; /* Marker name to remove when done */ + + /* + * These fields would be used in performing a flush. + */ + char *bucket; /* Bucket name */ + char *kmsid; /* Identifier for key management system */ + + TAILQ_ENTRY(local_flush_item) q; /* Queue of items */ +} LOCAL_FLUSH_ITEM; + +typedef struct local_file_handle { + WT_FILE_HANDLE iface; /* Must come first */ + + LOCAL_STORAGE *local; /* Enclosing storage source */ + int fd; /* File descriptor */ + char *path; /* Path name of file */ + char *temp_path; /* Temporary (hidden) name, set if newly created */ + LOCAL_FLUSH_ITEM *flush; /* Flush information, set if newly created */ + + TAILQ_ENTRY(local_file_handle) q; /* Queue of handles */ +} LOCAL_FILE_HANDLE; + +typedef struct local_location { + WT_LOCATION_HANDLE iface; /* Must come first */ + + char *cluster_prefix; /* Cluster prefix */ + char *bucket; /* Actually a directory path for local implementation */ + char *kmsid; /* Identifier for key management system */ +} LOCAL_LOCATION; + +/* + * Forward function declarations for internal functions + */ +static int local_config_dup( + LOCAL_STORAGE *, WT_SESSION *, WT_CONFIG_ITEM *, const char *, const char *, char **); +static int local_configure(LOCAL_STORAGE *, WT_CONFIG_ARG *); +static int local_configure_int(LOCAL_STORAGE *, WT_CONFIG_ARG *, const char *, uint32_t *); +static int local_delay(LOCAL_STORAGE *); 
+static int local_err(LOCAL_STORAGE *, WT_SESSION *, int, const char *, ...); +static void local_flush_free(LOCAL_FLUSH_ITEM *); +static int local_location_decode(LOCAL_STORAGE *, WT_LOCATION_HANDLE *, char **, char **, char **); +static int local_location_path( + LOCAL_STORAGE *, WT_LOCATION_HANDLE *, const char *, const char *, char **); + +/* + * Forward function declarations for storage source API implementation + */ +static int local_exist( + WT_STORAGE_SOURCE *, WT_SESSION *, WT_LOCATION_HANDLE *, const char *, bool *); +static int local_flush( + WT_STORAGE_SOURCE *, WT_SESSION *, WT_LOCATION_HANDLE *, const char *, const char *); +static int local_flush_one(LOCAL_STORAGE *, WT_SESSION *, LOCAL_FLUSH_ITEM *); +static int local_location_handle( + WT_STORAGE_SOURCE *, WT_SESSION *, const char *, WT_LOCATION_HANDLE **); +static int local_location_handle_close(WT_LOCATION_HANDLE *, WT_SESSION *); +static int local_location_list(WT_STORAGE_SOURCE *, WT_SESSION *, WT_LOCATION_HANDLE *, + const char *, uint32_t, char ***, uint32_t *); +static int local_location_list_free(WT_STORAGE_SOURCE *, WT_SESSION *, char **, uint32_t); +static int local_location_list_internal(WT_STORAGE_SOURCE *, WT_SESSION *, WT_LOCATION_HANDLE *, + const char *, const char *, uint32_t, char ***, uint32_t *); +static int local_open(WT_STORAGE_SOURCE *, WT_SESSION *, WT_LOCATION_HANDLE *, const char *, + uint32_t, WT_FILE_HANDLE **); +static int local_remove( + WT_STORAGE_SOURCE *, WT_SESSION *, WT_LOCATION_HANDLE *, const char *, uint32_t); +static int local_size( + WT_STORAGE_SOURCE *, WT_SESSION *, WT_LOCATION_HANDLE *, const char *, wt_off_t *); +static int local_terminate(WT_STORAGE_SOURCE *, WT_SESSION *); + +/* + * Forward function declarations for file handle API implementation + */ +static int local_file_close(WT_FILE_HANDLE *, WT_SESSION *); +static int local_file_close_internal(LOCAL_STORAGE *, WT_SESSION *, LOCAL_FILE_HANDLE *, bool); +static int local_file_lock(WT_FILE_HANDLE *, 
WT_SESSION *, bool); +static int local_file_read(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t, size_t, void *); +static int local_file_size(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t *); +static int local_file_sync(WT_FILE_HANDLE *, WT_SESSION *); +static int local_file_write(WT_FILE_HANDLE *, WT_SESSION *, wt_off_t, size_t, const void *); + +/* + * Report an error for a file operation. Note that local_err returns its third argument, and this + * macro will too. + */ +#define local_file_err(fh, session, ret, str) \ + local_err((fh)->local, session, ret, "\"%s\": %s", fh->iface.name, str) + +#define VERBOSE(local, ...) \ + do { \ + if ((local)->verbose > 0) \ + fprintf(stderr, __VA_ARGS__); \ + } while (0); +#define SHOW_STRING(s) (((s) == NULL) ? "<null>" : (s)) + +/* + * Some files are created with "marker" prefixes in their name. + * + * When an object is created and the file handle has not been closed, the contents are written into + * a file marked as temporary. When that file handle closes, the temporary file will be renamed to + * its final name, without the marker. At that point the object becomes "visible" to other API + * calls. + * + * Additionally, when an object is created, an empty marker file is created that indicates that the + * file will need to be flushed (transferred to the cloud). That empty marker file is removed when + * the object has been flushed. We already track in memory what objects need to be flushed, but + * having a file representation gives us a record of what needs to be done if we were to crash. + */ +static const char *MARKER_NEED_FLUSH = "FLUSH_"; +static const char *MARKER_TEMPORARY = "TEMP_"; + +/* + * local_config_dup -- + * Make a copy of a configuration string as an allocated C string. 
+ */ +static int +local_config_dup(LOCAL_STORAGE *local, WT_SESSION *session, WT_CONFIG_ITEM *v, const char *suffix, + const char *disallowed, char **result) +{ + size_t len; + int ret; + char *p; + + if (suffix == NULL) + suffix = ""; + len = v->len + strlen(suffix) + 1; + if ((p = malloc(len)) == NULL) + return (local_err(local, session, ENOMEM, "configuration parsing")); + (void)snprintf(p, len, "%.*s", (int)v->len, v->str); + + /* + * Check for illegal characters before adding the suffix, as the suffix may contain such + * characters. + */ + if (disallowed != NULL && strstr(p, disallowed) != NULL) { + ret = local_err(local, session, EINVAL, + "characters \"%s\" disallowed in configuration string \"%s\"", disallowed, p); + free(p); + return (ret); + } + (void)strncat(p, suffix, len); + *result = p; + return (0); +} + +/* + * local_configure + * Parse the configuration for the keys we care about. + */ +static int +local_configure(LOCAL_STORAGE *local, WT_CONFIG_ARG *config) +{ + int ret; + + if ((ret = local_configure_int(local, config, "delay_ms", &local->delay_ms)) != 0) + return (ret); + if ((ret = local_configure_int(local, config, "force_delay", &local->force_delay)) != 0) + return (ret); + if ((ret = local_configure_int(local, config, "force_error", &local->force_error)) != 0) + return (ret); + if ((ret = local_configure_int(local, config, "verbose", &local->verbose)) != 0) + return (ret); + + return (0); +} + +/* + * local_configure_int + * Look for a particular configuration key, and return its integer value. 
+ */ +static int +local_configure_int(LOCAL_STORAGE *local, WT_CONFIG_ARG *config, const char *key, uint32_t *valuep) +{ + WT_CONFIG_ITEM v; + int ret; + + ret = 0; + + if ((ret = local->wt_api->config_get(local->wt_api, NULL, config, key, &v)) == 0) { + if (v.len == 0 || v.type != WT_CONFIG_ITEM_NUM) + ret = local_err(local, NULL, EINVAL, "force_error config arg: integer required"); + else + *valuep = (uint32_t)v.val; + } else if (ret == WT_NOTFOUND) + ret = 0; + else + ret = local_err(local, NULL, EINVAL, "WT_API->config_get"); + + return (ret); +} + +/* + * local_delay -- + * Add any artificial delay or simulated network error during an object transfer. + */ +static int +local_delay(LOCAL_STORAGE *local) +{ + struct timeval tv; + int ret; + + ret = 0; + if (local->force_delay != 0 && local->object_flushes % local->force_delay == 0) { + VERBOSE(local, + "Artificial delay %" PRIu32 " milliseconds after %" PRIu64 " object flushes\n", + local->delay_ms, local->object_flushes); + tv.tv_sec = local->delay_ms / 1000; + tv.tv_usec = (local->delay_ms % 1000) * 1000; + (void)select(0, NULL, NULL, NULL, &tv); + } + if (local->force_error != 0 && local->object_flushes % local->force_error == 0) { + VERBOSE(local, "Artificial error returned after %" PRIu64 " object flushes\n", + local->object_flushes); + ret = ENETUNREACH; + } + + return (ret); +} + +/* + * local_err -- + * Print errors from the interface. Returns "ret", the third argument. + */ +static int +local_err(LOCAL_STORAGE *local, WT_SESSION *session, int ret, const char *format, ...) 
+{ + va_list ap; + WT_EXTENSION_API *wt_api; + char buf[1000]; + + va_start(ap, format); + wt_api = local->wt_api; + if (vsnprintf(buf, sizeof(buf), format, ap) > (int)sizeof(buf)) + wt_api->err_printf(wt_api, session, "local_storage: error overflow"); + wt_api->err_printf( + wt_api, session, "local_storage: %s: %s", wt_api->strerror(wt_api, session, ret), buf); + va_end(ap); + + return (ret); +} + +/* + * local_flush_free -- + * Free storage for a flush item. + */ +static void +local_flush_free(LOCAL_FLUSH_ITEM *flush) +{ + if (flush != NULL) { + free(flush->bucket); + free(flush->kmsid); + free(flush->marker_path); + free(flush->src_path); + free(flush); + } +} + +/* + * local_location_decode -- + * Break down a location into component parts. + */ +static int +local_location_decode(LOCAL_STORAGE *local, WT_LOCATION_HANDLE *location_handle, char **bucket_name, + char **cluster_prefix, char **kmsid) +{ + LOCAL_LOCATION *location; + char *p; + + location = (LOCAL_LOCATION *)location_handle; + + if (bucket_name != NULL) { + if ((p = strdup(location->bucket)) == NULL) + return (local_err(local, NULL, ENOMEM, "local_location_decode")); + *bucket_name = p; + } + if (cluster_prefix != NULL) { + if ((p = strdup(location->cluster_prefix)) == NULL) + return (local_err(local, NULL, ENOMEM, "local_location_decode")); + *cluster_prefix = p; + } + if (kmsid != NULL) { + if ((p = strdup(location->kmsid)) == NULL) + return (local_err(local, NULL, ENOMEM, "local_location_decode")); + *kmsid = p; + } + + return (0); +} + +/* + * local_location_path -- + * Construct a pathname from the location and local name. + */ +int +local_location_path(LOCAL_STORAGE *local, WT_LOCATION_HANDLE *location_handle, const char *name, + const char *marker, char **pathp) +{ + LOCAL_LOCATION *location; + size_t len; + int ret; + char *p; + + ret = 0; + location = (LOCAL_LOCATION *)location_handle; + + /* If this is a marker file, it will be hidden from all namespaces. 
*/ + if (marker == NULL) + marker = ""; + len = strlen(location->bucket) + strlen(marker) + strlen(location->cluster_prefix) + + strlen(name) + 2; + if ((p = malloc(len)) == NULL) + return (local_err(local, NULL, ENOMEM, "local_location_path")); + snprintf(p, len, "%s/%s%s%s", location->bucket, marker, location->cluster_prefix, name); + *pathp = p; + return (ret); +} + +/* + * local_exist -- + * Return if the file exists. + */ +static int +local_exist(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, + WT_LOCATION_HANDLE *location_handle, const char *name, bool *existp) +{ + struct stat sb; + LOCAL_STORAGE *local; + int ret; + char *path; + + local = (LOCAL_STORAGE *)storage_source; + path = NULL; + + local->op_count++; + if ((ret = local_location_path(local, location_handle, name, NULL, &path)) != 0) + goto err; + + ret = stat(path, &sb); + if (ret == 0) + *existp = true; + else if (errno == ENOENT) { + ret = 0; + *existp = false; + } else + ret = local_err(local, session, errno, "%s: ss_exist stat", path); + +err: + free(path); + return (ret); +} + +/* + * local_flush -- + * Return when the files have been flushed. + */ +static int +local_flush(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, + WT_LOCATION_HANDLE *location_handle, const char *name, const char *config) +{ + LOCAL_STORAGE *local; + LOCAL_FLUSH_ITEM *flush, *safe_flush; + int ret, t_ret; + char *match; + + (void)config; /* Unused */ + + /* + * This implementation does not do anything meaningful on flush. However, we do track which + * objects have not yet been flushed and note which ones need to be flushed now. + */ + ret = 0; + local = (LOCAL_STORAGE *)storage_source; + match = NULL; + + if (location_handle == NULL && name != NULL) + return local_err(local, session, EINVAL, "flush: cannot specify name without location"); + + local->op_count++; + if (location_handle != NULL) { + if ((ret = local_location_path( + local, location_handle, name == NULL ? 
"" : name, NULL, &match)) != 0) + goto err; + } + VERBOSE(local, "Flush: match=%s\n", SHOW_STRING(match)); + + /* + * Note: we retain the lock on the data structure while flushing all entries. This is fine for + * our local file implementation, when we don't have to do anything to flush, but for a cloud + * implementation, we'll want some way to not hold the lock while transferring data. + */ + if ((ret = pthread_rwlock_wrlock(&local->flush_lock)) != 0) { + (void)local_err(local, session, ret, "flush: pthread_rwlock_wrlock"); + goto err; + } + + TAILQ_FOREACH_SAFE(flush, &local->flushq, q, safe_flush) + { + if (match != NULL) { + /* + * We must match against the bucket and the name if given. + * Our match string is of the form: + * <bucket_name>/<cluster_prefix><name> + * + * If name is given, we must match the entire path. + * If name is not given, we must match up to the beginning + * of the name. + */ + if (name != NULL) { + /* Exact name match required. */ + if (strcmp(flush->src_path, match) != 0) + continue; + } + /* No name specified, everything up to the name must match. */ + else if (strncmp(flush->src_path, match, strlen(match)) != 0) + continue; + } + if ((t_ret = local_flush_one(local, session, flush)) != 0 && ret == 0) + ret = t_ret; + TAILQ_REMOVE(&local->flushq, flush, q); + local_flush_free(flush); + } + + if ((t_ret = pthread_rwlock_unlock(&local->flush_lock)) != 0) { + (void)local_err(local, session, t_ret, "flush: pthread_rwlock_unlock"); + if (ret == 0) + ret = t_ret; + } + +err: + free(match); + + return (ret); +} + +/* + * local_flush_one -- + * Flush one item on the flush queue. 
+ */ +static int +local_flush_one(LOCAL_STORAGE *local, WT_SESSION *session, LOCAL_FLUSH_ITEM *flush) +{ + int ret; + char *object_name; + + ret = 0; + + object_name = strrchr(flush->src_path, '/'); + if (object_name == NULL) + ret = local_err(local, session, errno, "%s: unexpected src path", flush->src_path); + else { + object_name++; + + /* Here's where we would copy the file to a cloud object. */ + VERBOSE(local, "Flush object: from=%s, bucket=%s, object=%s, kmsid=%s, \n", flush->src_path, + flush->bucket, object_name, flush->kmsid); + local->object_flushes++; + + if ((ret = local_delay(local)) != 0) + return (ret); + } + /* When we're done with flushing this file, remove the flush marker file. */ + if (ret == 0 && (ret = unlink(flush->marker_path)) < 0) + ret = local_err( + local, session, errno, "%s: unlink flush marker file failed", flush->marker_path); + + return (ret); +} + +/* + * local_location_handle -- + * Return a location handle from a location string. + */ +static int +local_location_handle(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, + const char *location_info, WT_LOCATION_HANDLE **location_handlep) +{ + LOCAL_LOCATION *location; + LOCAL_STORAGE *local; + WT_CONFIG_ITEM value; + WT_CONFIG_PARSER *parser; + WT_EXTENSION_API *wt_api; + int ret, t_ret; + + ret = 0; + location = NULL; + local = (LOCAL_STORAGE *)storage_source; + wt_api = local->wt_api; + parser = NULL; + + local->op_count++; + if ((ret = wt_api->config_parser_open( + wt_api, session, location_info, strlen(location_info), &parser)) != 0) + return (ret); + + if ((location = calloc(1, sizeof(*location))) == NULL) { + ret = ENOMEM; + goto err; + } + + if ((ret = parser->get(parser, "bucket", &value)) != 0) { + if (ret == WT_NOTFOUND) + ret = local_err(local, session, EINVAL, "ss_location_handle: missing bucket parameter"); + goto err; + } + if (value.len == 0) { + ret = + local_err(local, session, EINVAL, "ss_location_handle: bucket_name must be non-empty"); + goto err; + } + if 
((ret = local_config_dup(local, session, &value, NULL, NULL, &location->bucket)) != 0) + goto err; + + if ((ret = parser->get(parser, "cluster", &value)) != 0) { + if (ret == WT_NOTFOUND) + ret = + local_err(local, session, EINVAL, "ss_location_handle: missing cluster parameter"); + goto err; + } + if ((ret = local_config_dup(local, session, &value, "_", "_/", &location->cluster_prefix)) != 0) + goto err; + + if ((ret = parser->get(parser, "kmsid", &value)) != 0) { + if (ret == WT_NOTFOUND) + ret = local_err(local, session, EINVAL, "ss_location_handle: missing kmsid parameter"); + goto err; + } + if ((ret = local_config_dup(local, session, &value, NULL, NULL, &location->kmsid)) != 0) + goto err; + + VERBOSE(local, "Location: (bucket=%s,cluster=%s,kmsid=%s)\n", SHOW_STRING(location->bucket), + SHOW_STRING(location->cluster_prefix), SHOW_STRING(location->kmsid)); + + location->iface.close = local_location_handle_close; + *location_handlep = &location->iface; + + if (0) +err: + (void)local_location_handle_close(&location->iface, session); + + if (parser != NULL) + if ((t_ret = parser->close(parser)) != 0 && ret == 0) + ret = t_ret; + + return (ret); +} + +/* + * local_location_handle_close -- + * Free a location handle created by ss_location_handle. + */ +static int +local_location_handle_close(WT_LOCATION_HANDLE *location_handle, WT_SESSION *session) +{ + LOCAL_LOCATION *location; + + (void)session; /* Unused */ + + location = (LOCAL_LOCATION *)location_handle; + free(location->bucket); + free(location->cluster_prefix); + free(location->kmsid); + free(location); + return (0); +} + +/* + * local_location_list -- + * Return a list of object names for the given location. 
+ */ +static int +local_location_list(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, + WT_LOCATION_HANDLE *location_handle, const char *prefix, uint32_t limit, char ***dirlistp, + uint32_t *countp) +{ + ((LOCAL_STORAGE *)storage_source)->op_count++; + return (local_location_list_internal( + storage_source, session, location_handle, NULL, prefix, limit, dirlistp, countp)); +} + +/* + * local_location_list_free -- + * Free memory allocated by local_location_list. + */ +static int +local_location_list_free( + WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, char **dirlist, uint32_t count) +{ + (void)session; + + ((LOCAL_STORAGE *)storage_source)->op_count++; + if (dirlist != NULL) { + while (count > 0) + free(dirlist[--count]); + free(dirlist); + } + return (0); +} + +/* + * local_location_list_internal -- + * Return a list of object names for the given location, matching the given marker if needed. + */ +static int +local_location_list_internal(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, + WT_LOCATION_HANDLE *location_handle, const char *marker, const char *prefix, uint32_t limit, + char ***dirlistp, uint32_t *countp) +{ + struct dirent *dp; + DIR *dirp; + LOCAL_LOCATION *location; + LOCAL_STORAGE *local; + size_t alloc_sz, cluster_len, marker_len, prefix_len; + uint32_t allocated, count; + int ret; + char **entries, **new_entries; + const char *basename; + + local = (LOCAL_STORAGE *)storage_source; + location = (LOCAL_LOCATION *)location_handle; + entries = NULL; + allocated = count = 0; + cluster_len = strlen(location->cluster_prefix); + marker_len = (marker == NULL ? 0 : strlen(marker)); + prefix_len = (prefix == NULL ? 
0 : strlen(prefix)); + ret = 0; + + *dirlistp = NULL; + *countp = 0; + + if ((dirp = opendir(location->bucket)) == NULL) { + ret = errno; + if (ret == 0) + ret = EINVAL; + return (local_err(local, session, ret, "%s: ss_location_list: opendir", location->bucket)); + } + + for (count = 0; (dp = readdir(dirp)) != NULL && (limit == 0 || count < limit);) { + /* Skip . and .. */ + basename = dp->d_name; + if (strcmp(basename, ".") == 0 || strcmp(basename, "..") == 0) + continue; + if (marker_len == 0) { + /* Skip over any marker files. */ + if (strncmp(basename, MARKER_TEMPORARY, strlen(MARKER_TEMPORARY)) == 0 || + strncmp(basename, MARKER_NEED_FLUSH, strlen(MARKER_NEED_FLUSH)) == 0) + continue; + } else { + /* Match only the indicated marker files. */ + if (strncmp(basename, marker, marker_len) != 0) + continue; + basename += marker_len; + } + + /* Skip files not associated with our cluster. */ + if (strncmp(basename, location->cluster_prefix, cluster_len) != 0) + continue; + + basename += cluster_len; + /* The list of files is optionally filtered by a prefix. 
*/ + if (prefix != NULL && strncmp(basename, prefix, prefix_len) != 0) + continue; + + if (count >= allocated) { + allocated += 10; + alloc_sz = sizeof(char *) * allocated; + if ((new_entries = realloc(entries, alloc_sz)) == NULL) { + ret = ENOMEM; + goto err; + } + entries = new_entries; + } + if ((entries[count] = strdup(basename)) == NULL) { + ret = ENOMEM; + goto err; + } + count++; + } + + *dirlistp = entries; + *countp = count; + +err: + if (ret == 0) + return (0); + + if (entries != NULL) { + while (count > 0) + free(entries[--count]); + free(entries); + } + return (ret); +} + +/* + * local_open -- + * fopen for our local storage source + */ +static int +local_open(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, + WT_LOCATION_HANDLE *location_handle, const char *name, uint32_t flags, + WT_FILE_HANDLE **file_handlep) +{ + LOCAL_FILE_HANDLE *local_fh; + LOCAL_FLUSH_ITEM *flush; + LOCAL_STORAGE *local; + WT_FILE_HANDLE *file_handle; + int fd, oflags, ret; + char *open_name; + + (void)flags; /* Unused */ + + fd = oflags = ret = 0; + *file_handlep = NULL; + local_fh = NULL; + local = (LOCAL_STORAGE *)storage_source; + + local->op_count++; + if (flags == WT_SS_OPEN_CREATE) + oflags = O_WRONLY | O_CREAT; + else if (flags == WT_SS_OPEN_READONLY) + oflags = O_RDONLY; + else { + ret = local_err(local, session, EINVAL, "open: invalid flags: 0x%x", flags); + goto err; + } + + /* Create a new handle. */ + if ((local_fh = calloc(1, sizeof(LOCAL_FILE_HANDLE))) == NULL) { + ret = ENOMEM; + goto err; + } + if ((ret = local_location_path(local, location_handle, name, NULL, &local_fh->path)) != 0) + goto err; + if (flags == WT_SS_OPEN_CREATE) { + if ((flush = calloc(1, sizeof(LOCAL_FLUSH_ITEM))) == NULL) { + ret = ENOMEM; + goto err; + } + local_fh->flush = flush; + + /* + * Create a marker file that indicates that the file will need to be flushed. 
+ */ + if ((ret = local_location_path( + local, location_handle, name, MARKER_NEED_FLUSH, &flush->marker_path)) != 0) + goto err; + if ((fd = open(flush->marker_path, O_WRONLY | O_CREAT, 0666)) < 0) { + ret = local_err(local, session, errno, "ss_open_object: open: %s", flush->marker_path); + goto err; + } + if (close(fd) < 0) { + ret = local_err(local, session, errno, "ss_open_object: close: %s", flush->marker_path); + goto err; + } + if ((ret = local_location_decode( + local, location_handle, &flush->bucket, NULL, &flush->kmsid)) != 0) + goto err; + + /* + * For the file handle, we will be writing into a file marked as temporary. When the handle + * is closed, we'll move it to its final name. + */ + if ((ret = local_location_path( + local, location_handle, name, MARKER_TEMPORARY, &local_fh->temp_path)) != 0) + goto err; + + open_name = local_fh->temp_path; + } else + open_name = local_fh->path; + + /* Set file mode so it can only be reopened as readonly. */ + if ((fd = open(open_name, oflags, 0444)) < 0) { + ret = local_err(local, session, errno, "ss_open_object: open: %s", open_name); + goto err; + } + local_fh->fd = fd; + local_fh->local = local; + + /* Initialize public information. */ + file_handle = (WT_FILE_HANDLE *)local_fh; + + /* + * Setup the function call table for our custom storage source. Set the function pointer to NULL + * where our implementation doesn't support the functionality. 
+ */ + file_handle->close = local_file_close; + file_handle->fh_advise = NULL; + file_handle->fh_extend = NULL; + file_handle->fh_extend_nolock = NULL; + file_handle->fh_lock = local_file_lock; + file_handle->fh_map = NULL; + file_handle->fh_map_discard = NULL; + file_handle->fh_map_preload = NULL; + file_handle->fh_unmap = NULL; + file_handle->fh_read = local_file_read; + file_handle->fh_size = local_file_size; + file_handle->fh_sync = local_file_sync; + file_handle->fh_sync_nowait = NULL; + file_handle->fh_truncate = NULL; + file_handle->fh_write = local_file_write; + if ((file_handle->name = strdup(name)) == NULL) { + ret = ENOMEM; + goto err; + } + + if ((ret = pthread_rwlock_wrlock(&local->file_handle_lock)) != 0) { + (void)local_err(local, session, ret, "ss_open_object: pthread_rwlock_wrlock"); + goto err; + } + TAILQ_INSERT_HEAD(&local->fileq, local_fh, q); + if ((ret = pthread_rwlock_unlock(&local->file_handle_lock)) != 0) { + (void)local_err(local, session, ret, "ss_open_object: pthread_rwlock_unlock"); + goto err; + } + + *file_handlep = file_handle; + + VERBOSE(local, "File opened: %s final path=%s, temp path=%s\n", SHOW_STRING(name), + SHOW_STRING(local_fh->path), SHOW_STRING(local_fh->temp_path)); + + if (0) { +err: + local_file_close_internal(local, session, local_fh, true); + } + return (ret); +} + +/* + * local_remove -- + * POSIX remove. 
+ */ +static int +local_remove(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, + WT_LOCATION_HANDLE *location_handle, const char *name, uint32_t flags) +{ + LOCAL_STORAGE *local; + int ret; + char *path; + + (void)flags; /* Unused */ + + local = (LOCAL_STORAGE *)storage_source; + path = NULL; + + local->op_count++; + if ((ret = local_location_path(local, location_handle, name, NULL, &path)) != 0) + goto err; + + ret = unlink(path); + if (ret != 0) { + ret = local_err(local, session, errno, "%s: ss_remove unlink", path); + goto err; + } + +err: + free(path); + return (ret); +} + +/* + * local_size -- + * Get the size of a file in bytes, by file name. + */ +static int +local_size(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session, + WT_LOCATION_HANDLE *location_handle, const char *name, wt_off_t *sizep) +{ + struct stat sb; + LOCAL_STORAGE *local; + int ret; + char *path; + + local = (LOCAL_STORAGE *)storage_source; + path = NULL; + + local->op_count++; + if ((ret = local_location_path(local, location_handle, name, NULL, &path)) != 0) + goto err; + + ret = stat(path, &sb); + if (ret == 0) + *sizep = sb.st_size; + else + ret = local_err(local, session, errno, "%s: ss_size stat", path); + +err: + free(path); + return (ret); +} + +/* + * local_terminate -- + * Discard any resources on termination + */ +static int +local_terminate(WT_STORAGE_SOURCE *storage, WT_SESSION *session) +{ + LOCAL_FILE_HANDLE *local_fh, *safe_fh; + LOCAL_STORAGE *local; + int ret; + + ret = 0; + local = (LOCAL_STORAGE *)storage; + + local->op_count++; + + /* + * We should be single threaded at this point, so it is safe to destroy the lock and access the + * file handle list without it. 
+ */ + if ((ret = pthread_rwlock_destroy(&local->file_handle_lock)) != 0) + (void)local_err(local, session, ret, "terminate: pthread_rwlock_destroy"); + + TAILQ_FOREACH_SAFE(local_fh, &local->fileq, q, safe_fh) + local_file_close_internal(local, session, local_fh, true); + + free(local); + return (ret); +} + +/* + * local_file_close -- + * ANSI C close. + */ +static int +local_file_close(WT_FILE_HANDLE *file_handle, WT_SESSION *session) +{ + LOCAL_STORAGE *local; + LOCAL_FILE_HANDLE *local_fh; + LOCAL_FLUSH_ITEM *flush; + int ret, t_ret; + + ret = 0; + local_fh = (LOCAL_FILE_HANDLE *)file_handle; + local = local_fh->local; + + local->fh_ops++; + if ((ret = pthread_rwlock_wrlock(&local->file_handle_lock)) != 0) + /* There really isn't anything more we can do. It will get cleaned up on terminate. */ + return (local_err(local, session, ret, "file handle close: pthread_rwlock_wrlock")); + + TAILQ_REMOVE(&local->fileq, local_fh, q); + + if ((ret = pthread_rwlock_unlock(&local->file_handle_lock)) != 0) + (void)local_err(local, session, ret, "file handle close: pthread_rwlock_unlock"); + + /* + * If we need to track flushes for this file, save the flush item on our queue. + */ + if (ret == 0 && ((flush = local_fh->flush)) != NULL) { + if ((ret = pthread_rwlock_wrlock(&local->flush_lock)) != 0) + (void)local_err(local, session, ret, "file handle close: pthread_rwlock_wrlock2"); + + if (ret == 0) { + /* + * Move the flush object from the file handle and to the flush queue. It is now owned by + * the flush queue and will be freed when that item is flushed. 
+ */ + TAILQ_INSERT_HEAD(&local->flushq, flush, q); + local_fh->flush = NULL; + + if ((ret = pthread_rwlock_unlock(&local->flush_lock)) != 0) + (void)local_err(local, session, ret, "file handle close: pthread_rwlock_unlock2"); + if (ret == 0 && ((flush->src_path = strdup(local_fh->path)) == NULL)) + ret = ENOMEM; + } + } + + if ((t_ret = local_file_close_internal(local, session, local_fh, false)) != 0) { + if (ret == 0) + ret = t_ret; + } + + return (ret); +} + +/* + * local_file_close_internal -- + * Internal file handle close. + */ +static int +local_file_close_internal( + LOCAL_STORAGE *local, WT_SESSION *session, LOCAL_FILE_HANDLE *local_fh, bool final) +{ + int ret; + + ret = 0; + if ((close(local_fh->fd)) < 0) + ret = local_err(local, session, errno, "WT_FILE_HANDLE->close: close"); + + /* + * If this is a normal close (not a termination cleanup), and this handle creates an object, + * move the temp file to its final position. + */ + if (!final && ret == 0 && local_fh->temp_path != NULL) { + if ((ret = rename(local_fh->temp_path, local_fh->path)) < 0) + ret = local_err(local, session, errno, "FILE_HANDLE->close: rename"); + } + + local_flush_free(local_fh->flush); + free(local_fh->temp_path); + free(local_fh->path); + free(local_fh->iface.name); + free(local_fh); + + return (ret); +} + +/* + * local_file_lock -- + * Lock/unlock a file. + */ +static int +local_file_lock(WT_FILE_HANDLE *file_handle, WT_SESSION *session, bool lock) +{ + /* Locks are always granted. */ + + (void)session; /* Unused */ + (void)lock; /* Unused */ + + ((LOCAL_FILE_HANDLE *)file_handle)->local->fh_ops++; + return (0); +} + +/* + * local_file_read -- + * POSIX pread. 
+ */ +static int +local_file_read( + WT_FILE_HANDLE *file_handle, WT_SESSION *session, wt_off_t offset, size_t len, void *buf) +{ + LOCAL_FILE_HANDLE *local_fh; + ssize_t nbytes; + int ret; + uint8_t *addr; + + local_fh = (LOCAL_FILE_HANDLE *)file_handle; + ret = 0; + + local_fh->local->read_ops++; + for (addr = buf; ret == 0 && len > 0;) { + nbytes = pread(local_fh->fd, addr, len, offset); + if (nbytes < 0) + ret = local_file_err(local_fh, session, errno, "pread"); + else { + addr += nbytes; + len -= (size_t)nbytes; + offset += nbytes; + } + } + return (ret); +} + +/* + * local_file_size -- + * Get the size of a file in bytes, by file handle. + */ +static int +local_file_size(WT_FILE_HANDLE *file_handle, WT_SESSION *session, wt_off_t *sizep) +{ + struct stat sb; + LOCAL_FILE_HANDLE *local_fh; + int ret; + + local_fh = (LOCAL_FILE_HANDLE *)file_handle; + + local_fh->local->fh_ops++; + ret = fstat(local_fh->fd, &sb); + if (ret == 0) + *sizep = sb.st_size; + else + ret = local_file_err(local_fh, session, ret, "fh_size fstat"); + + return (ret); +} + +/* + * local_file_sync -- + * Ensure the content of the local file is stable. + */ +static int +local_file_sync(WT_FILE_HANDLE *file_handle, WT_SESSION *session) +{ + LOCAL_FILE_HANDLE *local_fh; + int ret; + + local_fh = (LOCAL_FILE_HANDLE *)file_handle; + + local_fh->local->fh_ops++; + if ((ret = fsync(local_fh->fd)) < 0) + ret = local_file_err(local_fh, session, errno, "fsync"); + + return (ret); +} + +/* + * local_file_write -- + * POSIX pwrite. 
+ */
+static int
+local_file_write(
+  WT_FILE_HANDLE *file_handle, WT_SESSION *session, wt_off_t offset, size_t len, const void *buf)
+{
+    LOCAL_FILE_HANDLE *local_fh;
+    ssize_t nbytes;
+    int ret;
+    const uint8_t *addr;
+
+    local_fh = (LOCAL_FILE_HANDLE *)file_handle;
+    ret = 0;
+
+    local_fh->local->write_ops++;
+    for (addr = buf; ret == 0 && len > 0;) {
+        nbytes = pwrite(local_fh->fd, addr, len, offset);
+        if (nbytes < 0)
+            ret = local_file_err(local_fh, session, errno, "pwrite");
+        else {
+            addr += nbytes;
+            len -= (size_t)nbytes;
+            offset += nbytes;
+        }
+    }
+    return (ret);
+}
+
+/*
+ * wiredtiger_extension_init --
+ *     A simple shared library local storage example.
+ */
+int
+wiredtiger_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+{
+    LOCAL_STORAGE *local;
+    int ret;
+
+    if ((local = calloc(1, sizeof(LOCAL_STORAGE))) == NULL)
+        return (errno);
+    local->wt_api = connection->get_extension_api(connection);
+    if ((ret = pthread_rwlock_init(&local->file_handle_lock, NULL)) != 0 ||
+      (ret = pthread_rwlock_init(&local->flush_lock, NULL)) != 0) {
+        (void)local_err(local, NULL, ret, "pthread_rwlock_init");
+        free(local);
+        /* Without this return we would fall through and use the freed structure below. */
+        return (ret);
+    }
+    /* Initialize the queues explicitly rather than relying on calloc's zero fill. */
+    TAILQ_INIT(&local->fileq);
+    TAILQ_INIT(&local->flushq);
+
+    /*
+     * Allocate a local storage structure, with a WT_STORAGE structure as the first field, allowing
+     * us to treat references to either type of structure as a reference to the other type.
+ */ + local->storage_source.ss_exist = local_exist; + local->storage_source.ss_flush = local_flush; + local->storage_source.ss_location_handle = local_location_handle; + local->storage_source.ss_location_list = local_location_list; + local->storage_source.ss_location_list_free = local_location_list_free; + local->storage_source.ss_open_object = local_open; + local->storage_source.ss_remove = local_remove; + local->storage_source.ss_size = local_size; + local->storage_source.terminate = local_terminate; + + if ((ret = local_configure(local, config)) != 0) { + free(local); + return (ret); + } + + /* Load the storage */ + if ((ret = connection->add_storage_source( + connection, "local_store", &local->storage_source, NULL)) != 0) { + (void)local_err(local, NULL, ret, "WT_CONNECTION->add_storage_source"); + free(local); + } + return (ret); +} diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index 2e87bc67d18..35c3678fdf9 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-4.4", - "commit": "68bc106b4aff8fe0e4fa4ba4707e363be3934ceb" + "commit": "8d6061cfc8910dd0e591d8025e8b2649c063e71e" } diff --git a/src/third_party/wiredtiger/lang/python/wiredtiger.i b/src/third_party/wiredtiger/lang/python/wiredtiger.i index 6f13f8f2183..829f58d6ac3 100644 --- a/src/third_party/wiredtiger/lang/python/wiredtiger.i +++ b/src/third_party/wiredtiger/lang/python/wiredtiger.i @@ -55,6 +55,15 @@ from packing import pack, unpack ## @endcond %} +/* + * For some reason, SWIG doesn't import some types from stdint.h. We need to tell SWIG something + * about those type we use. We don't need to be exact in our typing here, SWIG just needs hints + * so it knows what Python types to map to. 
+ */ +%inline %{ + typedef unsigned int uint32_t; +%} + /* Set the input argument to point to a temporary variable */ %typemap(in, numinputs=0) WT_CONNECTION ** (WT_CONNECTION *temp = NULL) { $1 = &temp; @@ -65,6 +74,21 @@ from packing import pack, unpack %typemap(in, numinputs=0) WT_CURSOR ** (WT_CURSOR *temp = NULL) { $1 = &temp; } +%typemap(in, numinputs=0) WT_FILE_HANDLE ** (WT_FILE_HANDLE *temp = NULL) { + $1 = &temp; + } +%typemap(in, numinputs=0) WT_LOCATION_HANDLE ** (WT_LOCATION_HANDLE *temp = NULL) { + $1 = &temp; + } +%typemap(in, numinputs=0) WT_STORAGE_SOURCE ** (WT_STORAGE_SOURCE *temp = NULL) { + $1 = &temp; + } +%typemap(in, numinputs=0) bool * (bool temp = false) { + $1 = &temp; + } +%typemap(in, numinputs=0) wt_off_t * (wt_off_t temp = false) { + $1 = &temp; +} %typemap(in, numinputs=0) WT_EVENT_HANDLER * %{ $1 = &pyApiEventHandler; @@ -166,6 +190,51 @@ from packing import pack, unpack $1 = &val; } +%typemap(in,numinputs=0) (char ***object_list, int *countp) (char **list, uint32_t nentries) { + $1 = &list; + $2 = &nentries; +} + +%typemap(argout) (char ***object_list, int *countp) { + int i; + char **list; + + $result = PyList_New(*$2); + list = (*$1); + /* + * When we're done with the individual C strings, free them. + * In theory, we should call the ss_location_list_free() method, + * but that's awkward, since we don't have the storage_source and session. 
+ */ + for (i = 0; i < *$2; i++) { + PyObject *o = PyString_InternFromString(list[i]); + PyList_SetItem($result, i, o); + free(list[i]); + } + free(list); +} + + +%typemap(argout) WT_FILE_HANDLE ** { + $result = SWIG_NewPointerObj(SWIG_as_voidptr(*$1), SWIGTYPE_p___wt_file_handle, 0); +} + +%typemap(argout) WT_LOCATION_HANDLE ** { + $result = SWIG_NewPointerObj(SWIG_as_voidptr(*$1), SWIGTYPE_p___wt_location_handle, 0); +} + +%typemap(argout) WT_STORAGE_SOURCE ** { + $result = SWIG_NewPointerObj(SWIG_as_voidptr(*$1), SWIGTYPE_p___wt_storage_source, 0); +} + +%typemap(argout) bool * { + $result = PyBool_FromLong(*$1); +} + +%typemap(argout) wt_off_t * { + $result = PyInt_FromLong(*$1); +} + %typemap(freearg) (WT_MODIFY *, int *nentriesp) { __wt_free(NULL, $1); } @@ -269,7 +338,10 @@ from packing import pack, unpack %enddef DESTRUCTOR(__wt_connection, close) DESTRUCTOR(__wt_cursor, close) +DESTRUCTOR(__wt_file_handle, close) DESTRUCTOR(__wt_session, close) +DESTRUCTOR(__wt_storage_source, close) +DESTRUCTOR(__wt_location_handle, close) /* * OVERRIDE_METHOD must be used when overriding or extending an existing @@ -445,6 +517,9 @@ def wiredtiger_calc_modify_string(session, oldv, newv, maxdiff, nmod): SELFHELPER(struct __wt_connection, connection) SELFHELPER(struct __wt_session, session) SELFHELPER(struct __wt_cursor, cursor) +SELFHELPER(struct __wt_file_handle, file_handle) +SELFHELPER(struct __wt_location_handle, location_handle) +SELFHELPER(struct __wt_storage_source, storage_source) /* * Create an error exception if it has not already @@ -564,6 +639,24 @@ OVERRIDE_METHOD(__wt_cursor, WT_CURSOR, search_near, (self)) $result = PyBytes_FromStringAndSize(*$1, *$2); } +/* Handle binary data input from FILE_HANDLE->fh_write. 
*/ +%typemap(in,numinputs=1) (size_t length, const void *buf) (Py_ssize_t length, const void *buf = NULL) { + if (PyBytes_AsStringAndSize($input, &buf, &length) < 0) + SWIG_exception_fail(SWIG_AttributeError, + "bad bytes input argument"); + $1 = length; + $2 = buf; +} + +/* Handle binary data input from FILE_HANDLE->fh_read. */ +%typemap(in,numinputs=1) (size_t length, void *buf) (Py_ssize_t length, const void *buf = NULL) { + if (PyBytes_AsStringAndSize($input, &buf, &length) < 0) + SWIG_exception_fail(SWIG_AttributeError, + "bad bytes input argument"); + $1 = length; + $2 = buf; +} + /* Handle record number returns from get_recno */ %typemap(in,numinputs=0) (uint64_t *recnop) (uint64_t recno) { $1 = &recno; } %typemap(frearg) (uint64_t *recnop) ""; @@ -874,6 +967,146 @@ typedef int int_void; } }; +%define CONCAT(a,b) a##b +%enddef + + /* + * SIDESTEP_METHOD is a workaround. We don't yet have some methods exposed in + * a way that makes them callable. For some reason, this workaround works, + * even though it's awkward. 
+ */ +%define SIDESTEP_METHOD(cclass, method, cargs, cargs_call) +%ignore cclass::method; +%rename (method) cclass::CONCAT(_,method); +%extend cclass { + int CONCAT(_, method) cargs { + return (self->method cargs_call ); + } +}; +%enddef + +SIDESTEP_METHOD(__wt_storage_source, ss_location_handle, + (WT_SESSION *session, const char *config, WT_LOCATION_HANDLE **handle), + (self, session, config, handle)) + +SIDESTEP_METHOD(__wt_location_handle, close, + (WT_SESSION *session), + (self, session)) + +SIDESTEP_METHOD(__wt_storage_source, ss_exist, + (WT_SESSION *session, WT_LOCATION_HANDLE *location_handle, + const char *name, bool *existp), + (self, session, location_handle, name, existp)) + +SIDESTEP_METHOD(__wt_storage_source, ss_flush, + (WT_SESSION *session, WT_LOCATION_HANDLE *location_handle, + const char *name, const char *config), + (self, session, location_handle, name, config)) + +SIDESTEP_METHOD(__wt_storage_source, ss_open_object, + (WT_SESSION *session, WT_LOCATION_HANDLE *location_handle, + const char *name, uint32_t flags, WT_FILE_HANDLE **file_handlep), + (self, session, location_handle, name, flags, file_handlep)) + +SIDESTEP_METHOD(__wt_storage_source, ss_remove, + (WT_SESSION *session, WT_LOCATION_HANDLE *location_handle, + const char *name, uint32_t flags), + (self, session, location_handle, name, flags)) + +SIDESTEP_METHOD(__wt_storage_source, ss_size, + (WT_SESSION *session, WT_LOCATION_HANDLE *location_handle, + const char *name, wt_off_t *sizep), + (self, session, location_handle, name, sizep)) + +SIDESTEP_METHOD(__wt_storage_source, terminate, + (WT_SESSION *session), + (self, session)) + +SIDESTEP_METHOD(__wt_file_handle, close, + (WT_SESSION *session), + (self, session)) + +SIDESTEP_METHOD(__wt_file_handle, fh_advise, + (WT_SESSION *session, wt_off_t offset, wt_off_t len, int advice), + (self, session, offset, len, advice)) + +SIDESTEP_METHOD(__wt_file_handle, fh_extend, + (WT_SESSION *session, wt_off_t offset), + (self, session, offset)) + 
+SIDESTEP_METHOD(__wt_file_handle, fh_extend_nolock, + (WT_SESSION *session, wt_off_t offset), + (self, session, offset)) + +SIDESTEP_METHOD(__wt_file_handle, fh_lock, + (WT_SESSION *session, bool lock), + (self, session, lock)) + +SIDESTEP_METHOD(__wt_file_handle, fh_map, + (WT_SESSION *session, bool lock, void *mapped_regionp, size_t *lengthp, void *mapped_cookiep), + (self, session, mapped_regionp, lengthp, mapped_cookiep)) + +SIDESTEP_METHOD(__wt_file_handle, fh_map_discard, + (WT_SESSION *session, void *map, size_t length, void *mapped_cookie), + (self, session, map, length, mapped_cookie)) + +SIDESTEP_METHOD(__wt_file_handle, fh_map_preload, + (WT_SESSION *session, const void *map, size_t length, void *mapped_cookie), + (self, session, map, length, mapped_cookie)) + +SIDESTEP_METHOD(__wt_file_handle, fh_unmap, + (WT_SESSION *session, void *mapped_region, size_t length, void *mapped_cookie), + (self, session, mapped_region, length, mapped_cookie)) + + /* +SIDESTEP_METHOD(__wt_file_handle, fh_read, + (WT_SESSION *session, wt_off_t offset, size_t len, void *buf), + (self, session, offset, len, buf)) + */ + +SIDESTEP_METHOD(__wt_file_handle, fh_size, + (WT_SESSION *session, wt_off_t *sizep), + (self, session, sizep)) + +SIDESTEP_METHOD(__wt_file_handle, fh_sync, + (WT_SESSION *session), + (self, session)) + +SIDESTEP_METHOD(__wt_file_handle, fh_sync_nowait, + (WT_SESSION *session), + (self, session)) + +SIDESTEP_METHOD(__wt_file_handle, fh_truncate, + (WT_SESSION *session, wt_off_t offset), + (self, session, offset)) + +SIDESTEP_METHOD(__wt_file_handle, fh_write, + (WT_SESSION *session, unsigned long offset, size_t length, const void *buf), + (self, session, offset, length, buf)) + +%ignore __wt_file_handle::fh_read; +%rename (fh_read) __wt_file_handle::_fh_read; +%extend __wt_file_handle { + int _fh_read(WT_SESSION *session, unsigned long offset, size_t length, void *buf) { + return (self->fh_read(self, session, offset, length, buf)); + } +}; + +%ignore 
__wt_storage_source::ss_location_list; +%rename (ss_location_list) __wt_storage_source::_ss_location_list; +%extend __wt_storage_source { + int _ss_location_list(WT_SESSION *session, WT_LOCATION_HANDLE *handle, const char *prefix, + uint32_t limit, char ***object_list, int *countp) { + return (self->ss_location_list(self, session, handle, prefix, limit, object_list, countp)); + } +}; + +/* + * No need for a location_list_free method, as the list and its components + * are freed immediately after the location_list call. + */ +%ignore __wt_storage_source::ss_location_list_free; + %{ int diagnostic_build() { #ifdef HAVE_DIAGNOSTIC @@ -929,6 +1162,9 @@ OVERRIDE_METHOD(__wt_session, WT_SESSION, log_printf, (self, msg)) %rename(Modify) __wt_modify; %rename(Session) __wt_session; %rename(Connection) __wt_connection; +%rename(FileHandle) __wt_file_handle; +%rename(StorageSource) __wt_storage_source; +%rename(LocationHandle) __wt_location_handle; %include "wiredtiger.h" @@ -1205,5 +1441,7 @@ def _rename_with_prefix(prefix, toclass): _rename_with_prefix('WT_STAT_CONN_', stat.conn) _rename_with_prefix('WT_STAT_DSRC_', stat.dsrc) _rename_with_prefix('WT_STAT_SESSION_', stat.session) +_rename_with_prefix('WT_SS_', StorageSource) +_rename_with_prefix('WT_FILE_HANDLE_', FileHandle) del _rename_with_prefix %} diff --git a/src/third_party/wiredtiger/src/config/test_config.c b/src/third_party/wiredtiger/src/config/test_config.c index 524f61e24e1..8012ea026c7 100644 --- a/src/third_party/wiredtiger/src/config/test_config.c +++ b/src/third_party/wiredtiger/src/config/test_config.c @@ -2,24 +2,34 @@ #include "wt_internal.h" +static const WT_CONFIG_CHECK confchk_stat_cache_size_subconfigs[] = { + {"enabled", "boolean", NULL, NULL, NULL, 0}, {"limit", "string", NULL, NULL, NULL, 0}, + {NULL, NULL, NULL, NULL, NULL, 0}}; + static const WT_CONFIG_CHECK confchk_poc_test[] = { + {"cache_size_mb", "int", NULL, "min=0,max=100000000000", NULL, 0}, {"collection_count", "int", NULL, 
"min=0,max=200000", NULL, 0}, {"duration_seconds", "int", NULL, "min=0,max=1000000", NULL, 0}, + {"enable_tracking", "boolean", NULL, NULL, NULL, 0}, {"insert_config", "string", NULL, NULL, NULL, 0}, {"insert_threads", "int", NULL, "min=0,max=20", NULL, 0}, {"key_count", "int", NULL, "min=0,max=1000000", NULL, 0}, {"key_size", "int", NULL, "min=0,max=10000", NULL, 0}, + {"rate_per_second", "int", NULL, "min=1,max=1000", NULL, 0}, {"read_threads", "int", NULL, "min=0,max=100", NULL, 0}, + {"stat_cache_size", "category", NULL, NULL, confchk_stat_cache_size_subconfigs, 2}, {"update_config", "string", NULL, NULL, NULL, 0}, {"update_threads", "int", NULL, "min=0,max=20", NULL, 0}, - {"value_size", "int", NULL, "min=0,max=10000", NULL, 0}, {NULL, NULL, NULL, NULL, NULL, 0}}; + {"value_size", "int", NULL, "min=0,max=1000000000", NULL, 0}, {NULL, NULL, NULL, NULL, NULL, 0}}; static const WT_CONFIG_ENTRY config_entries[] = { {"poc_test", - "collection_count=1,duration_seconds=0,insert_config=," - "insert_threads=0,key_count=0,key_size=0,read_threads=0," - "update_config=,update_threads=0,value_size=0", - confchk_poc_test, 10}, + "cache_size_mb=0,collection_count=1,duration_seconds=0," + "enable_tracking=true,insert_config=,insert_threads=0,key_count=0" + ",key_size=0,rate_per_second=1,read_threads=0," + "stat_cache_size=(enabled=false,limit=),update_config=," + "update_threads=0,value_size=0", + confchk_poc_test, 14}, {NULL, NULL, NULL, 0}}; /* diff --git a/src/third_party/wiredtiger/src/docs/cursor-join.dox b/src/third_party/wiredtiger/src/docs/cursor-join.dox index eeceedc2ec2..a3380f17709 100644 --- a/src/third_party/wiredtiger/src/docs/cursor-join.dox +++ b/src/third_party/wiredtiger/src/docs/cursor-join.dox @@ -41,4 +41,9 @@ joins', a join cursor will return duplicates. A join cursor never returns duplicates unless \c "operation=or" is used in a join configuration, or unless the first joined cursor is itself a join cursor that would return duplicates. 
+Another example of using a join cursor is provided in @ex_ref{ex_col_store.c}. +Here the columns hour and temp are joined together to find the maximum and minimum +temperature for a given time period. + +@snippet ex_col_store.c col-store join */ diff --git a/src/third_party/wiredtiger/src/docs/examples.dox b/src/third_party/wiredtiger/src/docs/examples.dox index 7d80894014e..f3834101713 100644 --- a/src/third_party/wiredtiger/src/docs/examples.dox +++ b/src/third_party/wiredtiger/src/docs/examples.dox @@ -52,4 +52,7 @@ Shows how to access a database with multiple threads. @example ex_file_system.c Shows how to extend WiredTiger with a custom file-system implementation. +@example ex_col_store.c +Shows how to use a column-store with column groups using example operations. + */ diff --git a/src/third_party/wiredtiger/src/docs/schema.dox b/src/third_party/wiredtiger/src/docs/schema.dox index 6fe202462ed..872fdf395b8 100644 --- a/src/third_party/wiredtiger/src/docs/schema.dox +++ b/src/third_party/wiredtiger/src/docs/schema.dox @@ -262,6 +262,21 @@ group. Because column groups always have the same key as the table, key columns for column groups are retrieved using WT_CURSOR::get_key, not WT_CURSOR::get_value. +Another example of using column groups is in @ex_ref{ex_col_store.c}: + +@snippet ex_col_store.c col-store decl +@snippet ex_col_store.c col-store create columns + +In this example the hour and day columns are grouped together in one +columns group and the temperature column stored in another. This allows +a cursor to be opened on a either of these column groups instead of the +entire table. + +@snippet ex_col_store.c col-store temperature + +An operation can then be completed on the values of this column without +having to bring the other columns into memory. + @section schema_indices Indices Columns are also used to create and configure indices on tables. @@ -357,11 +372,16 @@ on a column-store where the index key is the record number). 
The code included above was taken from the complete example program @ex_ref{ex_schema.c}. -Here is another example program, @ex_ref{ex_call_center.c}. +Here are other example programs, @ex_ref{ex_call_center.c}, @snippet ex_call_center.c call-center decl @snippet ex_call_center.c call-center work +and @ex_ref{ex_col_store.c}. + +@snippet ex_col_store.c col-store decl +@snippet ex_col_store.c col-store main + @section logged_non_logged_tables Logged and Non-logged tables - WiredTiger writes records to the log for each transaction when logging is enabled diff --git a/src/third_party/wiredtiger/src/docs/spell.ok b/src/third_party/wiredtiger/src/docs/spell.ok index f6964b7b4d2..8a19d8a5d27 100644 --- a/src/third_party/wiredtiger/src/docs/spell.ok +++ b/src/third_party/wiredtiger/src/docs/spell.ok @@ -638,3 +638,4 @@ yieldcpu zlib zseries zstd +loc diff --git a/src/third_party/wiredtiger/src/history/hs_rec.c b/src/third_party/wiredtiger/src/history/hs_rec.c index 6f523d49089..9ca2c32e07f 100644 --- a/src/third_party/wiredtiger/src/history/hs_rec.c +++ b/src/third_party/wiredtiger/src/history/hs_rec.c @@ -228,10 +228,12 @@ __hs_next_upd_full_value(WT_SESSION_IMPL *session, WT_MODIFY_VECTOR *modifies, /* * __wt_hs_insert_updates -- - * Copy one set of saved updates into the database's history store table. + * Copy one set of saved updates into the database's history store table. Whether the function + * fails or succeeds, if there is a successful write to history, cache_write_hs is set to true. 
*/ int -__wt_hs_insert_updates(WT_SESSION_IMPL *session, WT_PAGE *page, WT_MULTI *multi) +__wt_hs_insert_updates( + WT_SESSION_IMPL *session, WT_PAGE *page, WT_MULTI *multi, bool *cache_write_hs) { WT_BTREE *btree, *hs_btree; WT_CURSOR *hs_cursor; @@ -258,6 +260,7 @@ __wt_hs_insert_updates(WT_SESSION_IMPL *session, WT_PAGE *page, WT_MULTI *multi) char ts_string[3][WT_TS_INT_STRING_SIZE]; bool enable_reverse_modify, hs_inserted, squashed, ts_updates_in_hs; + *cache_write_hs = false; btree = S2BT(session); prev_upd = NULL; insert_cnt = 0; @@ -633,6 +636,10 @@ err: if (ret == 0 && insert_cnt > 0) __hs_verbose_cache_stats(session, btree); + /* cache_write_hs is set to true as there was at least one successful write to history. */ + if (insert_cnt > 0) + *cache_write_hs = true; + __wt_scr_free(session, &key); /* modify_value is allocated in __wt_modify_pack. Free it if it is allocated. */ if (modify_value != NULL) diff --git a/src/third_party/wiredtiger/src/include/extern.h b/src/third_party/wiredtiger/src/include/extern.h index 7fac6f5cbd4..ee4b04dbb52 100644 --- a/src/third_party/wiredtiger/src/include/extern.h +++ b/src/third_party/wiredtiger/src/include/extern.h @@ -764,8 +764,8 @@ extern int __wt_hs_find_upd(WT_SESSION_IMPL *session, uint32_t btree_id, WT_ITEM WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_hs_get_btree(WT_SESSION_IMPL *session, WT_BTREE **hs_btreep) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); -extern int __wt_hs_insert_updates(WT_SESSION_IMPL *session, WT_PAGE *page, WT_MULTI *multi) - WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_hs_insert_updates(WT_SESSION_IMPL *session, WT_PAGE *page, WT_MULTI *multi, + bool *cache_write_hs) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_hs_modify(WT_CURSOR_BTREE *hs_cbt, WT_UPDATE *hs_upd) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_hs_open(WT_SESSION_IMPL *session, const char **cfg) @@ -1795,13 +1795,11 @@ extern void 
__wt_timestamp_to_hex_string(wt_timestamp_t ts, char *hex_timestamp) extern void __wt_txn_bump_snapshot(WT_SESSION_IMPL *session); extern void __wt_txn_clear_durable_timestamp(WT_SESSION_IMPL *session); extern void __wt_txn_clear_read_timestamp(WT_SESSION_IMPL *session); -extern void __wt_txn_clear_timestamp_queues(WT_SESSION_IMPL *session); extern void __wt_txn_destroy(WT_SESSION_IMPL *session); extern void __wt_txn_get_snapshot(WT_SESSION_IMPL *session); extern void __wt_txn_global_destroy(WT_SESSION_IMPL *session); extern void __wt_txn_op_free(WT_SESSION_IMPL *session, WT_TXN_OP *op); extern void __wt_txn_publish_durable_timestamp(WT_SESSION_IMPL *session); -extern void __wt_txn_publish_read_timestamp(WT_SESSION_IMPL *session); extern void __wt_txn_release(WT_SESSION_IMPL *session); extern void __wt_txn_release_resources(WT_SESSION_IMPL *session); extern void __wt_txn_release_snapshot(WT_SESSION_IMPL *session); diff --git a/src/third_party/wiredtiger/src/include/stat.h b/src/third_party/wiredtiger/src/include/stat.h index 9131489d0fc..28e852d2b30 100644 --- a/src/third_party/wiredtiger/src/include/stat.h +++ b/src/third_party/wiredtiger/src/include/stat.h @@ -625,21 +625,11 @@ struct __wt_connection_stats { int64_t page_del_rollback_blocked; int64_t child_modify_blocked_page; int64_t txn_prepared_updates_count; - int64_t txn_durable_queue_walked; - int64_t txn_durable_queue_empty; - int64_t txn_durable_queue_head; - int64_t txn_durable_queue_inserts; - int64_t txn_durable_queue_len; int64_t txn_prepare; int64_t txn_prepare_commit; int64_t txn_prepare_active; int64_t txn_prepare_rollback; int64_t txn_query_ts; - int64_t txn_read_queue_walked; - int64_t txn_read_queue_empty; - int64_t txn_read_queue_head; - int64_t txn_read_queue_inserts; - int64_t txn_read_queue_len; int64_t txn_rts; int64_t txn_rts_pages_visited; int64_t txn_rts_tree_walk_skip_pages; @@ -685,6 +675,7 @@ struct __wt_connection_stats { int64_t txn_pinned_timestamp_oldest; int64_t 
txn_timestamp_oldest_active_read; int64_t txn_sync; + int64_t txn_walk_sessions; int64_t txn_commit; int64_t txn_rollback; int64_t lsm_checkpoint_throttle; diff --git a/src/third_party/wiredtiger/src/include/txn.h b/src/third_party/wiredtiger/src/include/txn.h index de72ec0c64c..23ed483a3fe 100644 --- a/src/third_party/wiredtiger/src/include/txn.h +++ b/src/third_party/wiredtiger/src/include/txn.h @@ -104,20 +104,10 @@ struct __wt_txn_shared { */ wt_timestamp_t read_timestamp; - TAILQ_ENTRY(__wt_txn_shared) read_timestampq; - TAILQ_ENTRY(__wt_txn_shared) durable_timestampq; - /* Set if need to clear from the durable queue */ - volatile uint8_t is_allocating; - uint8_t clear_durable_q; - uint8_t clear_read_q; /* Set if need to clear from the read queue */ - WT_CACHE_LINE_PAD_END }; -TAILQ_HEAD(__wt_txn_dts_qh, __wt_txn_shared); -TAILQ_HEAD(__wt_txn_rts_qh, __wt_txn_shared); - struct __wt_txn_global { volatile uint64_t current; /* Current transaction ID. */ @@ -151,16 +141,6 @@ struct __wt_txn_global { /* Protects logging, checkpoints and transaction visibility. */ WT_RWLOCK visibility_rwlock; - /* List of transactions sorted by durable timestamp. */ - WT_RWLOCK durable_timestamp_rwlock; - struct __wt_txn_dts_qh durable_timestamph; - uint32_t durable_timestampq_len; - - /* List of transactions sorted by read timestamp. */ - WT_RWLOCK read_timestamp_rwlock; - struct __wt_txn_rts_qh read_timestamph; - uint32_t read_timestampq_len; - /* * Track information about the running checkpoint. The transaction snapshot used when * checkpointing are special. 
Checkpoints can run for a long time so we keep them out of regular diff --git a/src/third_party/wiredtiger/src/include/wiredtiger.in b/src/third_party/wiredtiger/src/include/wiredtiger.in index 7878645d75e..b3a0286afa2 100644 --- a/src/third_party/wiredtiger/src/include/wiredtiger.in +++ b/src/third_party/wiredtiger/src/include/wiredtiger.in @@ -78,12 +78,10 @@ struct __wt_file_system; typedef struct __wt_file_system WT_FILE_SYSTEM; struct __wt_item; typedef struct __wt_item WT_ITEM; struct __wt_modify; typedef struct __wt_modify WT_MODIFY; struct __wt_session; typedef struct __wt_session WT_SESSION; -#if !defined(SWIG) #if !defined(DOXYGEN) struct __wt_storage_source; typedef struct __wt_storage_source WT_STORAGE_SOURCE; struct __wt_location_handle; typedef struct __wt_location_handle WT_LOCATION_HANDLE; #endif -#endif #if defined(SWIGJAVA) #define WT_HANDLE_NULLABLE(typename) typename##_NULLABLE @@ -2566,8 +2564,8 @@ struct __wt_connection { int __F(set_file_system)( WT_CONNECTION *connection, WT_FILE_SYSTEM *fs, const char *config); -#if !defined(SWIG) #if !defined(DOXYGEN) +#if !defined(SWIG) /*! * Add a storage source implementation. * @@ -2584,6 +2582,7 @@ struct __wt_connection { */ int __F(add_storage_source)(WT_CONNECTION *connection, const char *name, WT_STORAGE_SOURCE *storage_source, const char *config); +#endif /*! * Get a storage source implementation. @@ -2600,7 +2599,6 @@ struct __wt_connection { int __F(get_storage_source)(WT_CONNECTION *connection, const char *name, WT_STORAGE_SOURCE **storage_sourcep); #endif -#endif /*! * Return a reference to the WiredTiger extension functions. @@ -4425,6 +4423,7 @@ struct __wt_file_system { */ int (*terminate)(WT_FILE_SYSTEM *file_system, WT_SESSION *session); }; +#endif /* !defined(SWIG) */ /*! 
WT_FILE_HANDLE::fadvise flags: no longer need */ #define WT_FILE_HANDLE_DONTNEED 1 @@ -4711,9 +4710,7 @@ struct __wt_file_handle { int (*fh_write)(WT_FILE_HANDLE *file_handle, WT_SESSION *session, wt_off_t offset, size_t length, const void *buf); }; -#endif /* !defined(SWIG) */ -#if !defined(SWIG) #if !defined(DOXYGEN) /* This interface is not yet public. */ @@ -4919,7 +4916,6 @@ struct __wt_storage_source { int (*terminate)(WT_STORAGE_SOURCE *storage_source, WT_SESSION *session); }; #endif -#endif /* !defined(SWIG) */ /*! * Entry point to an extension, called when the extension is loaded. @@ -5722,448 +5718,430 @@ extern int wiredtiger_extension_terminate(WT_CONNECTION *connection); #define WT_STAT_CONN_CHILD_MODIFY_BLOCKED_PAGE 1310 /*! transaction: Number of prepared updates */ #define WT_STAT_CONN_TXN_PREPARED_UPDATES_COUNT 1311 -/*! transaction: durable timestamp queue entries walked */ -#define WT_STAT_CONN_TXN_DURABLE_QUEUE_WALKED 1312 -/*! transaction: durable timestamp queue insert to empty */ -#define WT_STAT_CONN_TXN_DURABLE_QUEUE_EMPTY 1313 -/*! transaction: durable timestamp queue inserts to head */ -#define WT_STAT_CONN_TXN_DURABLE_QUEUE_HEAD 1314 -/*! transaction: durable timestamp queue inserts total */ -#define WT_STAT_CONN_TXN_DURABLE_QUEUE_INSERTS 1315 -/*! transaction: durable timestamp queue length */ -#define WT_STAT_CONN_TXN_DURABLE_QUEUE_LEN 1316 /*! transaction: prepared transactions */ -#define WT_STAT_CONN_TXN_PREPARE 1317 +#define WT_STAT_CONN_TXN_PREPARE 1312 /*! transaction: prepared transactions committed */ -#define WT_STAT_CONN_TXN_PREPARE_COMMIT 1318 +#define WT_STAT_CONN_TXN_PREPARE_COMMIT 1313 /*! transaction: prepared transactions currently active */ -#define WT_STAT_CONN_TXN_PREPARE_ACTIVE 1319 +#define WT_STAT_CONN_TXN_PREPARE_ACTIVE 1314 /*! transaction: prepared transactions rolled back */ -#define WT_STAT_CONN_TXN_PREPARE_ROLLBACK 1320 +#define WT_STAT_CONN_TXN_PREPARE_ROLLBACK 1315 /*! 
transaction: query timestamp calls */ -#define WT_STAT_CONN_TXN_QUERY_TS 1321 -/*! transaction: read timestamp queue entries walked */ -#define WT_STAT_CONN_TXN_READ_QUEUE_WALKED 1322 -/*! transaction: read timestamp queue insert to empty */ -#define WT_STAT_CONN_TXN_READ_QUEUE_EMPTY 1323 -/*! transaction: read timestamp queue inserts to head */ -#define WT_STAT_CONN_TXN_READ_QUEUE_HEAD 1324 -/*! transaction: read timestamp queue inserts total */ -#define WT_STAT_CONN_TXN_READ_QUEUE_INSERTS 1325 -/*! transaction: read timestamp queue length */ -#define WT_STAT_CONN_TXN_READ_QUEUE_LEN 1326 +#define WT_STAT_CONN_TXN_QUERY_TS 1316 /*! transaction: rollback to stable calls */ -#define WT_STAT_CONN_TXN_RTS 1327 +#define WT_STAT_CONN_TXN_RTS 1317 /*! transaction: rollback to stable pages visited */ -#define WT_STAT_CONN_TXN_RTS_PAGES_VISITED 1328 +#define WT_STAT_CONN_TXN_RTS_PAGES_VISITED 1318 /*! transaction: rollback to stable tree walk skipping pages */ -#define WT_STAT_CONN_TXN_RTS_TREE_WALK_SKIP_PAGES 1329 +#define WT_STAT_CONN_TXN_RTS_TREE_WALK_SKIP_PAGES 1319 /*! transaction: rollback to stable updates aborted */ -#define WT_STAT_CONN_TXN_RTS_UPD_ABORTED 1330 +#define WT_STAT_CONN_TXN_RTS_UPD_ABORTED 1320 /*! transaction: set timestamp calls */ -#define WT_STAT_CONN_TXN_SET_TS 1331 +#define WT_STAT_CONN_TXN_SET_TS 1321 /*! transaction: set timestamp durable calls */ -#define WT_STAT_CONN_TXN_SET_TS_DURABLE 1332 +#define WT_STAT_CONN_TXN_SET_TS_DURABLE 1322 /*! transaction: set timestamp durable updates */ -#define WT_STAT_CONN_TXN_SET_TS_DURABLE_UPD 1333 +#define WT_STAT_CONN_TXN_SET_TS_DURABLE_UPD 1323 /*! transaction: set timestamp oldest calls */ -#define WT_STAT_CONN_TXN_SET_TS_OLDEST 1334 +#define WT_STAT_CONN_TXN_SET_TS_OLDEST 1324 /*! transaction: set timestamp oldest updates */ -#define WT_STAT_CONN_TXN_SET_TS_OLDEST_UPD 1335 +#define WT_STAT_CONN_TXN_SET_TS_OLDEST_UPD 1325 /*! 
transaction: set timestamp stable calls */ -#define WT_STAT_CONN_TXN_SET_TS_STABLE 1336 +#define WT_STAT_CONN_TXN_SET_TS_STABLE 1326 /*! transaction: set timestamp stable updates */ -#define WT_STAT_CONN_TXN_SET_TS_STABLE_UPD 1337 +#define WT_STAT_CONN_TXN_SET_TS_STABLE_UPD 1327 /*! transaction: transaction begins */ -#define WT_STAT_CONN_TXN_BEGIN 1338 +#define WT_STAT_CONN_TXN_BEGIN 1328 /*! transaction: transaction checkpoint currently running */ -#define WT_STAT_CONN_TXN_CHECKPOINT_RUNNING 1339 +#define WT_STAT_CONN_TXN_CHECKPOINT_RUNNING 1329 /*! transaction: transaction checkpoint generation */ -#define WT_STAT_CONN_TXN_CHECKPOINT_GENERATION 1340 +#define WT_STAT_CONN_TXN_CHECKPOINT_GENERATION 1330 /*! * transaction: transaction checkpoint history store file duration * (usecs) */ -#define WT_STAT_CONN_TXN_HS_CKPT_DURATION 1341 +#define WT_STAT_CONN_TXN_HS_CKPT_DURATION 1331 /*! transaction: transaction checkpoint max time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MAX 1342 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MAX 1332 /*! transaction: transaction checkpoint min time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MIN 1343 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_MIN 1333 /*! * transaction: transaction checkpoint most recent duration for gathering * all handles (usecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_HANDLE_DURATION 1344 +#define WT_STAT_CONN_TXN_CHECKPOINT_HANDLE_DURATION 1334 /*! * transaction: transaction checkpoint most recent duration for gathering * applied handles (usecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_HANDLE_DURATION_APPLY 1345 +#define WT_STAT_CONN_TXN_CHECKPOINT_HANDLE_DURATION_APPLY 1335 /*! * transaction: transaction checkpoint most recent duration for gathering * skipped handles (usecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_HANDLE_DURATION_SKIP 1346 +#define WT_STAT_CONN_TXN_CHECKPOINT_HANDLE_DURATION_SKIP 1336 /*! 
transaction: transaction checkpoint most recent handles applied */ -#define WT_STAT_CONN_TXN_CHECKPOINT_HANDLE_APPLIED 1347 +#define WT_STAT_CONN_TXN_CHECKPOINT_HANDLE_APPLIED 1337 /*! transaction: transaction checkpoint most recent handles skipped */ -#define WT_STAT_CONN_TXN_CHECKPOINT_HANDLE_SKIPPED 1348 +#define WT_STAT_CONN_TXN_CHECKPOINT_HANDLE_SKIPPED 1338 /*! transaction: transaction checkpoint most recent handles walked */ -#define WT_STAT_CONN_TXN_CHECKPOINT_HANDLE_WALKED 1349 +#define WT_STAT_CONN_TXN_CHECKPOINT_HANDLE_WALKED 1339 /*! transaction: transaction checkpoint most recent time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_RECENT 1350 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_RECENT 1340 /*! transaction: transaction checkpoint prepare currently running */ -#define WT_STAT_CONN_TXN_CHECKPOINT_PREP_RUNNING 1351 +#define WT_STAT_CONN_TXN_CHECKPOINT_PREP_RUNNING 1341 /*! transaction: transaction checkpoint prepare max time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_PREP_MAX 1352 +#define WT_STAT_CONN_TXN_CHECKPOINT_PREP_MAX 1342 /*! transaction: transaction checkpoint prepare min time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_PREP_MIN 1353 +#define WT_STAT_CONN_TXN_CHECKPOINT_PREP_MIN 1343 /*! transaction: transaction checkpoint prepare most recent time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_PREP_RECENT 1354 +#define WT_STAT_CONN_TXN_CHECKPOINT_PREP_RECENT 1344 /*! transaction: transaction checkpoint prepare total time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_PREP_TOTAL 1355 +#define WT_STAT_CONN_TXN_CHECKPOINT_PREP_TOTAL 1345 /*! transaction: transaction checkpoint scrub dirty target */ -#define WT_STAT_CONN_TXN_CHECKPOINT_SCRUB_TARGET 1356 +#define WT_STAT_CONN_TXN_CHECKPOINT_SCRUB_TARGET 1346 /*! transaction: transaction checkpoint scrub time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_SCRUB_TIME 1357 +#define WT_STAT_CONN_TXN_CHECKPOINT_SCRUB_TIME 1347 /*! 
transaction: transaction checkpoint total time (msecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_TOTAL 1358 +#define WT_STAT_CONN_TXN_CHECKPOINT_TIME_TOTAL 1348 /*! transaction: transaction checkpoints */ -#define WT_STAT_CONN_TXN_CHECKPOINT 1359 +#define WT_STAT_CONN_TXN_CHECKPOINT 1349 /*! * transaction: transaction checkpoints skipped because database was * clean */ -#define WT_STAT_CONN_TXN_CHECKPOINT_SKIPPED 1360 +#define WT_STAT_CONN_TXN_CHECKPOINT_SKIPPED 1350 /*! transaction: transaction failures due to history store */ -#define WT_STAT_CONN_TXN_FAIL_CACHE 1361 +#define WT_STAT_CONN_TXN_FAIL_CACHE 1351 /*! * transaction: transaction fsync calls for checkpoint after allocating * the transaction ID */ -#define WT_STAT_CONN_TXN_CHECKPOINT_FSYNC_POST 1362 +#define WT_STAT_CONN_TXN_CHECKPOINT_FSYNC_POST 1352 /*! * transaction: transaction fsync duration for checkpoint after * allocating the transaction ID (usecs) */ -#define WT_STAT_CONN_TXN_CHECKPOINT_FSYNC_POST_DURATION 1363 +#define WT_STAT_CONN_TXN_CHECKPOINT_FSYNC_POST_DURATION 1353 /*! transaction: transaction range of IDs currently pinned */ -#define WT_STAT_CONN_TXN_PINNED_RANGE 1364 +#define WT_STAT_CONN_TXN_PINNED_RANGE 1354 /*! transaction: transaction range of IDs currently pinned by a checkpoint */ -#define WT_STAT_CONN_TXN_PINNED_CHECKPOINT_RANGE 1365 +#define WT_STAT_CONN_TXN_PINNED_CHECKPOINT_RANGE 1355 /*! transaction: transaction range of timestamps currently pinned */ -#define WT_STAT_CONN_TXN_PINNED_TIMESTAMP 1366 +#define WT_STAT_CONN_TXN_PINNED_TIMESTAMP 1356 /*! transaction: transaction range of timestamps pinned by a checkpoint */ -#define WT_STAT_CONN_TXN_PINNED_TIMESTAMP_CHECKPOINT 1367 +#define WT_STAT_CONN_TXN_PINNED_TIMESTAMP_CHECKPOINT 1357 /*! * transaction: transaction range of timestamps pinned by the oldest * active read timestamp */ -#define WT_STAT_CONN_TXN_PINNED_TIMESTAMP_READER 1368 +#define WT_STAT_CONN_TXN_PINNED_TIMESTAMP_READER 1358 /*! 
* transaction: transaction range of timestamps pinned by the oldest * timestamp */ -#define WT_STAT_CONN_TXN_PINNED_TIMESTAMP_OLDEST 1369 +#define WT_STAT_CONN_TXN_PINNED_TIMESTAMP_OLDEST 1359 /*! transaction: transaction read timestamp of the oldest active reader */ -#define WT_STAT_CONN_TXN_TIMESTAMP_OLDEST_ACTIVE_READ 1370 +#define WT_STAT_CONN_TXN_TIMESTAMP_OLDEST_ACTIVE_READ 1360 /*! transaction: transaction sync calls */ -#define WT_STAT_CONN_TXN_SYNC 1371 +#define WT_STAT_CONN_TXN_SYNC 1361 +/*! transaction: transaction walk of concurrent sessions */ +#define WT_STAT_CONN_TXN_WALK_SESSIONS 1362 /*! transaction: transactions committed */ -#define WT_STAT_CONN_TXN_COMMIT 1372 +#define WT_STAT_CONN_TXN_COMMIT 1363 /*! transaction: transactions rolled back */ -#define WT_STAT_CONN_TXN_ROLLBACK 1373 +#define WT_STAT_CONN_TXN_ROLLBACK 1364 /*! LSM: sleep for LSM checkpoint throttle */ -#define WT_STAT_CONN_LSM_CHECKPOINT_THROTTLE 1374 +#define WT_STAT_CONN_LSM_CHECKPOINT_THROTTLE 1365 /*! LSM: sleep for LSM merge throttle */ -#define WT_STAT_CONN_LSM_MERGE_THROTTLE 1375 +#define WT_STAT_CONN_LSM_MERGE_THROTTLE 1366 /*! cache: bytes currently in the cache */ -#define WT_STAT_CONN_CACHE_BYTES_INUSE 1376 +#define WT_STAT_CONN_CACHE_BYTES_INUSE 1367 /*! cache: bytes dirty in the cache cumulative */ -#define WT_STAT_CONN_CACHE_BYTES_DIRTY_TOTAL 1377 +#define WT_STAT_CONN_CACHE_BYTES_DIRTY_TOTAL 1368 /*! cache: bytes read into cache */ -#define WT_STAT_CONN_CACHE_BYTES_READ 1378 +#define WT_STAT_CONN_CACHE_BYTES_READ 1369 /*! cache: bytes written from cache */ -#define WT_STAT_CONN_CACHE_BYTES_WRITE 1379 +#define WT_STAT_CONN_CACHE_BYTES_WRITE 1370 /*! cache: checkpoint blocked page eviction */ -#define WT_STAT_CONN_CACHE_EVICTION_CHECKPOINT 1380 +#define WT_STAT_CONN_CACHE_EVICTION_CHECKPOINT 1371 /*! 
cache: eviction walk target pages histogram - 0-9 */ -#define WT_STAT_CONN_CACHE_EVICTION_TARGET_PAGE_LT10 1381 +#define WT_STAT_CONN_CACHE_EVICTION_TARGET_PAGE_LT10 1372 /*! cache: eviction walk target pages histogram - 10-31 */ -#define WT_STAT_CONN_CACHE_EVICTION_TARGET_PAGE_LT32 1382 +#define WT_STAT_CONN_CACHE_EVICTION_TARGET_PAGE_LT32 1373 /*! cache: eviction walk target pages histogram - 128 and higher */ -#define WT_STAT_CONN_CACHE_EVICTION_TARGET_PAGE_GE128 1383 +#define WT_STAT_CONN_CACHE_EVICTION_TARGET_PAGE_GE128 1374 /*! cache: eviction walk target pages histogram - 32-63 */ -#define WT_STAT_CONN_CACHE_EVICTION_TARGET_PAGE_LT64 1384 +#define WT_STAT_CONN_CACHE_EVICTION_TARGET_PAGE_LT64 1375 /*! cache: eviction walk target pages histogram - 64-128 */ -#define WT_STAT_CONN_CACHE_EVICTION_TARGET_PAGE_LT128 1385 +#define WT_STAT_CONN_CACHE_EVICTION_TARGET_PAGE_LT128 1376 /*! * cache: eviction walk target pages reduced due to history store cache * pressure */ -#define WT_STAT_CONN_CACHE_EVICTION_TARGET_PAGE_REDUCED 1386 +#define WT_STAT_CONN_CACHE_EVICTION_TARGET_PAGE_REDUCED 1377 /*! cache: eviction walks abandoned */ -#define WT_STAT_CONN_CACHE_EVICTION_WALKS_ABANDONED 1387 +#define WT_STAT_CONN_CACHE_EVICTION_WALKS_ABANDONED 1378 /*! cache: eviction walks gave up because they restarted their walk twice */ -#define WT_STAT_CONN_CACHE_EVICTION_WALKS_STOPPED 1388 +#define WT_STAT_CONN_CACHE_EVICTION_WALKS_STOPPED 1379 /*! * cache: eviction walks gave up because they saw too many pages and * found no candidates */ -#define WT_STAT_CONN_CACHE_EVICTION_WALKS_GAVE_UP_NO_TARGETS 1389 +#define WT_STAT_CONN_CACHE_EVICTION_WALKS_GAVE_UP_NO_TARGETS 1380 /*! * cache: eviction walks gave up because they saw too many pages and * found too few candidates */ -#define WT_STAT_CONN_CACHE_EVICTION_WALKS_GAVE_UP_RATIO 1390 +#define WT_STAT_CONN_CACHE_EVICTION_WALKS_GAVE_UP_RATIO 1381 /*! 
cache: eviction walks reached end of tree */ -#define WT_STAT_CONN_CACHE_EVICTION_WALKS_ENDED 1391 +#define WT_STAT_CONN_CACHE_EVICTION_WALKS_ENDED 1382 /*! cache: eviction walks restarted */ -#define WT_STAT_CONN_CACHE_EVICTION_WALK_RESTART 1392 +#define WT_STAT_CONN_CACHE_EVICTION_WALK_RESTART 1383 /*! cache: eviction walks started from root of tree */ -#define WT_STAT_CONN_CACHE_EVICTION_WALK_FROM_ROOT 1393 +#define WT_STAT_CONN_CACHE_EVICTION_WALK_FROM_ROOT 1384 /*! cache: eviction walks started from saved location in tree */ -#define WT_STAT_CONN_CACHE_EVICTION_WALK_SAVED_POS 1394 +#define WT_STAT_CONN_CACHE_EVICTION_WALK_SAVED_POS 1385 /*! cache: hazard pointer blocked page eviction */ -#define WT_STAT_CONN_CACHE_EVICTION_HAZARD 1395 +#define WT_STAT_CONN_CACHE_EVICTION_HAZARD 1386 /*! cache: history store table insert calls */ -#define WT_STAT_CONN_CACHE_HS_INSERT 1396 +#define WT_STAT_CONN_CACHE_HS_INSERT 1387 /*! cache: history store table insert calls that returned restart */ -#define WT_STAT_CONN_CACHE_HS_INSERT_RESTART 1397 +#define WT_STAT_CONN_CACHE_HS_INSERT_RESTART 1388 /*! * cache: history store table out-of-order resolved updates that lose * their durable timestamp */ -#define WT_STAT_CONN_CACHE_HS_ORDER_LOSE_DURABLE_TIMESTAMP 1398 +#define WT_STAT_CONN_CACHE_HS_ORDER_LOSE_DURABLE_TIMESTAMP 1389 /*! * cache: history store table out-of-order updates that were fixed up by * moving existing records */ -#define WT_STAT_CONN_CACHE_HS_ORDER_FIXUP_MOVE 1399 +#define WT_STAT_CONN_CACHE_HS_ORDER_FIXUP_MOVE 1390 /*! * cache: history store table out-of-order updates that were fixed up * during insertion */ -#define WT_STAT_CONN_CACHE_HS_ORDER_FIXUP_INSERT 1400 +#define WT_STAT_CONN_CACHE_HS_ORDER_FIXUP_INSERT 1391 /*! cache: history store table reads */ -#define WT_STAT_CONN_CACHE_HS_READ 1401 +#define WT_STAT_CONN_CACHE_HS_READ 1392 /*! 
cache: history store table reads missed */ -#define WT_STAT_CONN_CACHE_HS_READ_MISS 1402 +#define WT_STAT_CONN_CACHE_HS_READ_MISS 1393 /*! cache: history store table reads requiring squashed modifies */ -#define WT_STAT_CONN_CACHE_HS_READ_SQUASH 1403 +#define WT_STAT_CONN_CACHE_HS_READ_SQUASH 1394 /*! * cache: history store table truncation by rollback to stable to remove * an unstable update */ -#define WT_STAT_CONN_CACHE_HS_KEY_TRUNCATE_RTS_UNSTABLE 1404 +#define WT_STAT_CONN_CACHE_HS_KEY_TRUNCATE_RTS_UNSTABLE 1395 /*! * cache: history store table truncation by rollback to stable to remove * an update */ -#define WT_STAT_CONN_CACHE_HS_KEY_TRUNCATE_RTS 1405 +#define WT_STAT_CONN_CACHE_HS_KEY_TRUNCATE_RTS 1396 /*! cache: history store table truncation to remove an update */ -#define WT_STAT_CONN_CACHE_HS_KEY_TRUNCATE 1406 +#define WT_STAT_CONN_CACHE_HS_KEY_TRUNCATE 1397 /*! * cache: history store table truncation to remove range of updates due * to key being removed from the data page during reconciliation */ -#define WT_STAT_CONN_CACHE_HS_KEY_TRUNCATE_ONPAGE_REMOVAL 1407 +#define WT_STAT_CONN_CACHE_HS_KEY_TRUNCATE_ONPAGE_REMOVAL 1398 /*! * cache: history store table truncation to remove range of updates due * to non timestamped update on data page */ -#define WT_STAT_CONN_CACHE_HS_KEY_TRUNCATE_NON_TS 1408 +#define WT_STAT_CONN_CACHE_HS_KEY_TRUNCATE_NON_TS 1399 /*! cache: history store table writes requiring squashed modifies */ -#define WT_STAT_CONN_CACHE_HS_WRITE_SQUASH 1409 +#define WT_STAT_CONN_CACHE_HS_WRITE_SQUASH 1400 /*! cache: in-memory page passed criteria to be split */ -#define WT_STAT_CONN_CACHE_INMEM_SPLITTABLE 1410 +#define WT_STAT_CONN_CACHE_INMEM_SPLITTABLE 1401 /*! cache: in-memory page splits */ -#define WT_STAT_CONN_CACHE_INMEM_SPLIT 1411 +#define WT_STAT_CONN_CACHE_INMEM_SPLIT 1402 /*! cache: internal pages evicted */ -#define WT_STAT_CONN_CACHE_EVICTION_INTERNAL 1412 +#define WT_STAT_CONN_CACHE_EVICTION_INTERNAL 1403 /*! 
cache: internal pages split during eviction */ -#define WT_STAT_CONN_CACHE_EVICTION_SPLIT_INTERNAL 1413 +#define WT_STAT_CONN_CACHE_EVICTION_SPLIT_INTERNAL 1404 /*! cache: leaf pages split during eviction */ -#define WT_STAT_CONN_CACHE_EVICTION_SPLIT_LEAF 1414 +#define WT_STAT_CONN_CACHE_EVICTION_SPLIT_LEAF 1405 /*! cache: modified pages evicted */ -#define WT_STAT_CONN_CACHE_EVICTION_DIRTY 1415 +#define WT_STAT_CONN_CACHE_EVICTION_DIRTY 1406 /*! cache: overflow pages read into cache */ -#define WT_STAT_CONN_CACHE_READ_OVERFLOW 1416 +#define WT_STAT_CONN_CACHE_READ_OVERFLOW 1407 /*! cache: page split during eviction deepened the tree */ -#define WT_STAT_CONN_CACHE_EVICTION_DEEPEN 1417 +#define WT_STAT_CONN_CACHE_EVICTION_DEEPEN 1408 /*! cache: page written requiring history store records */ -#define WT_STAT_CONN_CACHE_WRITE_HS 1418 +#define WT_STAT_CONN_CACHE_WRITE_HS 1409 /*! cache: pages read into cache */ -#define WT_STAT_CONN_CACHE_READ 1419 +#define WT_STAT_CONN_CACHE_READ 1410 /*! cache: pages read into cache after truncate */ -#define WT_STAT_CONN_CACHE_READ_DELETED 1420 +#define WT_STAT_CONN_CACHE_READ_DELETED 1411 /*! cache: pages read into cache after truncate in prepare state */ -#define WT_STAT_CONN_CACHE_READ_DELETED_PREPARED 1421 +#define WT_STAT_CONN_CACHE_READ_DELETED_PREPARED 1412 /*! cache: pages requested from the cache */ -#define WT_STAT_CONN_CACHE_PAGES_REQUESTED 1422 +#define WT_STAT_CONN_CACHE_PAGES_REQUESTED 1413 /*! cache: pages seen by eviction walk */ -#define WT_STAT_CONN_CACHE_EVICTION_PAGES_SEEN 1423 +#define WT_STAT_CONN_CACHE_EVICTION_PAGES_SEEN 1414 /*! cache: pages written from cache */ -#define WT_STAT_CONN_CACHE_WRITE 1424 +#define WT_STAT_CONN_CACHE_WRITE 1415 /*! cache: pages written requiring in-memory restoration */ -#define WT_STAT_CONN_CACHE_WRITE_RESTORE 1425 +#define WT_STAT_CONN_CACHE_WRITE_RESTORE 1416 /*! 
cache: tracked dirty bytes in the cache */ -#define WT_STAT_CONN_CACHE_BYTES_DIRTY 1426 +#define WT_STAT_CONN_CACHE_BYTES_DIRTY 1417 /*! cache: unmodified pages evicted */ -#define WT_STAT_CONN_CACHE_EVICTION_CLEAN 1427 +#define WT_STAT_CONN_CACHE_EVICTION_CLEAN 1418 /*! checkpoint-cleanup: pages added for eviction */ -#define WT_STAT_CONN_CC_PAGES_EVICT 1428 +#define WT_STAT_CONN_CC_PAGES_EVICT 1419 /*! checkpoint-cleanup: pages removed */ -#define WT_STAT_CONN_CC_PAGES_REMOVED 1429 +#define WT_STAT_CONN_CC_PAGES_REMOVED 1420 /*! checkpoint-cleanup: pages skipped during tree walk */ -#define WT_STAT_CONN_CC_PAGES_WALK_SKIPPED 1430 +#define WT_STAT_CONN_CC_PAGES_WALK_SKIPPED 1421 /*! checkpoint-cleanup: pages visited */ -#define WT_STAT_CONN_CC_PAGES_VISITED 1431 +#define WT_STAT_CONN_CC_PAGES_VISITED 1422 /*! cursor: Total number of entries skipped by cursor next calls */ -#define WT_STAT_CONN_CURSOR_NEXT_SKIP_TOTAL 1432 +#define WT_STAT_CONN_CURSOR_NEXT_SKIP_TOTAL 1423 /*! cursor: Total number of entries skipped by cursor prev calls */ -#define WT_STAT_CONN_CURSOR_PREV_SKIP_TOTAL 1433 +#define WT_STAT_CONN_CURSOR_PREV_SKIP_TOTAL 1424 /*! * cursor: Total number of entries skipped to position the history store * cursor */ -#define WT_STAT_CONN_CURSOR_SKIP_HS_CUR_POSITION 1434 +#define WT_STAT_CONN_CURSOR_SKIP_HS_CUR_POSITION 1425 /*! * cursor: cursor next calls that skip due to a globally visible history * store tombstone */ -#define WT_STAT_CONN_CURSOR_NEXT_HS_TOMBSTONE 1435 +#define WT_STAT_CONN_CURSOR_NEXT_HS_TOMBSTONE 1426 /*! * cursor: cursor next calls that skip greater than or equal to 100 * entries */ -#define WT_STAT_CONN_CURSOR_NEXT_SKIP_GE_100 1436 +#define WT_STAT_CONN_CURSOR_NEXT_SKIP_GE_100 1427 /*! cursor: cursor next calls that skip less than 100 entries */ -#define WT_STAT_CONN_CURSOR_NEXT_SKIP_LT_100 1437 +#define WT_STAT_CONN_CURSOR_NEXT_SKIP_LT_100 1428 /*! 
* cursor: cursor prev calls that skip due to a globally visible history * store tombstone */ -#define WT_STAT_CONN_CURSOR_PREV_HS_TOMBSTONE 1438 +#define WT_STAT_CONN_CURSOR_PREV_HS_TOMBSTONE 1429 /*! * cursor: cursor prev calls that skip greater than or equal to 100 * entries */ -#define WT_STAT_CONN_CURSOR_PREV_SKIP_GE_100 1439 +#define WT_STAT_CONN_CURSOR_PREV_SKIP_GE_100 1430 /*! cursor: cursor prev calls that skip less than 100 entries */ -#define WT_STAT_CONN_CURSOR_PREV_SKIP_LT_100 1440 +#define WT_STAT_CONN_CURSOR_PREV_SKIP_LT_100 1431 /*! cursor: open cursor count */ -#define WT_STAT_CONN_CURSOR_OPEN_COUNT 1441 +#define WT_STAT_CONN_CURSOR_OPEN_COUNT 1432 /*! reconciliation: approximate byte size of timestamps in pages written */ -#define WT_STAT_CONN_REC_TIME_WINDOW_BYTES_TS 1442 +#define WT_STAT_CONN_REC_TIME_WINDOW_BYTES_TS 1433 /*! * reconciliation: approximate byte size of transaction IDs in pages * written */ -#define WT_STAT_CONN_REC_TIME_WINDOW_BYTES_TXN 1443 +#define WT_STAT_CONN_REC_TIME_WINDOW_BYTES_TXN 1434 /*! reconciliation: fast-path pages deleted */ -#define WT_STAT_CONN_REC_PAGE_DELETE_FAST 1444 +#define WT_STAT_CONN_REC_PAGE_DELETE_FAST 1435 /*! reconciliation: page reconciliation calls */ -#define WT_STAT_CONN_REC_PAGES 1445 +#define WT_STAT_CONN_REC_PAGES 1436 /*! reconciliation: page reconciliation calls for eviction */ -#define WT_STAT_CONN_REC_PAGES_EVICTION 1446 +#define WT_STAT_CONN_REC_PAGES_EVICTION 1437 /*! reconciliation: pages deleted */ -#define WT_STAT_CONN_REC_PAGE_DELETE 1447 +#define WT_STAT_CONN_REC_PAGE_DELETE 1438 /*! * reconciliation: pages written including an aggregated newest start * durable timestamp */ -#define WT_STAT_CONN_REC_TIME_AGGR_NEWEST_START_DURABLE_TS 1448 +#define WT_STAT_CONN_REC_TIME_AGGR_NEWEST_START_DURABLE_TS 1439 /*! 
* reconciliation: pages written including an aggregated newest stop * durable timestamp */ -#define WT_STAT_CONN_REC_TIME_AGGR_NEWEST_STOP_DURABLE_TS 1449 +#define WT_STAT_CONN_REC_TIME_AGGR_NEWEST_STOP_DURABLE_TS 1440 /*! * reconciliation: pages written including an aggregated newest stop * timestamp */ -#define WT_STAT_CONN_REC_TIME_AGGR_NEWEST_STOP_TS 1450 +#define WT_STAT_CONN_REC_TIME_AGGR_NEWEST_STOP_TS 1441 /*! * reconciliation: pages written including an aggregated newest stop * transaction ID */ -#define WT_STAT_CONN_REC_TIME_AGGR_NEWEST_STOP_TXN 1451 +#define WT_STAT_CONN_REC_TIME_AGGR_NEWEST_STOP_TXN 1442 /*! * reconciliation: pages written including an aggregated newest * transaction ID */ -#define WT_STAT_CONN_REC_TIME_AGGR_NEWEST_TXN 1452 +#define WT_STAT_CONN_REC_TIME_AGGR_NEWEST_TXN 1443 /*! * reconciliation: pages written including an aggregated oldest start * timestamp */ -#define WT_STAT_CONN_REC_TIME_AGGR_OLDEST_START_TS 1453 +#define WT_STAT_CONN_REC_TIME_AGGR_OLDEST_START_TS 1444 /*! reconciliation: pages written including an aggregated prepare */ -#define WT_STAT_CONN_REC_TIME_AGGR_PREPARED 1454 +#define WT_STAT_CONN_REC_TIME_AGGR_PREPARED 1445 /*! * reconciliation: pages written including at least one start durable * timestamp */ -#define WT_STAT_CONN_REC_TIME_WINDOW_PAGES_DURABLE_START_TS 1455 +#define WT_STAT_CONN_REC_TIME_WINDOW_PAGES_DURABLE_START_TS 1446 /*! * reconciliation: pages written including at least one start transaction * ID */ -#define WT_STAT_CONN_REC_TIME_WINDOW_PAGES_START_TXN 1456 +#define WT_STAT_CONN_REC_TIME_WINDOW_PAGES_START_TXN 1447 /*! * reconciliation: pages written including at least one stop durable * timestamp */ -#define WT_STAT_CONN_REC_TIME_WINDOW_PAGES_DURABLE_STOP_TS 1457 +#define WT_STAT_CONN_REC_TIME_WINDOW_PAGES_DURABLE_STOP_TS 1448 /*! 
reconciliation: pages written including at least one stop timestamp */ -#define WT_STAT_CONN_REC_TIME_WINDOW_PAGES_STOP_TS 1458 +#define WT_STAT_CONN_REC_TIME_WINDOW_PAGES_STOP_TS 1449 /*! * reconciliation: pages written including at least one stop transaction * ID */ -#define WT_STAT_CONN_REC_TIME_WINDOW_PAGES_STOP_TXN 1459 +#define WT_STAT_CONN_REC_TIME_WINDOW_PAGES_STOP_TXN 1450 /*! reconciliation: records written including a start durable timestamp */ -#define WT_STAT_CONN_REC_TIME_WINDOW_DURABLE_START_TS 1460 +#define WT_STAT_CONN_REC_TIME_WINDOW_DURABLE_START_TS 1451 /*! reconciliation: records written including a start timestamp */ -#define WT_STAT_CONN_REC_TIME_WINDOW_START_TS 1461 +#define WT_STAT_CONN_REC_TIME_WINDOW_START_TS 1452 /*! reconciliation: records written including a start transaction ID */ -#define WT_STAT_CONN_REC_TIME_WINDOW_START_TXN 1462 +#define WT_STAT_CONN_REC_TIME_WINDOW_START_TXN 1453 /*! reconciliation: records written including a stop durable timestamp */ -#define WT_STAT_CONN_REC_TIME_WINDOW_DURABLE_STOP_TS 1463 +#define WT_STAT_CONN_REC_TIME_WINDOW_DURABLE_STOP_TS 1454 /*! reconciliation: records written including a stop timestamp */ -#define WT_STAT_CONN_REC_TIME_WINDOW_STOP_TS 1464 +#define WT_STAT_CONN_REC_TIME_WINDOW_STOP_TS 1455 /*! reconciliation: records written including a stop transaction ID */ -#define WT_STAT_CONN_REC_TIME_WINDOW_STOP_TXN 1465 +#define WT_STAT_CONN_REC_TIME_WINDOW_STOP_TXN 1456 /*! session: flush_tier operation calls */ -#define WT_STAT_CONN_FLUSH_TIER 1466 +#define WT_STAT_CONN_FLUSH_TIER 1457 /*! session: tiered storage local retention time (secs) */ -#define WT_STAT_CONN_TIERED_RETENTION 1467 +#define WT_STAT_CONN_TIERED_RETENTION 1458 /*! transaction: race to read prepared update retry */ -#define WT_STAT_CONN_TXN_READ_RACE_PREPARE_UPDATE 1468 +#define WT_STAT_CONN_TXN_READ_RACE_PREPARE_UPDATE 1459 /*! 
* transaction: rollback to stable history store records with stop * timestamps older than newer records */ -#define WT_STAT_CONN_TXN_RTS_HS_STOP_OLDER_THAN_NEWER_START 1469 +#define WT_STAT_CONN_TXN_RTS_HS_STOP_OLDER_THAN_NEWER_START 1460 /*! transaction: rollback to stable inconsistent checkpoint */ -#define WT_STAT_CONN_TXN_RTS_INCONSISTENT_CKPT 1470 +#define WT_STAT_CONN_TXN_RTS_INCONSISTENT_CKPT 1461 /*! transaction: rollback to stable keys removed */ -#define WT_STAT_CONN_TXN_RTS_KEYS_REMOVED 1471 +#define WT_STAT_CONN_TXN_RTS_KEYS_REMOVED 1462 /*! transaction: rollback to stable keys restored */ -#define WT_STAT_CONN_TXN_RTS_KEYS_RESTORED 1472 +#define WT_STAT_CONN_TXN_RTS_KEYS_RESTORED 1463 /*! transaction: rollback to stable restored tombstones from history store */ -#define WT_STAT_CONN_TXN_RTS_HS_RESTORE_TOMBSTONES 1473 +#define WT_STAT_CONN_TXN_RTS_HS_RESTORE_TOMBSTONES 1464 /*! transaction: rollback to stable restored updates from history store */ -#define WT_STAT_CONN_TXN_RTS_HS_RESTORE_UPDATES 1474 +#define WT_STAT_CONN_TXN_RTS_HS_RESTORE_UPDATES 1465 /*! transaction: rollback to stable sweeping history store keys */ -#define WT_STAT_CONN_TXN_RTS_SWEEP_HS_KEYS 1475 +#define WT_STAT_CONN_TXN_RTS_SWEEP_HS_KEYS 1466 /*! transaction: rollback to stable updates removed from history store */ -#define WT_STAT_CONN_TXN_RTS_HS_REMOVED 1476 +#define WT_STAT_CONN_TXN_RTS_HS_REMOVED 1467 /*! transaction: transaction checkpoints due to obsolete pages */ -#define WT_STAT_CONN_TXN_CHECKPOINT_OBSOLETE_APPLIED 1477 +#define WT_STAT_CONN_TXN_CHECKPOINT_OBSOLETE_APPLIED 1468 /*! transaction: update conflicts */ -#define WT_STAT_CONN_TXN_UPDATE_CONFLICT 1478 +#define WT_STAT_CONN_TXN_UPDATE_CONFLICT 1469 /*! 
* @} diff --git a/src/third_party/wiredtiger/src/meta/meta_ckpt.c b/src/third_party/wiredtiger/src/meta/meta_ckpt.c index bbbcb64f230..6b6a8091695 100644 --- a/src/third_party/wiredtiger/src/meta/meta_ckpt.c +++ b/src/third_party/wiredtiger/src/meta/meta_ckpt.c @@ -715,7 +715,7 @@ format: /* * __wt_metadata_update_base_write_gen -- - * Update the connection's base write generation. + * Update the connection's base write generation from the config string. */ int __wt_metadata_update_base_write_gen(WT_SESSION_IMPL *session, const char *config) diff --git a/src/third_party/wiredtiger/src/os_posix/os_fallocate.c b/src/third_party/wiredtiger/src/os_posix/os_fallocate.c index 198ee02103f..ce08eaad231 100644 --- a/src/third_party/wiredtiger/src/os_posix/os_fallocate.c +++ b/src/third_party/wiredtiger/src/os_posix/os_fallocate.c @@ -20,30 +20,28 @@ */ #if defined(HAVE_FALLOCATE) || (defined(__linux__) && defined(SYS_fallocate)) || \ defined(HAVE_POSIX_FALLOCATE) -#define WT_CALL_FUNCTION(op) \ - do { \ - WT_DECL_RET; \ - WT_FILE_HANDLE_POSIX *pfh; \ - WT_SESSION_IMPL *session; \ - bool remap; \ - \ - session = (WT_SESSION_IMPL *)wt_session; \ - pfh = (WT_FILE_HANDLE_POSIX *)file_handle; \ - \ - remap = (offset != pfh->mmap_size); \ - if (remap) \ - __wt_prepare_remap_resize_file(file_handle, wt_session); \ - \ - WT_SYSCALL_RETRY(op, ret); \ - if (remap) { \ - if (ret == 0) \ - __wt_remap_resize_file(file_handle, wt_session); \ - else { \ - __wt_release_without_remap(file_handle); \ - WT_RET_MSG(session, ret, "%s: fallocate:", file_handle->name); \ - } \ - } \ - return (0); \ +#define WT_CALL_FUNCTION(op) \ + do { \ + WT_DECL_RET; \ + WT_FILE_HANDLE_POSIX *pfh; \ + bool remap; \ + \ + pfh = (WT_FILE_HANDLE_POSIX *)file_handle; \ + \ + remap = (offset != pfh->mmap_size); \ + if (remap) \ + __wt_prepare_remap_resize_file(file_handle, wt_session); \ + \ + WT_SYSCALL_RETRY(op, ret); \ + if (remap) { \ + if (ret == 0) \ + __wt_remap_resize_file(file_handle, wt_session); \ + else 
{ \ + __wt_release_without_remap(file_handle); \ + WT_RET(ret); \ + } \ + } \ + return (0); \ } while (0) #endif diff --git a/src/third_party/wiredtiger/src/reconcile/rec_write.c b/src/third_party/wiredtiger/src/reconcile/rec_write.c index d0511459385..3e21b7f3ca0 100644 --- a/src/third_party/wiredtiger/src/reconcile/rec_write.c +++ b/src/third_party/wiredtiger/src/reconcile/rec_write.c @@ -2291,8 +2291,7 @@ __rec_hs_wrapup(WT_SESSION_IMPL *session, WT_RECONCILE *r) for (multi = r->multi, i = 0; i < r->multi_next; ++multi, ++i) if (multi->supd != NULL) { - WT_ERR(__wt_hs_insert_updates(session, r->page, multi)); - r->cache_write_hs = true; + WT_ERR(__wt_hs_insert_updates(session, r->page, multi, &r->cache_write_hs)); if (!multi->supd_restore) { __wt_free(session, multi->supd); multi->supd_entries = 0; diff --git a/src/third_party/wiredtiger/src/schema/schema_project.c b/src/third_party/wiredtiger/src/schema/schema_project.c index d3db2c8e5e9..6a768264d5b 100644 --- a/src/third_party/wiredtiger/src/schema/schema_project.c +++ b/src/third_party/wiredtiger/src/schema/schema_project.c @@ -113,14 +113,14 @@ __wt_schema_project_in(WT_SESSION_IMPL *session, WT_CURSOR **cp, const char *pro WT_RET(__pack_size(session, &pv, &len)); offset = WT_PTRDIFF(p, buf->mem); - WT_RET(__wt_buf_grow(session, buf, buf->size + len)); + WT_RET(__wt_buf_grow(session, buf, (buf->size + len) - old_len)); p = (uint8_t *)buf->mem + offset; - end = (uint8_t *)buf->mem + buf->size + len; + end = (uint8_t *)buf->mem + (buf->size + len) - old_len; /* Make room if we're inserting out-of-order. 
*/ if (offset + old_len < buf->size) memmove(p + len, p + old_len, buf->size - (offset + old_len)); WT_RET(__pack_write(session, &pv, &p, len)); - buf->size += len; + buf->size += len - old_len; break; default: diff --git a/src/third_party/wiredtiger/src/session/session_api.c b/src/third_party/wiredtiger/src/session/session_api.c index aec5e3c0f75..cb4e12df887 100644 --- a/src/third_party/wiredtiger/src/session/session_api.c +++ b/src/third_party/wiredtiger/src/session/session_api.c @@ -201,7 +201,6 @@ __session_clear(WT_SESSION_IMPL *session) * * For these reasons, be careful when clearing the session structure. */ - __wt_txn_clear_timestamp_queues(session); memset(session, 0, WT_SESSION_CLEAR_SIZE); WT_INIT_LSN(&session->bg_sync_lsn); diff --git a/src/third_party/wiredtiger/src/support/stat.c b/src/third_party/wiredtiger/src/support/stat.c index 6723f064f17..5ae4a78605b 100644 --- a/src/third_party/wiredtiger/src/support/stat.c +++ b/src/third_party/wiredtiger/src/support/stat.c @@ -1274,21 +1274,11 @@ static const char *const __stats_connection_desc[] = { "thread-yield: page delete rollback time sleeping for state change (usecs)", "thread-yield: page reconciliation yielded due to child modification", "transaction: Number of prepared updates", - "transaction: durable timestamp queue entries walked", - "transaction: durable timestamp queue insert to empty", - "transaction: durable timestamp queue inserts to head", - "transaction: durable timestamp queue inserts total", - "transaction: durable timestamp queue length", "transaction: prepared transactions", "transaction: prepared transactions committed", "transaction: prepared transactions currently active", "transaction: prepared transactions rolled back", "transaction: query timestamp calls", - "transaction: read timestamp queue entries walked", - "transaction: read timestamp queue insert to empty", - "transaction: read timestamp queue inserts to head", - "transaction: read timestamp queue inserts total", - 
"transaction: read timestamp queue length", "transaction: rollback to stable calls", "transaction: rollback to stable pages visited", "transaction: rollback to stable tree walk skipping pages", @@ -1335,6 +1325,7 @@ static const char *const __stats_connection_desc[] = { "transaction: transaction range of timestamps pinned by the oldest timestamp", "transaction: transaction read timestamp of the oldest active reader", "transaction: transaction sync calls", + "transaction: transaction walk of concurrent sessions", "transaction: transactions committed", "transaction: transactions rolled back", "LSM: sleep for LSM checkpoint throttle", @@ -1797,21 +1788,11 @@ __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats) stats->page_del_rollback_blocked = 0; stats->child_modify_blocked_page = 0; stats->txn_prepared_updates_count = 0; - stats->txn_durable_queue_walked = 0; - stats->txn_durable_queue_empty = 0; - stats->txn_durable_queue_head = 0; - stats->txn_durable_queue_inserts = 0; - stats->txn_durable_queue_len = 0; stats->txn_prepare = 0; stats->txn_prepare_commit = 0; stats->txn_prepare_active = 0; stats->txn_prepare_rollback = 0; stats->txn_query_ts = 0; - stats->txn_read_queue_walked = 0; - stats->txn_read_queue_empty = 0; - stats->txn_read_queue_head = 0; - stats->txn_read_queue_inserts = 0; - stats->txn_read_queue_len = 0; stats->txn_rts = 0; stats->txn_rts_pages_visited = 0; stats->txn_rts_tree_walk_skip_pages = 0; @@ -1857,6 +1838,7 @@ __wt_stat_connection_clear_single(WT_CONNECTION_STATS *stats) /* not clearing txn_pinned_timestamp_oldest */ /* not clearing txn_timestamp_oldest_active_read */ stats->txn_sync = 0; + stats->txn_walk_sessions = 0; stats->txn_commit = 0; stats->txn_rollback = 0; stats->lsm_checkpoint_throttle = 0; @@ -2313,21 +2295,11 @@ __wt_stat_connection_aggregate(WT_CONNECTION_STATS **from, WT_CONNECTION_STATS * to->page_del_rollback_blocked += WT_STAT_READ(from, page_del_rollback_blocked); to->child_modify_blocked_page += 
WT_STAT_READ(from, child_modify_blocked_page); to->txn_prepared_updates_count += WT_STAT_READ(from, txn_prepared_updates_count); - to->txn_durable_queue_walked += WT_STAT_READ(from, txn_durable_queue_walked); - to->txn_durable_queue_empty += WT_STAT_READ(from, txn_durable_queue_empty); - to->txn_durable_queue_head += WT_STAT_READ(from, txn_durable_queue_head); - to->txn_durable_queue_inserts += WT_STAT_READ(from, txn_durable_queue_inserts); - to->txn_durable_queue_len += WT_STAT_READ(from, txn_durable_queue_len); to->txn_prepare += WT_STAT_READ(from, txn_prepare); to->txn_prepare_commit += WT_STAT_READ(from, txn_prepare_commit); to->txn_prepare_active += WT_STAT_READ(from, txn_prepare_active); to->txn_prepare_rollback += WT_STAT_READ(from, txn_prepare_rollback); to->txn_query_ts += WT_STAT_READ(from, txn_query_ts); - to->txn_read_queue_walked += WT_STAT_READ(from, txn_read_queue_walked); - to->txn_read_queue_empty += WT_STAT_READ(from, txn_read_queue_empty); - to->txn_read_queue_head += WT_STAT_READ(from, txn_read_queue_head); - to->txn_read_queue_inserts += WT_STAT_READ(from, txn_read_queue_inserts); - to->txn_read_queue_len += WT_STAT_READ(from, txn_read_queue_len); to->txn_rts += WT_STAT_READ(from, txn_rts); to->txn_rts_pages_visited += WT_STAT_READ(from, txn_rts_pages_visited); to->txn_rts_tree_walk_skip_pages += WT_STAT_READ(from, txn_rts_tree_walk_skip_pages); @@ -2376,6 +2348,7 @@ __wt_stat_connection_aggregate(WT_CONNECTION_STATS **from, WT_CONNECTION_STATS * to->txn_pinned_timestamp_oldest += WT_STAT_READ(from, txn_pinned_timestamp_oldest); to->txn_timestamp_oldest_active_read += WT_STAT_READ(from, txn_timestamp_oldest_active_read); to->txn_sync += WT_STAT_READ(from, txn_sync); + to->txn_walk_sessions += WT_STAT_READ(from, txn_walk_sessions); to->txn_commit += WT_STAT_READ(from, txn_commit); to->txn_rollback += WT_STAT_READ(from, txn_rollback); to->lsm_checkpoint_throttle += WT_STAT_READ(from, lsm_checkpoint_throttle); diff --git 
a/src/third_party/wiredtiger/src/txn/txn.c b/src/third_party/wiredtiger/src/txn/txn.c index bf85cf61443..10887763194 100644 --- a/src/third_party/wiredtiger/src/txn/txn.c +++ b/src/third_party/wiredtiger/src/txn/txn.c @@ -169,6 +169,7 @@ __wt_txn_active(WT_SESSION_IMPL *session, uint64_t txnid) /* Walk the array of concurrent transactions. */ WT_ORDERED_READ(session_cnt, conn->session_cnt); + WT_STAT_CONN_INCR(session, txn_walk_sessions); for (i = 0, s = txn_global->txn_shared_list; i < session_cnt; i++, s++) { /* If the transaction is in the list, it is uncommitted. */ if (s->id == txnid) @@ -240,6 +241,7 @@ __txn_get_snapshot_int(WT_SESSION_IMPL *session, bool publish) /* Walk the array of concurrent transactions. */ WT_ORDERED_READ(session_cnt, conn->session_cnt); + WT_STAT_CONN_INCR(session, txn_walk_sessions); for (i = 0, s = txn_global->txn_shared_list; i < session_cnt; i++, s++) { /* * Build our snapshot of any concurrent transaction IDs. @@ -340,6 +342,7 @@ __txn_oldest_scan(WT_SESSION_IMPL *session, uint64_t *oldest_idp, uint64_t *last /* Walk the array of concurrent transactions. */ WT_ORDERED_READ(session_cnt, conn->session_cnt); + WT_STAT_CONN_INCR(session, txn_walk_sessions); for (i = 0, s = txn_global->txn_shared_list; i < session_cnt; i++, s++) { /* Update the last running transaction ID. 
*/ while ((id = s->id) != WT_TXN_NONE && WT_TXNID_LE(prev_oldest_id, id) && @@ -1972,8 +1975,6 @@ __wt_txn_stats_update(WT_SESSION_IMPL *session) WT_STAT_SET(session, stats, txn_checkpoint_time_min, conn->ckpt_time_min); WT_STAT_SET(session, stats, txn_checkpoint_time_recent, conn->ckpt_time_recent); WT_STAT_SET(session, stats, txn_checkpoint_time_total, conn->ckpt_time_total); - WT_STAT_SET(session, stats, txn_durable_queue_len, txn_global->durable_timestampq_len); - WT_STAT_SET(session, stats, txn_read_queue_len, txn_global->read_timestampq_len); } /* @@ -2028,12 +2029,6 @@ __wt_txn_global_init(WT_SESSION_IMPL *session, const char *cfg[]) WT_RWLOCK_INIT_TRACKED(session, &txn_global->rwlock, txn_global); WT_RET(__wt_rwlock_init(session, &txn_global->visibility_rwlock)); - WT_RWLOCK_INIT_TRACKED(session, &txn_global->durable_timestamp_rwlock, durable_timestamp); - TAILQ_INIT(&txn_global->durable_timestamph); - - WT_RWLOCK_INIT_TRACKED(session, &txn_global->read_timestamp_rwlock, read_timestamp); - TAILQ_INIT(&txn_global->read_timestamph); - WT_RET(__wt_calloc_def(session, conn->session_size, &txn_global->txn_shared_list)); for (i = 0, s = txn_global->txn_shared_list; i < conn->session_size; i++, s++) @@ -2060,8 +2055,6 @@ __wt_txn_global_destroy(WT_SESSION_IMPL *session) __wt_spin_destroy(session, &txn_global->id_lock); __wt_rwlock_destroy(session, &txn_global->rwlock); - __wt_rwlock_destroy(session, &txn_global->durable_timestamp_rwlock); - __wt_rwlock_destroy(session, &txn_global->read_timestamp_rwlock); __wt_rwlock_destroy(session, &txn_global->visibility_rwlock); __wt_free(session, txn_global->txn_shared_list); } @@ -2342,6 +2335,7 @@ __wt_verbose_dump_txn(WT_SESSION_IMPL *session) * handles is not thread safe, so some information may change while traversing if other threads * are active at the same time, which is OK since this is diagnostic code. 
*/ + WT_STAT_CONN_INCR(session, txn_walk_sessions); for (i = 0, s = txn_global->txn_shared_list; i < session_cnt; i++, s++) { /* Skip sessions with no active transaction */ if ((id = s->id) == WT_TXN_NONE && s->pinned_id == WT_TXN_NONE) diff --git a/src/third_party/wiredtiger/src/txn/txn_recover.c b/src/third_party/wiredtiger/src/txn/txn_recover.c index 4045d4f4de9..c932cf087d5 100644 --- a/src/third_party/wiredtiger/src/txn/txn_recover.c +++ b/src/third_party/wiredtiger/src/txn/txn_recover.c @@ -548,6 +548,36 @@ err: } /* + * __recovery_correct_write_gen -- + * Update the connection's base write generation from all files in metadata. + */ +static int +__recovery_correct_write_gen(WT_SESSION_IMPL *session) +{ + WT_CURSOR *cursor; + WT_DECL_RET; + char *config, *uri; + + WT_RET(__wt_metadata_cursor(session, &cursor)); + while ((ret = cursor->next(cursor)) == 0) { + WT_ERR(cursor->get_key(cursor, &uri)); + + if (!WT_PREFIX_MATCH(uri, "file:")) + continue; + + WT_ERR(cursor->get_value(cursor, &config)); + + /* Update base write gen to the write gen. */ + WT_ERR(__wt_metadata_update_base_write_gen(session, config)); + } + WT_ERR_NOTFOUND_OK(ret, false); + +err: + WT_TRET(__wt_metadata_cursor_release(session, &cursor)); + return (ret); +} + +/* * __recovery_setup_file -- * Set up the recovery slot for a file, track the largest file ID, and update the base write gen * based on the file's configuration. @@ -974,8 +1004,12 @@ done: */ WT_ERR(session->iface.checkpoint(&session->iface, "force=1")); - /* Initialize the connection's base write generation after rollback to stable. */ - WT_ERR(__wt_metadata_init_base_write_gen(session)); + /* + * Rollback to stable may have left out clearing stale transaction ids. Update the connection + * base write generation based on the latest checkpoint write generations to reset them. 
+ */ + if (rts_executed) + WT_ERR(__recovery_correct_write_gen(session)); /* * Update the open dhandles write generations and base write generation with the connection's diff --git a/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c b/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c index 2ece30a9f0d..b95ca634d1f 100644 --- a/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c +++ b/src/third_party/wiredtiger/src/txn/txn_rollback_to_stable.c @@ -133,11 +133,11 @@ err: } /* - * __rollback_row_add_update -- + * __rollback_row_modify -- * Add the provided update to the head of the update list. */ static inline int -__rollback_row_add_update(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW *rip, WT_UPDATE *upd) +__rollback_row_modify(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW *rip, WT_UPDATE *upd) { WT_DECL_RET; WT_PAGE_MODIFY *mod; @@ -193,32 +193,6 @@ err: } /* - * __rollback_col_ondisk_fixup_key -- - * Allocate tombstone and calls function add update to head of insert list. - */ -static int -__rollback_col_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_REF *ref, - wt_timestamp_t rollback_timestamp, bool replace, uint64_t recno) -{ - WT_DECL_RET; - WT_UPDATE *upd; - - WT_UNUSED(rollback_timestamp); - WT_UNUSED(replace); - - /* Allocate tombstone to update to remove the unstable value. */ - WT_RET(__wt_upd_alloc_tombstone(session, &upd, NULL)); - WT_STAT_CONN_DATA_INCR(session, txn_rts_keys_removed); - WT_ERR(__rollback_col_modify(session, ref, upd, recno)); - return (ret); - -err: - __wt_free(session, upd); - - return (ret); -} - -/* * __rollback_check_if_txnid_non_committed -- * Check if the transaction id is non committed. */ @@ -266,14 +240,15 @@ __rollback_check_if_txnid_non_committed(WT_SESSION_IMPL *session, uint64_t txnid } /* - * __rollback_row_ondisk_fixup_key -- + * __rollback_ondisk_fixup_key -- * Abort updates in the history store and replace the on-disk value with an update that * satisfies the given timestamp. 
*/ static int -__rollback_row_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW *rip, - wt_timestamp_t rollback_timestamp, bool replace) +__rollback_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_REF *ref, WT_PAGE *page, WT_COL *cip, + WT_ROW *rip, wt_timestamp_t rollback_timestamp, bool replace, uint64_t recno) { + WT_CELL *kcell; WT_CELL_UNPACK_KV *unpack, _unpack; WT_CURSOR *hs_cursor; WT_DECL_ITEM(hs_key); @@ -286,6 +261,7 @@ __rollback_row_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW wt_timestamp_t hs_durable_ts, hs_start_ts, hs_stop_durable_ts, newer_hs_durable_ts; uint64_t hs_counter, type_full; uint32_t hs_btree_id; + uint8_t *memp; uint8_t type; char ts_string[4][WT_TS_INT_STRING_SIZE]; bool valid_update_found; @@ -293,6 +269,16 @@ __rollback_row_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW bool first_record; #endif + /* + * Assert an exclusive or for rip and cip such that either only a cip for a column store or a + * rip for a row store are passed into the function. + */ + WT_ASSERT(session, (rip != NULL && cip == NULL) || (rip == NULL && cip != NULL)); + + if (page == NULL) { + WT_ASSERT(session, ref != NULL); + page = ref->page; + } hs_cursor = NULL; tombstone = upd = NULL; hs_durable_ts = hs_start_ts = hs_stop_durable_ts = WT_TS_NONE; @@ -304,15 +290,30 @@ __rollback_row_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW #endif /* Allocate buffers for the data store and history store key. */ - WT_RET(__wt_scr_alloc(session, 0, &key)); WT_ERR(__wt_scr_alloc(session, 0, &hs_key)); WT_ERR(__wt_scr_alloc(session, 0, &hs_value)); - WT_ERR(__wt_row_leaf_key(session, page, rip, key, false)); + if (rip != NULL) { + /* Unpack a row cell. */ + WT_ERR(__wt_scr_alloc(session, 0, &key)); + WT_ERR(__wt_row_leaf_key(session, page, rip, key, false)); + + /* Get the full update value from the data store. 
*/ + unpack = &_unpack; + __wt_row_leaf_value_cell(session, page, rip, NULL, unpack); + } else { + /* Unpack a column cell. */ + WT_ERR(__wt_scr_alloc(session, WT_INTPACK64_MAXSIZE, &key)); + + /* Get the full update value from the data store. */ + unpack = &_unpack; + kcell = WT_COL_PTR(page, cip); + __wt_cell_unpack_kv(session, page->dsk, kcell, unpack); + memp = key->mem; + WT_ERR(__wt_vpack_uint(&memp, 0, recno)); + key->size = WT_PTRDIFF(memp, key->data); + } - /* Get the full update value from the data store. */ - unpack = &_unpack; - __wt_row_leaf_value_cell(session, page, rip, NULL, unpack); WT_ERR(__wt_page_cell_data_ref(session, page, unpack, &full_value)); WT_ERR(__wt_buf_set(session, &full_value, full_value.data, full_value.size)); newer_hs_durable_ts = unpack->tw.durable_start_ts; @@ -527,7 +528,10 @@ __rollback_row_ondisk_fixup_key(WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW __wt_verbose(session, WT_VERB_RECOVERY_RTS(session), "%p: key removed", (void *)key); } - WT_ERR(__rollback_row_add_update(session, page, rip, upd)); + if (rip != NULL) + WT_ERR(__rollback_row_modify(session, page, rip, upd)); + else + WT_ERR(__rollback_col_modify(session, ref, upd, recno)); } /* Finally remove that update from history store. */ @@ -552,11 +556,11 @@ err: } /* - * __rollback_abort_col_ondisk_kv -- - * Fix the on-disk col version according to the given timestamp. + * __rollback_abort_ondisk_kv -- + * Fix the on-disk K/V version according to the given timestamp. 
*/ static int -__rollback_abort_col_ondisk_kv(WT_SESSION_IMPL *session, WT_REF *ref, WT_COL *cip, +__rollback_abort_ondisk_kv(WT_SESSION_IMPL *session, WT_REF *ref, WT_COL *cip, WT_ROW *rip, wt_timestamp_t rollback_timestamp, uint64_t recno) { WT_CELL *kcell; @@ -573,82 +577,19 @@ __rollback_abort_col_ondisk_kv(WT_SESSION_IMPL *session, WT_REF *ref, WT_COL *ci WT_CLEAR(buf); upd = NULL; - kcell = WT_COL_PTR(page, cip); - __wt_cell_unpack_kv(session, page->dsk, kcell, vpack); - prepared = vpack->tw.prepare; - - if (vpack->tw.durable_start_ts > rollback_timestamp || - (!WT_TIME_WINDOW_HAS_STOP(&vpack->tw) && prepared)) { - __wt_verbose(session, WT_VERB_RECOVERY_RTS(session), - "on-disk update aborted with start durable timestamp: %s, commit timestamp: %s, " - "prepared: %s and stable timestamp: %s", - __wt_timestamp_to_string(vpack->tw.durable_start_ts, ts_string[0]), - __wt_timestamp_to_string(vpack->tw.start_ts, ts_string[1]), prepared ? "true" : "false", - __wt_timestamp_to_string(rollback_timestamp, ts_string[2])); - if (!F_ISSET(S2C(session), WT_CONN_IN_MEMORY)) - /* Allocate tombstone and calls function add update to head of insert list. */ - return (__rollback_col_ondisk_fixup_key(session, ref, rollback_timestamp, true, recno)); - else { - /* - * In-memory database does not have a history store to provide a stable update, so - * remove the key. - */ - WT_RET(__wt_upd_alloc_tombstone(session, &upd, NULL)); - WT_STAT_CONN_DATA_INCR(session, txn_rts_keys_removed); - } - } else if (WT_TIME_WINDOW_HAS_STOP(&vpack->tw) && - (vpack->tw.durable_stop_ts > rollback_timestamp || prepared)) { - /* - * Clear the remove operation from the key by inserting the original on-disk value as a - * standard update. 
- */ - WT_RET(__wt_page_cell_data_ref(session, page, vpack, &buf)); - - WT_ERR(__wt_upd_alloc(session, &buf, WT_UPDATE_STANDARD, &upd, NULL)); - upd->txnid = vpack->tw.start_txn; - upd->durable_ts = vpack->tw.durable_start_ts; - upd->start_ts = vpack->tw.start_ts; - F_SET(upd, WT_UPDATE_RESTORED_FROM_DS); - WT_STAT_CONN_DATA_INCR(session, txn_rts_keys_restored); - __wt_verbose(session, WT_VERB_RECOVERY_RTS(session), - "key restored with commit timestamp: %s, durable timestamp: %s txnid: %" PRIu64 - "and removed commit timestamp: %s, durable timestamp: %s, txnid: %" PRIu64 - ", prepared: %s", - __wt_timestamp_to_string(upd->start_ts, ts_string[0]), - __wt_timestamp_to_string(upd->durable_ts, ts_string[1]), upd->txnid, - __wt_timestamp_to_string(vpack->tw.stop_ts, ts_string[2]), - __wt_timestamp_to_string(vpack->tw.durable_stop_ts, ts_string[3]), vpack->tw.stop_txn, - prepared ? "true" : "false"); - } else - /* Stable version according to the timestamp. */ - return (0); - -err: - __wt_buf_free(session, &buf); - __wt_free(session, upd); - return (ret); -} - -/* - * __rollback_abort_row_ondisk_kv -- - * Fix the on-disk row K/V version according to the given timestamp. - */ -static int -__rollback_abort_row_ondisk_kv( - WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW *rip, wt_timestamp_t rollback_timestamp) -{ - WT_CELL_UNPACK_KV *vpack, _vpack; - WT_DECL_RET; - WT_ITEM buf; - WT_UPDATE *upd; - char ts_string[5][WT_TS_INT_STRING_SIZE]; - bool prepared; + /* + * Assert an exclusive or for rip and cip such that either only a cip for a column store or a + * rip for a row store are passed into the function. 
+ */ + WT_ASSERT(session, (rip != NULL && cip == NULL) || (rip == NULL && cip != NULL)); - vpack = &_vpack; - WT_CLEAR(buf); - upd = NULL; + if (rip != NULL) + __wt_row_leaf_value_cell(session, page, rip, NULL, vpack); + else { + kcell = WT_COL_PTR(page, cip); + __wt_cell_unpack_kv(session, page->dsk, kcell, vpack); + } - __wt_row_leaf_value_cell(session, page, rip, NULL, vpack); prepared = vpack->tw.prepare; if (WT_IS_HS(session->dhandle)) { /* @@ -680,7 +621,8 @@ __rollback_abort_row_ondisk_kv( __wt_timestamp_to_string(vpack->tw.start_ts, ts_string[1]), prepared ? "true" : "false", __wt_timestamp_to_string(rollback_timestamp, ts_string[2]), vpack->tw.start_txn); if (!F_ISSET(S2C(session), WT_CONN_IN_MEMORY)) - return (__rollback_row_ondisk_fixup_key(session, page, rip, rollback_timestamp, true)); + return (__rollback_ondisk_fixup_key( + session, ref, NULL, cip, rip, rollback_timestamp, true, recno)); else { /* * In-memory database don't have a history store to provide a stable update, so remove @@ -730,7 +672,10 @@ __rollback_abort_row_ondisk_kv( /* Stable version according to the timestamp. */ return (0); - WT_ERR(__rollback_row_add_update(session, page, rip, upd)); + if (rip != NULL) + WT_ERR(__rollback_row_modify(session, page, rip, upd)); + else + WT_ERR(__rollback_col_modify(session, ref, upd, recno)); upd = NULL; err: @@ -744,7 +689,7 @@ err: * Abort updates on a variable length col leaf page with timestamps newer than the rollback * timestamp. 
*/ -static void +static int __rollback_abort_col_var(WT_SESSION_IMPL *session, WT_REF *ref, wt_timestamp_t rollback_timestamp) { WT_CELL *kcell; @@ -753,7 +698,7 @@ __rollback_abort_col_var(WT_SESSION_IMPL *session, WT_REF *ref, wt_timestamp_t r WT_INSERT_HEAD *ins; WT_PAGE *page; uint64_t recno, rle; - uint32_t i; + uint32_t i, j; bool stable_update_found; page = ref->page; @@ -777,7 +722,9 @@ __rollback_abort_col_var(WT_SESSION_IMPL *session, WT_REF *ref, wt_timestamp_t r kcell = WT_COL_PTR(page, cip); __wt_cell_unpack_kv(session, page->dsk, kcell, &unpack); rle = __wt_cell_rle(&unpack); - __rollback_abort_col_ondisk_kv(session, ref, cip, rollback_timestamp, recno); + for (j = 0; j < rle; j++) + WT_RET(__rollback_abort_ondisk_kv( + session, ref, cip, NULL, rollback_timestamp, recno + j)); recno += rle; } else { recno++; @@ -787,6 +734,8 @@ __rollback_abort_col_var(WT_SESSION_IMPL *session, WT_REF *ref, wt_timestamp_t r /* Review the append list */ if ((ins = WT_COL_APPEND(page)) != NULL) __rollback_abort_insert_list(session, ins, rollback_timestamp); + + return (0); } /* @@ -842,8 +791,8 @@ __rollback_abort_row_reconciled_page_internal(WT_SESSION_IMPL *session, const vo WT_ERR(__wt_page_inmem(session, NULL, image_local, page_flags, &mod_page)); tmp.mem = NULL; WT_ROW_FOREACH (mod_page, rip, i) - WT_ERR_NOTFOUND_OK( - __rollback_row_ondisk_fixup_key(session, mod_page, rip, rollback_timestamp, false), + WT_ERR_NOTFOUND_OK(__rollback_ondisk_fixup_key( + session, NULL, mod_page, NULL, rip, rollback_timestamp, false, 0), false); err: @@ -928,15 +877,17 @@ __rollback_abort_row_reconciled_page( * Abort updates on a row leaf page with timestamps newer than the rollback timestamp. 
*/ static int -__rollback_abort_row_leaf( - WT_SESSION_IMPL *session, WT_PAGE *page, wt_timestamp_t rollback_timestamp) +__rollback_abort_row_leaf(WT_SESSION_IMPL *session, WT_REF *ref, wt_timestamp_t rollback_timestamp) { WT_INSERT_HEAD *insert; + WT_PAGE *page; WT_ROW *rip; WT_UPDATE *upd; uint32_t i; bool stable_update_found; + page = ref->page; + /* * Review the insert list for keys before the first entry on the disk page. */ @@ -959,7 +910,7 @@ __rollback_abort_row_leaf( * If there is no stable update found in the update list, abort any on-disk value. */ if (!stable_update_found) - WT_RET(__rollback_abort_row_ondisk_kv(session, page, rip, rollback_timestamp)); + WT_RET(__rollback_abort_ondisk_kv(session, ref, NULL, rip, rollback_timestamp, 0)); } /* @@ -1105,7 +1056,7 @@ __rollback_abort_updates(WT_SESSION_IMPL *session, WT_REF *ref, wt_timestamp_t r __rollback_abort_col_fix(session, page, rollback_timestamp); break; case WT_PAGE_COL_VAR: - __rollback_abort_col_var(session, ref, rollback_timestamp); + WT_RET(__rollback_abort_col_var(session, ref, rollback_timestamp)); break; case WT_PAGE_COL_INT: case WT_PAGE_ROW_INT: @@ -1116,7 +1067,7 @@ __rollback_abort_updates(WT_SESSION_IMPL *session, WT_REF *ref, wt_timestamp_t r */ break; case WT_PAGE_ROW_LEAF: - WT_RET(__rollback_abort_row_leaf(session, page, rollback_timestamp)); + WT_RET(__rollback_abort_row_leaf(session, ref, rollback_timestamp)); break; default: WT_RET(__wt_illegal_value(session, page->type)); diff --git a/src/third_party/wiredtiger/src/txn/txn_timestamp.c b/src/third_party/wiredtiger/src/txn/txn_timestamp.c index 911abe50056..6b046373187 100644 --- a/src/third_party/wiredtiger/src/txn/txn_timestamp.c +++ b/src/third_party/wiredtiger/src/txn/txn_timestamp.c @@ -69,14 +69,12 @@ __wt_txn_parse_timestamp( /* * __txn_get_read_timestamp -- - * Get the read timestamp from the transaction. Additionally return bool to specify whether the - * transaction has set the clear read queue flag. 
+ * Get the read timestamp from the transaction. */ -static bool +static void __txn_get_read_timestamp(WT_TXN_SHARED *txn_shared, wt_timestamp_t *read_timestampp) { WT_ORDERED_READ(*read_timestampp, txn_shared->read_timestamp); - return (!txn_shared->clear_read_q); } /* @@ -88,8 +86,9 @@ __wt_txn_get_pinned_timestamp(WT_SESSION_IMPL *session, wt_timestamp_t *tsp, uin { WT_CONNECTION_IMPL *conn; WT_TXN_GLOBAL *txn_global; - WT_TXN_SHARED *txn_shared; + WT_TXN_SHARED *s; wt_timestamp_t tmp_read_ts, tmp_ts; + uint32_t i, session_cnt; bool include_oldest, txn_has_write_lock; conn = S2C(session); @@ -103,36 +102,27 @@ __wt_txn_get_pinned_timestamp(WT_SESSION_IMPL *session, wt_timestamp_t *tsp, uin if (!txn_has_write_lock) __wt_readlock(session, &txn_global->rwlock); - tmp_ts = include_oldest ? txn_global->oldest_timestamp : 0; + tmp_ts = include_oldest ? txn_global->oldest_timestamp : WT_TS_NONE; /* Check for a running checkpoint */ if (LF_ISSET(WT_TXN_TS_INCLUDE_CKPT) && txn_global->checkpoint_timestamp != WT_TS_NONE && - (tmp_ts == 0 || txn_global->checkpoint_timestamp < tmp_ts)) + (tmp_ts == WT_TS_NONE || txn_global->checkpoint_timestamp < tmp_ts)) tmp_ts = txn_global->checkpoint_timestamp; - if (!txn_has_write_lock) - __wt_readunlock(session, &txn_global->rwlock); - /* Look for the oldest ordinary reader. */ - __wt_readlock(session, &txn_global->read_timestamp_rwlock); - TAILQ_FOREACH (txn_shared, &txn_global->read_timestamph, read_timestampq) { - /* - * Skip any transactions on the queue that are not active. Copy out value of read timestamp - * to prevent possible race where a transaction resets its read timestamp while we traverse - * the queue. - */ - if (!__txn_get_read_timestamp(txn_shared, &tmp_read_ts)) - continue; + /* Walk the array of concurrent transactions. 
*/ + WT_ORDERED_READ(session_cnt, conn->session_cnt); + WT_STAT_CONN_INCR(session, txn_walk_sessions); + for (i = 0, s = txn_global->txn_shared_list; i < session_cnt; i++, s++) { + __txn_get_read_timestamp(s, &tmp_read_ts); /* * A zero timestamp is possible here only when the oldest timestamp is not accounted for. */ - if (tmp_ts == 0 || tmp_read_ts < tmp_ts) + if (tmp_ts == WT_TS_NONE || (tmp_read_ts != WT_TS_NONE && tmp_read_ts < tmp_ts)) tmp_ts = tmp_read_ts; - /* - * We break on the first active txn on the list. - */ - break; } - __wt_readunlock(session, &txn_global->read_timestamp_rwlock); + + if (!txn_has_write_lock) + __wt_readunlock(session, &txn_global->rwlock); if (!include_oldest && tmp_ts == 0) return (WT_NOTFOUND); @@ -143,14 +133,12 @@ __wt_txn_get_pinned_timestamp(WT_SESSION_IMPL *session, wt_timestamp_t *tsp, uin /* * __txn_get_durable_timestamp -- - * Get the durable timestamp from the transaction. Additionally return bool to specify whether - * the transaction has set the clear durable queue flag. + * Get the durable timestamp from the transaction. */ -static bool +static void __txn_get_durable_timestamp(WT_TXN_SHARED *txn_shared, wt_timestamp_t *durable_timestampp) { WT_ORDERED_READ(*durable_timestampp, txn_shared->pinned_durable_timestamp); - return (!txn_shared->clear_durable_q); } /* @@ -163,8 +151,9 @@ __txn_global_query_timestamp(WT_SESSION_IMPL *session, wt_timestamp_t *tsp, cons WT_CONFIG_ITEM cval; WT_CONNECTION_IMPL *conn; WT_TXN_GLOBAL *txn_global; - WT_TXN_SHARED *txn_shared; + WT_TXN_SHARED *s; wt_timestamp_t ts, tmpts; + uint32_t i, session_cnt; conn = S2C(session); txn_global = &conn->txn_global; @@ -177,25 +166,18 @@ __txn_global_query_timestamp(WT_SESSION_IMPL *session, wt_timestamp_t *tsp, cons ts = txn_global->durable_timestamp; WT_ASSERT(session, ts != WT_TS_NONE); - /* - * Skip straight to the commit queue if no running transactions have an explicit durable - * timestamp. 
- */ - if (TAILQ_EMPTY(&txn_global->durable_timestamph)) - goto done; - /* - * Compare with the least recently durable transaction. - */ - __wt_readlock(session, &txn_global->durable_timestamp_rwlock); - TAILQ_FOREACH (txn_shared, &txn_global->durable_timestamph, durable_timestampq) { - if (__txn_get_durable_timestamp(txn_shared, &tmpts)) { - --tmpts; - if (tmpts < ts) - ts = tmpts; - break; - } + __wt_readlock(session, &txn_global->rwlock); + + /* Walk the array of concurrent transactions. */ + WT_ORDERED_READ(session_cnt, conn->session_cnt); + WT_STAT_CONN_INCR(session, txn_walk_sessions); + for (i = 0, s = txn_global->txn_shared_list; i < session_cnt; i++, s++) { + __txn_get_durable_timestamp(s, &tmpts); + if (tmpts != WT_TS_NONE && --tmpts < ts) + ts = tmpts; } - __wt_readunlock(session, &txn_global->durable_timestamp_rwlock); + + __wt_readunlock(session, &txn_global->rwlock); /* * If a transaction is committing with a durable timestamp of 1, we could return zero here, @@ -225,7 +207,6 @@ __txn_global_query_timestamp(WT_SESSION_IMPL *session, wt_timestamp_t *tsp, cons } else WT_RET_MSG(session, EINVAL, "unknown timestamp query %.*s", (int)cval.len, cval.str); -done: *tsp = ts; return (0); } @@ -507,49 +488,36 @@ set: * if any. */ static int -__txn_assert_after_reads( - WT_SESSION_IMPL *session, const char *op, wt_timestamp_t ts, WT_TXN_SHARED **prev_sharedp) +__txn_assert_after_reads(WT_SESSION_IMPL *session, const char *op, wt_timestamp_t ts) { #ifdef HAVE_DIAGNOSTIC WT_TXN_GLOBAL *txn_global; - WT_TXN_SHARED *prev_shared, *txn_shared; + WT_TXN_SHARED *s; wt_timestamp_t tmp_timestamp; + uint32_t i, session_cnt; char ts_string[2][WT_TS_INT_STRING_SIZE]; txn_global = &S2C(session)->txn_global; - txn_shared = WT_SESSION_TXN_SHARED(session); - - __wt_readlock(session, &txn_global->read_timestamp_rwlock); - prev_shared = TAILQ_LAST(&txn_global->read_timestamph, __wt_txn_rts_qh); - while (prev_shared != NULL) { - /* - * Skip self and non-active transactions. 
Copy out value of read timestamp to prevent - * possible race where a transaction resets its read timestamp while we traverse the queue. - */ - if (!__txn_get_read_timestamp(prev_shared, &tmp_timestamp) || prev_shared == txn_shared) { - prev_shared = TAILQ_PREV(prev_shared, __wt_txn_rts_qh, read_timestampq); - continue; - } - if (tmp_timestamp >= ts) { - __wt_readunlock(session, &txn_global->read_timestamp_rwlock); + __wt_readlock(session, &txn_global->rwlock); + /* Walk the array of concurrent transactions. */ + WT_ORDERED_READ(session_cnt, S2C(session)->session_cnt); + WT_STAT_CONN_INCR(session, txn_walk_sessions); + for (i = 0, s = txn_global->txn_shared_list; i < session_cnt; i++, s++) { + __txn_get_read_timestamp(s, &tmp_timestamp); + if (tmp_timestamp != WT_TS_NONE && tmp_timestamp >= ts) { + __wt_readunlock(session, &txn_global->rwlock); WT_RET_MSG(session, EINVAL, "%s timestamp %s must be greater than the latest active read timestamp %s ", op, __wt_timestamp_to_string(ts, ts_string[0]), __wt_timestamp_to_string(tmp_timestamp, ts_string[1])); } - break; } - - __wt_readunlock(session, &txn_global->read_timestamp_rwlock); - - if (prev_sharedp != NULL) - *prev_sharedp = prev_shared; + __wt_readunlock(session, &txn_global->rwlock); #else WT_UNUSED(session); WT_UNUSED(op); WT_UNUSED(ts); - WT_UNUSED(prev_sharedp); #endif return (0); @@ -617,7 +585,7 @@ __wt_txn_set_commit_timestamp(WT_SESSION_IMPL *session, wt_timestamp_t commit_ts __wt_timestamp_to_string(commit_ts, ts_string[0]), __wt_timestamp_to_string(txn->first_commit_timestamp, ts_string[1])); - WT_RET(__txn_assert_after_reads(session, "commit", commit_ts, NULL)); + WT_RET(__txn_assert_after_reads(session, "commit", commit_ts)); } else { /* * For a prepared transaction, the commit timestamp should not be less than the prepare @@ -727,13 +695,11 @@ __wt_txn_set_prepare_timestamp(WT_SESSION_IMPL *session, wt_timestamp_t prepare_ { WT_TXN *txn; WT_TXN_GLOBAL *txn_global; - WT_TXN_SHARED *prev_shared; 
wt_timestamp_t oldest_ts; char ts_string[2][WT_TS_INT_STRING_SIZE]; txn = session->txn; txn_global = &S2C(session)->txn_global; - prev_shared = WT_SESSION_TXN_SHARED(session); WT_RET(__wt_txn_context_prepare_check(session)); @@ -744,7 +710,7 @@ __wt_txn_set_prepare_timestamp(WT_SESSION_IMPL *session, wt_timestamp_t prepare_ WT_RET_MSG(session, EINVAL, "commit timestamp should not have been set before the prepare timestamp"); - WT_RET(__txn_assert_after_reads(session, "prepare", prepare_ts, &prev_shared)); + WT_RET(__txn_assert_after_reads(session, "prepare", prepare_ts)); /* * Check whether the prepare timestamp is less than the oldest timestamp. @@ -755,12 +721,6 @@ __wt_txn_set_prepare_timestamp(WT_SESSION_IMPL *session, wt_timestamp_t prepare_ * Check whether the prepare timestamp needs to be rounded up to the oldest timestamp. */ if (F_ISSET(txn, WT_TXN_TS_ROUND_PREPARED)) { - /* - * Check that there are no active readers. That would be a violation of preconditions - * for rounding timestamps of prepared transactions. 
- */ - WT_ASSERT(session, prev_shared == NULL); - __wt_verbose(session, WT_VERB_TIMESTAMP, "prepare timestamp %s rounded to oldest timestamp %s", __wt_timestamp_to_string(prepare_ts, ts_string[0]), @@ -854,7 +814,7 @@ __wt_txn_set_read_timestamp(WT_SESSION_IMPL *session, wt_timestamp_t read_ts) } else txn_shared->read_timestamp = read_ts; - __wt_txn_publish_read_timestamp(session); + F_SET(txn, WT_TXN_SHARED_TS_READ); __wt_readunlock(session, &txn_global->rwlock); /* @@ -941,13 +901,10 @@ void __wt_txn_publish_durable_timestamp(WT_SESSION_IMPL *session) { WT_TXN *txn; - WT_TXN_GLOBAL *txn_global; - WT_TXN_SHARED *qtxn_shared, *txn_shared, *txn_shared_tmp; - wt_timestamp_t tmpts, ts; - uint64_t walked; + WT_TXN_SHARED *txn_shared; + wt_timestamp_t ts; txn = session->txn; - txn_global = &S2C(session)->txn_global; txn_shared = WT_SESSION_TXN_SHARED(session); if (F_ISSET(txn, WT_TXN_SHARED_TS_DURABLE)) @@ -968,64 +925,7 @@ __wt_txn_publish_durable_timestamp(WT_SESSION_IMPL *session) } else return; - __wt_writelock(session, &txn_global->durable_timestamp_rwlock); - /* - * If our transaction is on the queue remove it first. The timestamp may move earlier so we - * otherwise might not remove ourselves before finding where to insert ourselves (which would - * result in a list loop) and we don't want to walk more of the list than needed. - */ - if (txn_shared->clear_durable_q) { - TAILQ_REMOVE(&txn_global->durable_timestamph, txn_shared, durable_timestampq); - txn_shared->clear_durable_q = false; - --txn_global->durable_timestampq_len; - } - /* - * Walk the list to look for where to insert our own transaction and remove any transactions - * that are not active. We stop when we get to the location where we want to insert. 
- */ - if (TAILQ_EMPTY(&txn_global->durable_timestamph)) { - TAILQ_INSERT_HEAD(&txn_global->durable_timestamph, txn_shared, durable_timestampq); - WT_STAT_CONN_INCR(session, txn_durable_queue_empty); - } else { - /* Walk from the start, removing cleared entries. */ - walked = 0; - TAILQ_FOREACH_SAFE( - qtxn_shared, &txn_global->durable_timestamph, durable_timestampq, txn_shared_tmp) - { - ++walked; - /* - * Stop on the first entry that we cannot clear. - */ - if (!qtxn_shared->clear_durable_q) - break; - - TAILQ_REMOVE(&txn_global->durable_timestamph, qtxn_shared, durable_timestampq); - qtxn_shared->clear_durable_q = false; - --txn_global->durable_timestampq_len; - } - - /* - * Now walk backwards from the end to find the correct position for the insert. - */ - qtxn_shared = TAILQ_LAST(&txn_global->durable_timestamph, __wt_txn_dts_qh); - while (qtxn_shared != NULL && - (!__txn_get_durable_timestamp(qtxn_shared, &tmpts) || tmpts > ts)) { - ++walked; - qtxn_shared = TAILQ_PREV(qtxn_shared, __wt_txn_dts_qh, durable_timestampq); - } - if (qtxn_shared == NULL) { - TAILQ_INSERT_HEAD(&txn_global->durable_timestamph, txn_shared, durable_timestampq); - WT_STAT_CONN_INCR(session, txn_durable_queue_head); - } else - TAILQ_INSERT_AFTER( - &txn_global->durable_timestamph, qtxn_shared, txn_shared, durable_timestampq); - WT_STAT_CONN_INCRV(session, txn_durable_queue_walked, walked); - } - ++txn_global->durable_timestampq_len; - WT_STAT_CONN_INCR(session, txn_durable_queue_inserts); txn_shared->pinned_durable_timestamp = ts; - txn_shared->clear_durable_q = false; - __wt_writeunlock(session, &txn_global->durable_timestamp_rwlock); F_SET(txn, WT_TXN_SHARED_TS_DURABLE); } @@ -1045,98 +945,9 @@ __wt_txn_clear_durable_timestamp(WT_SESSION_IMPL *session) if (!F_ISSET(txn, WT_TXN_SHARED_TS_DURABLE)) return; - /* - * Notify other threads that our transaction is inactive and can be cleaned up safely from the - * durable timestamp queue whenever the next thread walks the queue. 
We do not need to remove it - * now. - */ - txn_shared->clear_durable_q = true; WT_WRITE_BARRIER(); F_CLR(txn, WT_TXN_SHARED_TS_DURABLE); -} - -/* - * __wt_txn_publish_read_timestamp -- - * Publish a transaction's read timestamp. - */ -void -__wt_txn_publish_read_timestamp(WT_SESSION_IMPL *session) -{ - WT_TXN *txn; - WT_TXN_GLOBAL *txn_global; - WT_TXN_SHARED *qtxn_shared, *txn_shared, *txn_shared_tmp; - wt_timestamp_t tmp_timestamp; - uint64_t walked; - - txn = session->txn; - txn_global = &S2C(session)->txn_global; - txn_shared = WT_SESSION_TXN_SHARED(session); - - if (F_ISSET(txn, WT_TXN_SHARED_TS_READ)) - return; - - __wt_writelock(session, &txn_global->read_timestamp_rwlock); - /* - * If our transaction is on the queue remove it first. The timestamp may move earlier so we - * otherwise might not remove ourselves before finding where to insert ourselves (which would - * result in a list loop) and we don't want to walk more of the list than needed. - */ - if (txn_shared->clear_read_q) { - TAILQ_REMOVE(&txn_global->read_timestamph, txn_shared, read_timestampq); - WT_PUBLISH(txn_shared->clear_read_q, false); - --txn_global->read_timestampq_len; - } - /* - * Walk the list to look for where to insert our own transaction and remove any transactions - * that are not active. We stop when we get to the location where we want to insert. - */ - if (TAILQ_EMPTY(&txn_global->read_timestamph)) { - TAILQ_INSERT_HEAD(&txn_global->read_timestamph, txn_shared, read_timestampq); - WT_STAT_CONN_INCR(session, txn_read_queue_empty); - } else { - /* Walk from the start, removing cleared entries. 
*/ - walked = 0; - TAILQ_FOREACH_SAFE( - qtxn_shared, &txn_global->read_timestamph, read_timestampq, txn_shared_tmp) - { - ++walked; - if (!qtxn_shared->clear_read_q) - break; - - TAILQ_REMOVE(&txn_global->read_timestamph, qtxn_shared, read_timestampq); - WT_PUBLISH(qtxn_shared->clear_read_q, false); - --txn_global->read_timestampq_len; - } - - /* - * Now walk backwards from the end to find the correct position for the insert. - */ - qtxn_shared = TAILQ_LAST(&txn_global->read_timestamph, __wt_txn_rts_qh); - while (qtxn_shared != NULL) { - if (!__txn_get_read_timestamp(qtxn_shared, &tmp_timestamp) || - tmp_timestamp > txn_shared->read_timestamp) { - ++walked; - qtxn_shared = TAILQ_PREV(qtxn_shared, __wt_txn_rts_qh, read_timestampq); - } else - break; - } - if (qtxn_shared == NULL) { - TAILQ_INSERT_HEAD(&txn_global->read_timestamph, txn_shared, read_timestampq); - WT_STAT_CONN_INCR(session, txn_read_queue_head); - } else - TAILQ_INSERT_AFTER( - &txn_global->read_timestamph, qtxn_shared, txn_shared, read_timestampq); - WT_STAT_CONN_INCRV(session, txn_read_queue_walked, walked); - } - /* - * We do not set the read timestamp here. It has been set in the caller because special - * processing for round to oldest. - */ - ++txn_global->read_timestampq_len; - WT_STAT_CONN_INCR(session, txn_read_queue_inserts); - txn_shared->clear_read_q = false; - F_SET(txn, WT_TXN_SHARED_TS_READ); - __wt_writeunlock(session, &txn_global->read_timestamp_rwlock); + txn_shared->pinned_durable_timestamp = WT_TS_NONE; } /* @@ -1156,62 +967,8 @@ __wt_txn_clear_read_timestamp(WT_SESSION_IMPL *session) /* Assert the read timestamp is greater than or equal to the pinned timestamp. */ WT_ASSERT(session, txn_shared->read_timestamp >= S2C(session)->txn_global.pinned_timestamp); - /* - * Notify other threads that our transaction is inactive and can be cleaned up safely from - * the read timestamp queue whenever the next thread walks the queue. We do not need to - * remove it now. 
- */ - txn_shared->clear_read_q = true; WT_WRITE_BARRIER(); - F_CLR(txn, WT_TXN_SHARED_TS_READ); } txn_shared->read_timestamp = WT_TS_NONE; } - -/* - * __wt_txn_clear_timestamp_queues -- - * We're about to clear the session and overwrite the txn structure. Remove ourselves from the - * commit timestamp queue and the read timestamp queue if we're on either of them. - */ -void -__wt_txn_clear_timestamp_queues(WT_SESSION_IMPL *session) -{ - WT_TXN_GLOBAL *txn_global; - WT_TXN_SHARED *txn_shared; - - txn_shared = WT_SESSION_TXN_SHARED(session); - txn_global = &S2C(session)->txn_global; - - /* - * If we've closed the connection, our transaction shared states may already have been freed. In - * that case, there's nothing more to do here. - */ - if (txn_shared == NULL || (!txn_shared->clear_durable_q && !txn_shared->clear_read_q)) - return; - - if (txn_shared->clear_durable_q) { - __wt_writelock(session, &txn_global->durable_timestamp_rwlock); - /* - * Recheck after acquiring the lock. - */ - if (txn_shared->clear_durable_q) { - TAILQ_REMOVE(&txn_global->durable_timestamph, txn_shared, durable_timestampq); - --txn_global->durable_timestampq_len; - txn_shared->clear_durable_q = false; - } - __wt_writeunlock(session, &txn_global->durable_timestamp_rwlock); - } - if (txn_shared->clear_read_q) { - __wt_writelock(session, &txn_global->read_timestamp_rwlock); - /* - * Recheck after acquiring the lock. 
- */ - if (txn_shared->clear_read_q) { - TAILQ_REMOVE(&txn_global->read_timestamph, txn_shared, read_timestampq); - --txn_global->read_timestampq_len; - txn_shared->clear_read_q = false; - } - __wt_writeunlock(session, &txn_global->read_timestamp_rwlock); - } -} diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/api_const.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/api_const.h index 6c9b9734e6c..61f61c9a838 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/api_const.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/api_const.h @@ -32,13 +32,26 @@ /* Define all constants related to WiredTiger APIs and testing. */ namespace test_harness { -static const char *CONNECTION_CREATE = "create"; +/* Configuration API consts. */ +static const char *CACHE_SIZE_MB = "cache_size_mb"; static const char *COLLECTION_COUNT = "collection_count"; static const char *DURATION_SECONDS = "duration_seconds"; +static const char *ENABLE_TRACKING = "enable_tracking"; +static const char *ENABLED = "enabled"; static const char *KEY_COUNT = "key_count"; +static const char *LIMIT = "limit"; +static const char *RATE_PER_SECOND = "rate_per_second"; static const char *READ_THREADS = "read_threads"; +static const char *STAT_CACHE_SIZE = "stat_cache_size"; static const char *VALUE_SIZE = "value_size"; -static const char *TRACKING_COLLECTION = "table:tracking"; + +/* WiredTiger API consts. */ +static const char *CONNECTION_CREATE = "create"; + +/* Test harness consts. 
*/ +static const char *TABLE_OPERATION_TRACKING = "table:operation_tracking"; +static const char *TABLE_SCHEMA_TRACKING = "table:schema_tracking"; +static const char *STATISTICS_URI = "statistics:"; } // namespace test_harness diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/component.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/component.h index 91d695cbab2..4d4e08164a6 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/component.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/component.h @@ -29,6 +29,8 @@ #ifndef COMPONENT_H #define COMPONENT_H +#include "configuration.h" + namespace test_harness { /* * A component is a class that defines 3 unique stages in its life-cycle, the stages must be run in @@ -36,6 +38,7 @@ namespace test_harness { */ class component { public: + component(configuration *config) : _config(config) {} /* * The load function should perform all tasks required to setup the component for the main phase * of the test. An example operation performed in the load phase would be populating a database. 
@@ -64,6 +67,7 @@ class component { protected: volatile bool _running; + configuration *_config; }; } // namespace test_harness #endif diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/configuration_settings.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/configuration.h index b0c40e3a6e3..b0c40e3a6e3 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/configuration_settings.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/configuration.h diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/connection_manager.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/connection_manager.h index 967a3e5cad7..f4d50e4778b 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/connection_manager.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/connection_manager.h @@ -67,7 +67,7 @@ class connection_manager { } void - create(std::string home = DEFAULT_DIR) + create(const std::string &config, const std::string &home = DEFAULT_DIR) { if (_conn != nullptr) { debug_info("connection is not NULL, cannot be re-opened.", _trace_level, DEBUG_ERROR); @@ -78,7 +78,7 @@ class connection_manager { testutil_make_work_dir(home.c_str()); /* Open conn. 
*/ - testutil_check(wiredtiger_open(home.c_str(), NULL, CONNECTION_CREATE, &_conn)); + testutil_check(wiredtiger_open(home.c_str(), NULL, config.c_str(), &_conn)); } WT_SESSION * diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/debug_utils.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/debug_utils.h index 62a90aec1a6..0106dd1b05c 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/debug_utils.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/debug_utils.h @@ -35,6 +35,7 @@ namespace test_harness { #define DEBUG_ABORT -1 #define DEBUG_ERROR 0 #define DEBUG_INFO 1 +#define DEBUG_TRACE 2 static int64_t _trace_level = 0; diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/runtime_monitor.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/runtime_monitor.h index d47c7af7d86..cee7d3185eb 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/runtime_monitor.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/runtime_monitor.h @@ -29,20 +29,143 @@ #ifndef RUNTIME_MONITOR_H #define RUNTIME_MONITOR_H +#include <thread> + +extern "C" { +#include "wiredtiger.h" +} + +#include "api_const.h" +#include "component.h" +#include "connection_manager.h" +#include "debug_utils.h" + namespace test_harness { +/* Static statistic get function. */ +static void +get_stat(WT_CURSOR *cursor, int stat_field, int64_t *valuep) +{ + const char *desc, *pvalue; + cursor->set_key(cursor, stat_field); + testutil_check(cursor->search(cursor)); + testutil_check(cursor->get_value(cursor, &desc, &pvalue, valuep)); +} + +class statistic { + public: + statistic(configuration *config) + { + testutil_assert(config != nullptr); + testutil_check(config->get_bool(ENABLED, _enabled)); + } + + /* Check that the given statistic is within bounds. */ + virtual void check(WT_CURSOR *cursor) = 0; + + /* Suppress warning about destructor being non-virtual. 
*/ + virtual ~statistic() {} + + bool + is_enabled() const + { + return _enabled; + } + + protected: + bool _enabled = false; +}; + +class cache_limit_statistic : public statistic { + public: + cache_limit_statistic(configuration *config) : statistic(config) + { + testutil_check(config->get_int(LIMIT, limit)); + } + + void + check(WT_CURSOR *cursor) + { + testutil_assert(cursor != nullptr); + int64_t cache_bytes_image, cache_bytes_other, cache_bytes_max; + double use_percent; + /* Three statistics are required to compute cache use percentage. */ + get_stat(cursor, WT_STAT_CONN_CACHE_BYTES_IMAGE, &cache_bytes_image); + get_stat(cursor, WT_STAT_CONN_CACHE_BYTES_OTHER, &cache_bytes_other); + get_stat(cursor, WT_STAT_CONN_CACHE_BYTES_MAX, &cache_bytes_max); + /* + * Assert that we never exceed our configured limit for cache usage. Add 0.0 to avoid + * floating point conversion errors. + */ + use_percent = ((cache_bytes_image + cache_bytes_other + 0.0) / cache_bytes_max) * 100; + if (use_percent > limit) { + std::string error_string = + "runtime_monitor: Cache usage exceeded during test! Limit: " + std::to_string(limit) + + " usage: " + std::to_string(use_percent); + debug_info(error_string, _trace_level, DEBUG_ERROR); + testutil_assert(use_percent < limit); + } else + debug_info("Usage: " + std::to_string(use_percent), _trace_level, DEBUG_TRACE); + } + + private: + int64_t limit; +}; + /* * The runtime monitor class is designed to track various statistics or other runtime signals * relevant to the given workload. */ class runtime_monitor : public component { public: + runtime_monitor(configuration *config) : component(config) {} + + ~runtime_monitor() + { + for (auto &it : _stats) + delete it; + _stats.clear(); + } + + /* Delete copy constructor. */ + runtime_monitor(const runtime_monitor &) = delete; + + void + load() + { + WT_CONFIG_ITEM nested; + std::string statistic_list; + /* Parse the configuration for the runtime monitor. 
*/ + testutil_check(_config->get_int(RATE_PER_SECOND, _ops)); + + /* Load known statistics. */ + testutil_check(_config->get(STAT_CACHE_SIZE, &nested)); + configuration sub_config = configuration(nested); + _stats.push_back(new cache_limit_statistic(&sub_config)); + _running = true; + } + void run() { + WT_SESSION *session = connection_manager::instance().create_session(); + WT_CURSOR *cursor = nullptr; + + /* Open a statistics cursor. */ + testutil_check(session->open_cursor(session, STATISTICS_URI, nullptr, nullptr, &cursor)); + while (_running) { - /* Do something. */ + /* Sleep so that we do x operations per second. To be replaced by throttles. */ + std::this_thread::sleep_for(std::chrono::milliseconds(1000 / _ops)); + for (const auto &it : _stats) { + if (it->is_enabled()) + it->check(cursor); + } } } + + private: + int64_t _ops; + std::vector<statistic *> _stats; }; } // namespace test_harness diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h index 92fcd2aefb2..f5cf0b8a6d9 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/test.h @@ -40,12 +40,13 @@ extern "C" { #include "api_const.h" #include "component.h" -#include "configuration_settings.h" +#include "configuration.h" #include "connection_manager.h" #include "runtime_monitor.h" #include "timestamp_manager.h" #include "thread_manager.h" #include "workload_generator.h" +#include "workload_validation.h" namespace test_harness { /* @@ -53,12 +54,12 @@ namespace test_harness { */ class test { public: - test(const std::string &config, bool enable_tracking) + test(const std::string &config) { _configuration = new configuration(name, config); - _workload_generator = new workload_generator(_configuration, enable_tracking); - _runtime_monitor = new runtime_monitor(); - _timestamp_manager = new timestamp_manager(); + _workload_generator = new 
workload_generator(_configuration); + _runtime_monitor = new runtime_monitor(_configuration); + _timestamp_manager = new timestamp_manager(_configuration); _thread_manager = new thread_manager(); /* * Ordering is not important here, any dependencies between components should be resolved @@ -74,11 +75,13 @@ class test { delete _timestamp_manager; delete _thread_manager; delete _workload_generator; + delete _workload_tracking; _configuration = nullptr; _runtime_monitor = nullptr; _timestamp_manager = nullptr; _thread_manager = nullptr; _workload_generator = nullptr; + _workload_tracking = nullptr; _components.clear(); } @@ -89,30 +92,61 @@ class test { void run() { + int64_t cache_size_mb = 100; int64_t duration_seconds = 0; + bool enable_tracking = false, is_success = true; + + /* Build the database creation config string. */ + std::string db_create_config = CONNECTION_CREATE; + + testutil_check(_configuration->get_int(CACHE_SIZE_MB, cache_size_mb)); + db_create_config += ",statistics=(fast),cache_size=" + std::to_string(cache_size_mb) + "MB"; /* Set up the test environment. */ - connection_manager::instance().create(); + connection_manager::instance().create(db_create_config); + + /* Create the activity tracker if required. */ + testutil_check(_configuration->get_bool(ENABLE_TRACKING, enable_tracking)); + if (enable_tracking) { + _workload_tracking = + new workload_tracking(_configuration, OPERATION_TRACKING_TABLE_CONFIG, + TABLE_OPERATION_TRACKING, SCHEMA_TRACKING_TABLE_CONFIG, TABLE_SCHEMA_TRACKING); + /* Make sure the tracking component is loaded first to track all activities. */ + _components.insert(_components.begin(), _workload_tracking); + } else + _workload_tracking = nullptr; + /* Tell the workload generator whether tracking is enabled. */ + _workload_generator->set_tracker(_workload_tracking); /* Initiate the load stage of each component. 
*/ - for (const auto &it : _components) { + for (const auto &it : _components) it->load(); - } /* Spawn threads for all component::run() functions. */ - for (const auto &it : _components) { + for (const auto &it : _components) _thread_manager->add_thread(&component::run, it); - } /* Sleep duration seconds. */ testutil_check(_configuration->get_int(DURATION_SECONDS, duration_seconds)); std::this_thread::sleep_for(std::chrono::seconds(duration_seconds)); /* End the test. */ - for (const auto &it : _components) { + for (const auto &it : _components) it->finish(); - } _thread_manager->join(); + + /* Validation stage. */ + if (enable_tracking) { + workload_validation wv; + is_success = wv.validate(_workload_tracking->get_operation_table_name(), + _workload_tracking->get_schema_table_name()); + } + + if (is_success) + std::cout << "SUCCESS" << std::endl; + else + std::cout << "FAILED" << std::endl; + connection_manager::instance().close(); } @@ -154,6 +188,7 @@ class test { timestamp_manager *_timestamp_manager; thread_manager *_thread_manager; workload_generator *_workload_generator; + workload_tracking *_workload_tracking; }; } // namespace test_harness diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h index 4d6d1e56a68..10aa0482bf4 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/timestamp_manager.h @@ -29,6 +29,8 @@ #ifndef TIMESTAMP_MANAGER_H #define TIMESTAMP_MANAGER_H +#include "component.h" + namespace test_harness { /* * The timestamp monitor class manages global timestamp state for all components in the test @@ -36,6 +38,8 @@ namespace test_harness { */ class timestamp_manager : public component { public: + timestamp_manager(configuration *config) : component(config) {} + void run() { diff --git 
a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h index eb9831c5945..3eeebb40c1e 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_generator.h @@ -29,6 +29,9 @@ #ifndef WORKLOAD_GENERATOR_H #define WORKLOAD_GENERATOR_H +#include <algorithm> +#include <map> + #include "random_generator.h" #include "workload_tracking.h" @@ -38,14 +41,13 @@ namespace test_harness { */ class workload_generator : public component { public: - workload_generator(configuration *configuration, bool enable_tracking) - : _configuration(configuration), _enable_tracking(enable_tracking) + workload_generator(configuration *configuration) + : component(configuration), _enable_tracking(false), _workload_tracking(nullptr) { } ~workload_generator() { - delete _workload_tracking; for (auto &it : _workers) delete it; } @@ -73,31 +75,24 @@ class workload_generator : public component { collection_count = key_count = value_size = 0; collection_name = ""; - /* Create the activity tracker if required. */ - if (_enable_tracking) { - _workload_tracking = new workload_tracking(TRACKING_COLLECTION); - _workload_tracking->load(); - } - /* Get a session. */ session = connection_manager::instance().create_session(); - /* Create n collections as per the configuration and store each collection name. 
*/ - testutil_check(_configuration->get_int(COLLECTION_COUNT, collection_count)); + testutil_check(_config->get_int(COLLECTION_COUNT, collection_count)); for (int i = 0; i < collection_count; ++i) { collection_name = "table:collection" + std::to_string(i); testutil_check(session->create(session, collection_name.c_str(), DEFAULT_TABLE_SCHEMA)); if (_enable_tracking) testutil_check( - _workload_tracking->save(tracking_operation::CREATE, collection_name, "", "")); + _workload_tracking->save(tracking_operation::CREATE, collection_name, 0, "")); _collection_names.push_back(collection_name); } debug_info( std::to_string(collection_count) + " collections created", _trace_level, DEBUG_INFO); /* Open a cursor on each collection and use the configuration to insert key/value pairs. */ - testutil_check(_configuration->get_int(KEY_COUNT, key_count)); - testutil_check(_configuration->get_int(VALUE_SIZE, value_size)); + testutil_check(_config->get_int(KEY_COUNT, key_count)); + testutil_check(_config->get_int(VALUE_SIZE, value_size)); for (const auto &collection_name : _collection_names) { /* WiredTiger lets you open a cursor on a collection using the same pointer. When a * session is closed, WiredTiger APIs close the cursors too. */ @@ -108,8 +103,8 @@ class workload_generator : public component { * configuration. 
*/ std::string generated_value = random_generator::random_generator::instance().generate_string(value_size); - testutil_check( - insert(cursor, collection_name, j, generated_value.c_str(), _enable_tracking)); + testutil_check(insert( + cursor, collection_name, j + 1, generated_value.c_str(), _enable_tracking)); } } debug_info("Load stage done", _trace_level, DEBUG_INFO); @@ -125,8 +120,8 @@ class workload_generator : public component { session = nullptr; duration_seconds = read_threads = 0; - testutil_check(_configuration->get_int(DURATION_SECONDS, duration_seconds)); - testutil_check(_configuration->get_int(READ_THREADS, read_threads)); + testutil_check(_config->get_int(DURATION_SECONDS, duration_seconds)); + testutil_check(_config->get_int(READ_THREADS, read_threads)); /* Generate threads to execute read operations on the collections. */ for (int i = 0; i < read_threads; ++i) { thread_context *tc = new thread_context(_collection_names, thread_operation::READ); @@ -145,6 +140,15 @@ class workload_generator : public component { _thread_manager.join(); } + void + set_tracker(workload_tracking *tracking) + { + /* Tracking cannot be NULL. */ + testutil_check(tracking == nullptr); + _enable_tracking = true; + _workload_tracking = tracking; + } + /* Workload threaded operations. 
*/ static void execute_operation(thread_context &context) @@ -166,7 +170,7 @@ class workload_generator : public component { break; default: testutil_die(DEBUG_ABORT, "system: thread_operation is unknown : %d", - static_cast<int>(thread_operation::UPDATE)); + static_cast<int>(context.get_thread_operation())); break; } } @@ -245,8 +249,7 @@ class workload_generator : public component { private: std::vector<std::string> _collection_names; - configuration *_configuration = nullptr; - bool _enable_tracking = false; + bool _enable_tracking; thread_manager _thread_manager; std::vector<thread_context *> _workers; workload_tracking *_workload_tracking; diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_tracking.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_tracking.h index 6d6d0aac248..c60ecbae619 100644 --- a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_tracking.h +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_tracking.h @@ -30,39 +30,77 @@ #define WORKLOAD_TRACKING_H /* - * Default schema for tracking table key_format : Collection name / Key value_format : Operation - * type / Value / Timestamp + * Default schema for tracking operations on collections (key_format: Collection name / Key / + * Timestamp, value_format: Operation type / Value) */ -#define DEFAULT_TRACKING_KEY_FORMAT WT_UNCHECKED_STRING(Si) -#define DEFAULT_TRACKING_VALUE_FORMAT WT_UNCHECKED_STRING(iSi) -#define DEFAULT_TRACKING_TABLE_SCHEMA \ - "key_format=" DEFAULT_TRACKING_KEY_FORMAT ",value_format=" DEFAULT_TRACKING_VALUE_FORMAT +#define OPERATION_TRACKING_KEY_FORMAT WT_UNCHECKED_STRING(Sii) +#define OPERATION_TRACKING_VALUE_FORMAT WT_UNCHECKED_STRING(iS) +#define OPERATION_TRACKING_TABLE_CONFIG \ + "key_format=" OPERATION_TRACKING_KEY_FORMAT ",value_format=" OPERATION_TRACKING_VALUE_FORMAT + +/* + * Default schema for tracking schema operations on collections (key_format: Collection name / + * Timestamp, value_format: 
Operation type) + */ +#define SCHEMA_TRACKING_KEY_FORMAT WT_UNCHECKED_STRING(Si) +#define SCHEMA_TRACKING_VALUE_FORMAT WT_UNCHECKED_STRING(i) +#define SCHEMA_TRACKING_TABLE_CONFIG \ + "key_format=" SCHEMA_TRACKING_KEY_FORMAT ",value_format=" SCHEMA_TRACKING_VALUE_FORMAT namespace test_harness { /* Tracking operations. */ -enum class tracking_operation { CREATE, INSERT }; +enum class tracking_operation { CREATE, DELETE_COLLECTION, DELETE_KEY, INSERT }; /* Class used to track operations performed on collections */ -class workload_tracking { +class workload_tracking : public component { public: - workload_tracking(const std::string &collection_name) - : _collection_name(collection_name), _timestamp(0U) + workload_tracking(configuration *_config, const std::string &operation_table_config, + const std::string &operation_table_name, const std::string &schema_table_config, + const std::string &schema_table_name) + : component(_config), _cursor_operations(nullptr), _cursor_schema(nullptr), + _operation_table_config(operation_table_config), + _operation_table_name(operation_table_name), _schema_table_config(schema_table_config), + _schema_table_name(schema_table_name), _timestamp(0U) { } - int - load(const std::string &table_schema = DEFAULT_TRACKING_TABLE_SCHEMA) + const std::string & + get_schema_table_name() const + { + return _schema_table_name; + } + + const std::string & + get_operation_table_name() const + { + return _operation_table_name; + } + + void + load() { WT_SESSION *session; - /* Create tracking collection. */ + /* Initiate schema tracking. 
*/ session = connection_manager::instance().create_session(); - testutil_check(session->create(session, _collection_name.c_str(), table_schema.c_str())); testutil_check( - session->open_cursor(session, _collection_name.c_str(), NULL, NULL, &_cursor)); - debug_info("Tracking collection created", _trace_level, DEBUG_INFO); + session->create(session, _schema_table_name.c_str(), _schema_table_config.c_str())); + testutil_check( + session->open_cursor(session, _schema_table_name.c_str(), NULL, NULL, &_cursor_schema)); + debug_info("Schema tracking initiated", _trace_level, DEBUG_INFO); - return (0); + /* Initiate operations tracking. */ + testutil_check( + session->create(session, _operation_table_name.c_str(), _operation_table_config.c_str())); + testutil_check(session->open_cursor( + session, _operation_table_name.c_str(), NULL, NULL, &_cursor_operations)); + debug_info("Operations tracking created", _trace_level, DEBUG_INFO); + } + + void + run() + { + /* Does not do anything. */ } template <typename K, typename V> @@ -70,24 +108,42 @@ class workload_tracking { save(const tracking_operation &operation, const std::string &collection_name, const K &key, const V &value) { + WT_CURSOR *cursor; int error_code; - _cursor->set_key(_cursor, collection_name.c_str(), key); - _cursor->set_value(_cursor, static_cast<int>(operation), value, _timestamp++); - error_code = _cursor->insert(_cursor); + /* Select the correct cursor to save in the collection associated to specific operations. 
*/ + switch (operation) { + case tracking_operation::CREATE: + case tracking_operation::DELETE_COLLECTION: + cursor = _cursor_schema; + cursor->set_key(cursor, collection_name.c_str(), _timestamp++); + cursor->set_value(cursor, static_cast<int>(operation)); + break; - if (error_code == 0) { + default: + cursor = _cursor_operations; + cursor->set_key(cursor, collection_name.c_str(), key, _timestamp++); + cursor->set_value(cursor, static_cast<int>(operation), value); + break; + } + + error_code = cursor->insert(cursor); + + if (error_code == 0) debug_info("Workload tracking saved operation.", _trace_level, DEBUG_INFO); - } else { + else debug_info("Workload tracking failed to save operation !", _trace_level, DEBUG_ERROR); - } return error_code; } private: - const std::string _collection_name; - WT_CURSOR *_cursor = nullptr; + const std::string _operation_table_config; + const std::string _operation_table_name; + const std::string _schema_table_config; + const std::string _schema_table_name; + WT_CURSOR *_cursor_operations; + WT_CURSOR *_cursor_schema; uint64_t _timestamp; }; } // namespace test_harness diff --git a/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_validation.h b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_validation.h new file mode 100644 index 00000000000..21d25b0cece --- /dev/null +++ b/src/third_party/wiredtiger/test/cppsuite/test_harness/workload_validation.h @@ -0,0 +1,409 @@ +/*- + * Public Domain 2014-present MongoDB, Inc. + * Public Domain 2008-2014 WiredTiger, Inc. + * + * This is free and unencumbered software released into the public domain. + * + * Anyone is free to copy, modify, publish, use, compile, sell, or + * distribute this software, either in source code form or as a compiled + * binary, for any purpose, commercial or non-commercial, and by any + * means. 
+ * + * In jurisdictions that recognize copyright laws, the author or authors + * of this software dedicate any and all copyright interest in the + * software to the public domain. We make this dedication for the benefit + * of the public at large and to the detriment of our heirs and + * successors. We intend this dedication to be an overt act of + * relinquishment in perpetuity of all present and future rights to this + * software under copyright law. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR + * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef WORKLOAD_VALIDATION_H +#define WORKLOAD_VALIDATION_H + +#include <string> + +extern "C" { +#include "wiredtiger.h" +} + +namespace test_harness { +/* + * Class that can validate database state and collection data. + */ +class workload_validation { + public: + /* + * Validate the on disk data against what has been tracked during the test. The first step is to + * replay the tracked operations so a representation in memory of the collections is created. + * This representation is then compared to what is on disk. The second step is to go through + * what has been saved on disk and make sure the memory representation has the same data. + * operation_table_name is the collection that contains all the operations about the key/value + * pairs in the different collections used during the test. schema_table_name is the collection + * that contains all the operations about the creation or deletion of collections during the + * test. 
+ */ + bool + validate(const std::string &operation_table_name, const std::string &schema_table_name) + { + WT_SESSION *session; + std::string collection_name; + /* + * Representation in memory of the collections at the end of the test. The first level is a + * map that contains collection names as keys. The second level is another map that contains + * the different key/value pairs within a given collection. If a collection yields to a null + * map of key/value pairs, this means the collection should not be present on disk. If a + * value from a key/value pair is null, this means the key should not be present in the + * collection on disk. + */ + std::map<std::string, std::map<int, std::string *> *> collections; + /* Existing collections after the test. */ + std::vector<std::string> created_collections; + bool is_valid; + + session = connection_manager::instance().create_session(); + + /* Retrieve the created collections that need to be checked. */ + collection_name = schema_table_name; + created_collections = parse_schema_tracking_table(session, collection_name); + + /* Allocate memory to the operations performed on the created collections. */ + for (auto const &it : created_collections) { + std::map<int, std::string *> *map = new std::map<int, std::string *>(); + collections[it] = map; + } + + /* + * Build in memory the final state of each created collection according to the tracked + * operations. + */ + collection_name = operation_table_name; + for (auto const &active_collection : created_collections) + parse_operation_tracking_table( + session, collection_name, active_collection, collections); + + /* Check all tracked operations in memory against the database on disk. */ + is_valid = check_reference(session, collections); + + /* Check what has been saved on disk against what has been tracked. 
*/ + if (is_valid) { + for (auto const &collection : created_collections) { + is_valid = check_disk_state(session, collection, collections); + if (!is_valid) { + debug_info("check_disk_state failed for collection " + collection, _trace_level, + DEBUG_INFO); + break; + } + } + + } else + debug_info("check_reference failed!", _trace_level, DEBUG_INFO); + + /* Clean the allocated memory. */ + clean_memory(collections); + + return is_valid; + } + + /* Clean the memory used to represent the collections after the test. */ + void + clean_memory(std::map<std::string, std::map<int, std::string *> *> &collections) + { + for (auto &it_collections : collections) { + if (it_collections.second == nullptr) + continue; + + for (auto &it_operations : *it_collections.second) { + delete it_operations.second; + it_operations.second = nullptr; + } + delete it_collections.second; + it_collections.second = nullptr; + } + } + + /* + * collection_name is the collection that contains the operations on the different collections + * during the test. 
+ */ + const std::vector<std::string> + parse_schema_tracking_table(WT_SESSION *session, const std::string &collection_name) + { + WT_CURSOR *cursor; + const char *key_collection_name; + int key_timestamp, value_operation_type; + std::vector<std::string> created_collections; + + testutil_check(session->open_cursor(session, collection_name.c_str(), NULL, NULL, &cursor)); + + while (cursor->next(cursor) == 0) { + testutil_check(cursor->get_key(cursor, &key_collection_name, &key_timestamp)); + testutil_check(cursor->get_value(cursor, &value_operation_type)); + + debug_info( + "Collection name is " + std::string(key_collection_name), _trace_level, DEBUG_INFO); + debug_info("Timestamp is " + std::to_string(key_timestamp), _trace_level, DEBUG_INFO); + debug_info("Operation type is " + std::to_string(value_operation_type), _trace_level, + DEBUG_INFO); + + if (static_cast<tracking_operation>(value_operation_type) == + tracking_operation::CREATE) { + created_collections.push_back(key_collection_name); + } else if (static_cast<tracking_operation>(value_operation_type) == + tracking_operation::DELETE_COLLECTION) { + created_collections.erase(std::remove(created_collections.begin(), + created_collections.end(), key_collection_name), + created_collections.end()); + } + } + + return created_collections; + } + + /* + * Parse the tracked operations to build a representation in memory of the collections at the + * end of the test. tracking_collection_name is the tracking collection used to save the + * operations performed on the collections during the test. collection_name is the collection + * that needs to be represented in memory. 
+ */ + void + parse_operation_tracking_table(WT_SESSION *session, const std::string &tracking_collection_name, + const std::string &collection_name, + std::map<std::string, std::map<int, std::string *> *> &collections) + { + WT_CURSOR *cursor; + int error_code, exact, key, key_timestamp, value_operation_type; + const char *key_collection_name, *value; + + testutil_check( + session->open_cursor(session, tracking_collection_name.c_str(), NULL, NULL, &cursor)); + + /* Our keys start at 0. */ + cursor->set_key(cursor, collection_name.c_str(), 0); + error_code = cursor->search_near(cursor, &exact); + + /* + * As we don't support deletion, the searched collection is expected to be found. Since the + * timestamp which is part of the key is not provided, exact is expected to be > 0. + */ + testutil_check(exact < 1); + + while (error_code == 0) { + testutil_check(cursor->get_key(cursor, &key_collection_name, &key, &key_timestamp)); + testutil_check(cursor->get_value(cursor, &value_operation_type, &value)); + + debug_info( + "Collection name is " + std::string(key_collection_name), _trace_level, DEBUG_INFO); + debug_info("Key is " + std::to_string(key), _trace_level, DEBUG_INFO); + debug_info("Timestamp is " + std::to_string(key_timestamp), _trace_level, DEBUG_INFO); + debug_info("Operation type is " + std::to_string(value_operation_type), _trace_level, + DEBUG_INFO); + debug_info("Value is " + std::string(value), _trace_level, DEBUG_INFO); + + /* + * If the cursor is reading an operation for a different collection, we know all the + * operations have been parsed for the collection we were interested in. + */ + if (std::string(key_collection_name) != collection_name) + break; + + /* Replay the current operation. */ + switch (static_cast<tracking_operation>(value_operation_type)) { + case tracking_operation::DELETE_KEY: + /* + * Operations are parsed from the oldest to the most recent one. 
It is safe to + * assume the key has been inserted previously in an existing collection and can be + * deleted safely. + */ + delete collections.at(key_collection_name)->at(key); + collections.at(key_collection_name)->at(key) = nullptr; + break; + case tracking_operation::INSERT: { + /* Keys are unique, it is safe to assume the key has not been encountered before. */ + std::pair<int, std::string *> pair(key, new std::string(value)); + collections.at(key_collection_name)->insert(pair); + break; + } + case tracking_operation::CREATE: + case tracking_operation::DELETE_COLLECTION: + testutil_die(DEBUG_ABORT, "Unexpected operation in the tracking table: %d", + static_cast<tracking_operation>(value_operation_type)); + default: + testutil_die( + DEBUG_ABORT, "tracking operation is unknown : %d", value_operation_type); + break; + } + + error_code = cursor->next(cursor); + } + + error_code = cursor->reset(cursor); + } + + /* + * Compare the tracked operations against what has been saved on disk. collections is the + * representation in memory of the collections after the test according to the tracking table. + */ + bool + check_reference( + WT_SESSION *session, std::map<std::string, std::map<int, std::string *> *> &collections) + { + + bool collection_exists, is_valid; + std::map<int, std::string *> *collection; + workload_validation wv; + std::string *value; + + for (const auto &it_collections : collections) { + /* Check the collection is in the correct state. */ + collection_exists = (it_collections.second != nullptr); + is_valid = wv.verify_database_state(session, it_collections.first, collection_exists); + + if (is_valid && collection_exists) { + collection = it_collections.second; + for (const auto &it_operations : *collection) { + value = (*collection)[it_operations.first]; + /* The key/value pair exists. */ + if (value != nullptr) + is_valid = (wv.is_key_present( + session, it_collections.first, it_operations.first) == true); + /* The key has been deleted. 
*/ + else + is_valid = (wv.is_key_present( + session, it_collections.first, it_operations.first) == false); + + /* Check the associated value is valid. */ + if (is_valid && (value != nullptr)) { + is_valid = (wv.verify_value( + session, it_collections.first, it_operations.first, *value)); + } + + if (!is_valid) { + debug_info( + "check_reference failed for key " + std::to_string(it_operations.first), + _trace_level, DEBUG_INFO); + break; + } + } + } + + if (!is_valid) { + debug_info("check_reference failed for collection " + it_collections.first, + _trace_level, DEBUG_INFO); + break; + } + } + + return is_valid; + } + + /* Check what is present on disk against what has been tracked. */ + bool + check_disk_state(WT_SESSION *session, const std::string &collection_name, + std::map<std::string, std::map<int, std::string *> *> &collections) + { + WT_CURSOR *cursor; + int key; + const char *value; + bool is_valid; + std::string *value_str; + std::map<int, std::string *> *collection; + + testutil_check(session->open_cursor(session, collection_name.c_str(), NULL, NULL, &cursor)); + + /* Check the collection has been tracked and contains data. */ + is_valid = + ((collections.count(collection_name) > 0) && (collections[collection_name] != nullptr)); + + if (!is_valid) + debug_info( + "Collection " + collection_name + " has not been tracked or has been deleted", + _trace_level, DEBUG_INFO); + else + collection = collections[collection_name]; + + /* Read the collection on disk. */ + while (is_valid && (cursor->next(cursor) == 0)) { + testutil_check(cursor->get_key(cursor, &key)); + testutil_check(cursor->get_value(cursor, &value)); + + debug_info("Key is " + std::to_string(key), _trace_level, DEBUG_INFO); + debug_info("Value is " + std::string(value), _trace_level, DEBUG_INFO); + + if (collection->count(key) > 0) { + value_str = collection->at(key); + /* + * Check the key/value pair on disk matches the one in memory from the tracked + * operations. 
+ */ + is_valid = (value_str != nullptr) && (*value_str == std::string(value)); + if (!is_valid) + debug_info(" Key/Value pair mismatch.\n Disk key: " + std::to_string(key) + + "\n Disk value: " + std ::string(value) + + "\n Tracking table key: " + std::to_string(key) + + "\n Tracking table value: " + (value_str == nullptr ? "NULL" : *value_str), + _trace_level, DEBUG_INFO); + } else { + is_valid = false; + debug_info( + "The key " + std::to_string(key) + " present on disk has not been tracked", + _trace_level, DEBUG_INFO); + } + } + + return is_valid; + } + + /* + * Check whether a collection exists on disk. collection_name is the collection to check. exists + * needs to be set to true if the collection is expected to be existing, false otherwise. + */ + bool + verify_database_state( + WT_SESSION *session, const std::string &collection_name, bool exists) const + { + WT_CURSOR *cursor; + int ret = session->open_cursor(session, collection_name.c_str(), NULL, NULL, &cursor); + return (exists ? (ret == 0) : (ret != 0)); + } + + template <typename K> + bool + is_key_present(WT_SESSION *session, const std::string &collection_name, const K &key) + { + WT_CURSOR *cursor; + testutil_check(session->open_cursor(session, collection_name.c_str(), NULL, NULL, &cursor)); + cursor->set_key(cursor, key); + return (cursor->search(cursor) == 0); + } + + /* Verify the given expected value is the same on disk. 
*/ + template <typename K, typename V> + bool + verify_value(WT_SESSION *session, const std::string &collection_name, const K &key, + const V &expected_value) + { + WT_CURSOR *cursor; + const char *value; + + testutil_check(session->open_cursor(session, collection_name.c_str(), NULL, NULL, &cursor)); + cursor->set_key(cursor, key); + testutil_check(cursor->search(cursor)); + testutil_check(cursor->get_value(cursor, &value)); + + return (value == expected_value); + } + + private: +}; +} // namespace test_harness + +#endif diff --git a/src/third_party/wiredtiger/test/cppsuite/tests/poc.cxx b/src/third_party/wiredtiger/test/cppsuite/tests/poc.cxx index a90ce8a92d8..2d8cc319565 100755 --- a/src/third_party/wiredtiger/test/cppsuite/tests/poc.cxx +++ b/src/third_party/wiredtiger/test/cppsuite/tests/poc.cxx @@ -34,7 +34,7 @@ class poc_test : public test_harness::test { public: - poc_test(const std::string &config, int64_t trace_level, bool enable_tracking) : test(config, enable_tracking) + poc_test(const std::string &config, int64_t trace_level) : test(config) { test_harness::_trace_level = trace_level; } @@ -47,8 +47,10 @@ class poc_test : public test_harness::test { }; const std::string poc_test::test::name = "poc_test"; -const std::string poc_test::test::default_config = "collection_count=2,key_count=5,value_size=20," - "read_threads=1,duration_seconds=1"; +const std::string poc_test::test::default_config = + "collection_count=2,key_count=5,value_size=10," + "read_threads=1,duration_seconds=10,cache_size_mb=1000," + "stat_cache_size=(enabled=true,limit=100),rate_per_second=10,enable_tracking=true"; int main(int argc, char *argv[]) @@ -83,8 +85,8 @@ main(int argc, char *argv[]) cfg = poc_test::test::default_config; std::cout << "Configuration\t:" << cfg << std::endl; - std::cout << "Tracel level\t:" << trace_level << std::endl; + std::cout << "Trace level\t:" << trace_level << std::endl; - poc_test(cfg, trace_level, true).run(); + poc_test(cfg, trace_level).run(); 
return (0); } diff --git a/src/third_party/wiredtiger/test/evergreen.yml b/src/third_party/wiredtiger/test/evergreen.yml index 0404a724622..5dd777b34f5 100755 --- a/src/third_party/wiredtiger/test/evergreen.yml +++ b/src/third_party/wiredtiger/test/evergreen.yml @@ -2648,9 +2648,9 @@ buildvariants: test_env_vars: PATH=/opt/mongodbtoolchain/v3/bin:$PATH LD_LIBRARY_PATH=$(pwd)/.libs top_srcdir=$(pwd)/.. top_builddir=$(pwd) tasks: - name: compile - # - name: generate-datafile-little-endian - # - name: verify-datafile-little-endian - # - name: verify-datafile-from-big-endian + - name: generate-datafile-little-endian + - name: verify-datafile-little-endian + - name: verify-datafile-from-big-endian - name: big-endian display_name: "~ Big-endian (s390x/zSeries)" @@ -2665,9 +2665,9 @@ buildvariants: test_env_vars: PATH=/opt/mongodbtoolchain/v3/bin:$PATH LD_LIBRARY_PATH=$(pwd)/.lib top_srcdir=$(pwd)/.. top_builddir=$(pwd) tasks: - name: compile - # - name: generate-datafile-big-endian - # - name: verify-datafile-big-endian - # - name: verify-datafile-from-little-endian + - name: generate-datafile-big-endian + - name: verify-datafile-big-endian + - name: verify-datafile-from-little-endian - name: ubuntu1804-ppc display_name: "~ Ubuntu 18.04 PPC" diff --git a/src/third_party/wiredtiger/test/evergreen/verify_wt_datafiles.sh b/src/third_party/wiredtiger/test/evergreen/verify_wt_datafiles.sh index d5dc40fcf1e..68479d07f32 100755 --- a/src/third_party/wiredtiger/test/evergreen/verify_wt_datafiles.sh +++ b/src/third_party/wiredtiger/test/evergreen/verify_wt_datafiles.sh @@ -57,7 +57,7 @@ do echo "${d}" ${wt_binary} -h ${d} printlog > /dev/null - if [ "$?" -ne "0" ]; then + if [ "$?" -ne "0" ]; then echo "Failed to dump '${d}' log files, exiting ..." 
exit 1 fi diff --git a/src/third_party/wiredtiger/test/format/CONFIG.endian b/src/third_party/wiredtiger/test/format/CONFIG.endian index 1ac81778d19..791b24a4c11 100644 --- a/src/third_party/wiredtiger/test/format/CONFIG.endian +++ b/src/third_party/wiredtiger/test/format/CONFIG.endian @@ -4,3 +4,4 @@ logging.archive=0 logging=1 runs.timer=4 runs.rows=1000000 +runs.type=row-store diff --git a/src/third_party/wiredtiger/test/format/config.c b/src/third_party/wiredtiger/test/format/config.c index 996d636340f..0094cec7c88 100644 --- a/src/third_party/wiredtiger/test/format/config.c +++ b/src/third_party/wiredtiger/test/format/config.c @@ -959,6 +959,8 @@ config_transaction(void) if (g.c_txn_freq != 100 && config_is_perm("transaction.frequency")) testutil_die(EINVAL, "timestamps require transaction frequency set to 100"); } + if (g.c_logging && config_is_perm("logging") && g.c_prepare) + config_single("ops.prepare=off", false); /* FIXME-WT-6431: temporarily disable salvage with timestamps. */ if (g.c_txn_timestamps && g.c_salvage) { @@ -994,8 +996,6 @@ config_transaction(void) if (g.c_txn_rollback_to_stable) { if (!g.c_txn_timestamps) config_single("transaction.timestamps=on", false); - if (g.c_logging) - config_single("logging=off", false); } if (g.c_txn_timestamps) { if (g.c_isolation_flag != ISOLATION_SNAPSHOT) diff --git a/src/third_party/wiredtiger/test/suite/test_checkpoint_snapshot04.py b/src/third_party/wiredtiger/test/suite/test_checkpoint_snapshot04.py new file mode 100644 index 00000000000..840002282a3 --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_checkpoint_snapshot04.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python +# +# Public Domain 2014-present MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. 
+# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +import fnmatch, os, shutil, threading, time +import wiredtiger, wttest +from wtbackup import backup_base +from wtdataset import SimpleDataSet +from wtscenario import make_scenarios + +# test_checkpoint_snapshot04.py +# Test utility dump of backup and original database when the transaction ids are +# written to disk. +class test_checkpoint_snapshot04(backup_base): + dir = 'backup.dir' + + # Create a table. + uri = "table:test_checkpoint_snapshot04" + nrows = 5000 + + target_backup = [ + ('full', dict(target=False)), + ('target', dict(target=True)) + ] + + scenarios = make_scenarios(target_backup) + + def conn_config(self): + config = 'cache_size=25MB' + return config + + def large_updates(self, uri, value, ds, nrows): + # Update a large number of records. 
+ session = self.session + cursor = session.open_cursor(uri) + for i in range(0, nrows): + session.begin_transaction() + cursor[ds.key(i)] = value + session.commit_transaction() + cursor.close() + + def check(self, check_value, uri, nrows): + session = self.session + session.begin_transaction() + cursor = session.open_cursor(uri) + count = 0 + for k, v in cursor: + self.assertEqual(v, check_value) + count += 1 + session.commit_transaction() + self.assertEqual(count, nrows) + + def test_checkpoint_snapshot(self): + ds = SimpleDataSet(self, self.uri, 0, key_format="S", value_format="S") + ds.populate() + valuea = "aaaaa" * 100 + valueb = "bbbbb" * 100 + + session1 = self.conn.open_session() + session1.begin_transaction() + cursor1 = session1.open_cursor(self.uri) + for i in range(self.nrows, self.nrows + 1): + cursor1.set_key(ds.key(i)) + cursor1.set_value(valueb) + self.assertEqual(cursor1.insert(), 0) + + self.large_updates(self.uri, valuea, ds, self.nrows) + self.check(valuea, self.uri, self.nrows) + + self.session.checkpoint() + + # Create the backup directory. + os.mkdir(self.dir) + + # Open up the backup cursor, and copy the files. + if self.target: + config = 'target=("table:test_checkpoint_snapshot04")' + else: + config = "" + cursor = self.session.open_cursor('backup:', None, config) + while True: + ret = cursor.next() + if ret != 0: + break + shutil.copy(cursor.get_key(), self.dir) + self.assertEqual(ret, wiredtiger.WT_NOTFOUND) + cursor.close() + + session1.rollback_transaction() + + self.compare_backups(self.uri, self.dir, './') + +if __name__ == '__main__': + wttest.run() diff --git a/src/third_party/wiredtiger/test/suite/test_rollback_to_stable17.py b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable17.py new file mode 100644 index 00000000000..10db33e0d1c --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_rollback_to_stable17.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python +# +# Public Domain 2014-present MongoDB, Inc. 
+# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +import os, shutil +from helper import simulate_crash_restart +import wiredtiger, wttest +from wiredtiger import stat +from wtdataset import SimpleDataSet + +def timestamp_str(t): + return '%x' % t + +# test_rollback_to_stable17.py +# Test that rollback to stable handles updates present on history store and data store for variable +# length column store. 
+class test_rollback_to_stable17(wttest.WiredTigerTestCase): + conn_config = 'cache_size=200MB,statistics=(all)' + session_config = 'isolation=snapshot' + + def insert_update_data(self, uri, value, start_row, end_row, timestamp): + cursor = self.session.open_cursor(uri) + for i in range(start_row, end_row): + self.session.begin_transaction() + cursor[i] = value + self.session.commit_transaction('commit_timestamp=' + timestamp_str(timestamp)) + cursor.close() + + def check(self, check_value, uri, nrows, read_ts): + session = self.session + session.begin_transaction('read_timestamp=' + timestamp_str(read_ts)) + cursor = session.open_cursor(uri) + + count = 0 + for k, v in cursor: + self.assertEqual(v, check_value) + count += 1 + + session.commit_transaction() + self.assertEqual(count, nrows) + cursor.close() + + def test_rollback_to_stable(self): + # Create a table. + uri = "table:rollback_to_stable17" + nrows = 200 + start_row = 1 + ts = [2,5,7,9] + values = ["aaaa", "bbbb", "cccc", "dddd"] + + create_params = 'key_format=r,value_format=S' + self.session.create(uri, create_params) + + # Pin oldest and stable to timestamp 1. + self.conn.set_timestamp('oldest_timestamp=' + timestamp_str(1) + + ',stable_timestamp=' + timestamp_str(1)) + + # Make a series of updates for the same keys with different values at different timestamps. + for i in range(len(values)): + self.insert_update_data(uri, values[i], start_row, nrows, ts[i]) + + # Set the stable timestamp to 5. + self.conn.set_timestamp('stable_timestamp=' + timestamp_str(5)) + + # Checkpoint to ensure that all the updates are flushed to disk. + self.session.checkpoint() + + # Rollback to stable done as part of recovery. + simulate_crash_restart(self, ".", "RESTART") + + # Check that keys at timestamps 2 and 5 have the correct values they were updated with. 
+ self.check(values[0], uri, nrows - 1, 2) + self.check(values[1], uri, nrows - 1, 5) + # Check that the keys at timestamps 7 and 9 were rolled back to contain the value at the + # stable timestamp 5. + self.check(values[1], uri, nrows - 1, 7) + self.check(values[1], uri, nrows - 1, 9) + + self.session.close() + +if __name__ == '__main__': + wttest.run() diff --git a/src/third_party/wiredtiger/test/suite/test_tiered06.py b/src/third_party/wiredtiger/test/suite/test_tiered06.py new file mode 100755 index 00000000000..113a4c5cab3 --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_tiered06.py @@ -0,0 +1,265 @@ +#!/usr/bin/env python +# +# Public Domain 2014-2020 MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. 
+ +import os, wiredtiger, wttest +StorageSource = wiredtiger.StorageSource # easy access to constants + +# test_tiered06.py +# Test the local storage source. +class test_tiered06(wttest.WiredTigerTestCase): + # Load the local store extension, but skip the test if it is missing. + def conn_extensions(self, extlist): + extlist.skip_if_missing = True + #extlist.extension('storage_sources', + # 'local_store=(config=\"(verbose=1,delay_ms=200,force_delay=3)\")') + extlist.extension('storage_sources', 'local_store') + + def breakpoint(self): + import pdb, sys + sys.stdin = open('/dev/tty', 'r') + sys.stdout = open('/dev/tty', 'w') + sys.stderr = open('/dev/tty', 'w') + pdb.set_trace() + + def get_local_storage_source(self): + local = self.conn.get_storage_source('local_store') + + # Note: do not call local.terminate() . + # Since the local_storage extension has been loaded as a consequence of the + # wiredtiger_open call, WiredTiger already knows to call terminate when the connection + # closes. Calling it twice would attempt to free the same memory twice. + local.terminate = None + return local + + def test_local_basic(self): + # Test some basic functionality of the storage source API, calling + # each supported method in the API at least once. + + session = self.session + local = self.get_local_storage_source() + + os.mkdir("objects") + location = local.ss_location_handle(session, + 'cluster="cluster1",bucket="./objects",kmsid="Secret"') + + # The object doesn't exist yet. + self.assertFalse(local.ss_exist(session, location, 'foobar')) + + fh = local.ss_open_object(session, location, 'foobar', StorageSource.open_create) + + outbytes = ('MORE THAN ENOUGH DATA\n'*100000).encode() + fh.fh_write(session, 0, outbytes) + + # The object doesn't even exist now. 
+ self.assertFalse(local.ss_exist(session, location, 'foobar')) + + # The object exists after close + fh.close(session) + self.assertTrue(local.ss_exist(session, location, 'foobar')) + + fh = local.ss_open_object(session, location, 'foobar', StorageSource.open_readonly) + inbytes = bytes(1000000) # An empty buffer with a million zero bytes. + fh.fh_read(session, 0, inbytes) # read into the buffer + self.assertEquals(outbytes[0:1000000], inbytes) + self.assertEquals(local.ss_size(session, location, 'foobar'), len(outbytes)) + self.assertEquals(fh.fh_size(session), len(outbytes)) + fh.close(session) + + # The fh_lock call doesn't do anything in the local store implementation. + fh = local.ss_open_object(session, location, 'foobar', StorageSource.open_readonly) + fh.fh_lock(session, True) + fh.fh_lock(session, False) + fh.close(session) + + self.assertEquals(local.ss_location_list(session, location, '', 0), ['foobar']) + + # Make sure any new object is not in the list until it is closed. + fh = local.ss_open_object(session, location, 'zzz', StorageSource.open_create) + self.assertEquals(local.ss_location_list(session, location, '', 0), ['foobar']) + # Sync merely syncs to the local disk. + fh.fh_sync(session) + fh.close(session) # zero length + self.assertEquals(sorted(local.ss_location_list(session, location, '', 0)), + ['foobar', 'zzz']) + + # See that we can remove objects. + local.ss_remove(session, location, 'zzz', 0) + self.assertEquals(local.ss_location_list(session, location, '', 0), ['foobar']) + + # Flushing doesn't do anything that's visible. + local.ss_flush(session, location, None, '') + self.assertEquals(local.ss_location_list(session, location, '', 0), ['foobar']) + + location.close(session) + + def test_local_write_read(self): + # Write and read to a file non-sequentially. 
+ + session = self.session + local = self.get_local_storage_source() + + os.mkdir("objects") + location = local.ss_location_handle(session, + 'cluster="cluster1",bucket="./objects",kmsid="Secret"') + + # We call these 4K chunks of data "blocks" for this test, but that doesn't + # necessarily relate to WT block sizing. + nblocks = 1000 + block_size = 4096 + fh = local.ss_open_object(session, location, 'abc', StorageSource.open_create) + + # blocks filled with 'a', etc. + a_block = ('a' * block_size).encode() + b_block = ('b' * block_size).encode() + c_block = ('c' * block_size).encode() + file_size = nblocks * block_size + + # write all blocks as 'a', but in reverse order + for pos in range(file_size - block_size, 0, -block_size): + fh.fh_write(session, pos, a_block) + + # write the even blocks as 'b', forwards + for pos in range(0, file_size, block_size * 2): + fh.fh_write(session, pos, b_block) + + # write every third block as 'c', backwards + for pos in range(file_size - block_size, 0, -block_size * 3): + fh.fh_write(session, pos, c_block) + fh.close(session) + + in_block = bytes(block_size) + fh = local.ss_open_object(session, location, 'abc', StorageSource.open_readonly) + + # Do some spot checks, reading non-sequentially + fh.fh_read(session, 500 * block_size, in_block) # divisible by 2, not 3 + self.assertEquals(in_block, b_block) + fh.fh_read(session, 333 * block_size, in_block) # divisible by 3, not 2 + self.assertEquals(in_block, c_block) + fh.fh_read(session, 401 * block_size, in_block) # not divisible by 2 or 3 + self.assertEquals(in_block, a_block) + + # Read the whole file, backwards checking to make sure + # each block was written correctly. 
+ for block_num in range(nblocks - 1, 0, -1): + pos = block_num * block_size + fh.fh_read(session, pos, in_block) + if block_num % 3 == 0: + self.assertEquals(in_block, c_block) + elif block_num % 2 == 0: + self.assertEquals(in_block, b_block) + else: + self.assertEquals(in_block, a_block) + fh.close(session) + + def create_in_loc(self, loc, objname): + session = self.session + fh = self.local.ss_open_object(session, loc, objname, StorageSource.open_create) + fh.fh_write(session, 0, 'some stuff'.encode()) + fh.close(session) + + def check(self, loc, prefix, limit, expect): + # We don't require any sorted output for location lists, + # so we'll sort before comparing.' + got = sorted(self.local.ss_location_list(self.session, loc, prefix, limit)) + expect = sorted(expect) + self.assertEquals(got, expect) + + def test_local_locations(self): + # Test using various buckets, clusters + + session = self.session + local = self.conn.get_storage_source('local_store') + self.local = local + os.mkdir("objects1") + os.mkdir("objects2") + + # Any of the activity that happens in the various locations + # should be independent. 
+ location1 = local.ss_location_handle(session, + 'cluster="cluster1",bucket="./objects1",kmsid="k1"') + location2 = local.ss_location_handle(session, + 'cluster="cluster1",bucket="./objects2",kmsid="k2"') + location3 = local.ss_location_handle(session, + 'cluster="cluster2",bucket="./objects1",kmsid="k3"') + location4 = local.ss_location_handle(session, + 'cluster="cluster2",bucket="./objects2",kmsid="k4"') + + # Create files in the locations with some name overlap + self.create_in_loc(location1, 'alpaca') + self.create_in_loc(location2, 'bear') + self.create_in_loc(location3, 'crab') + self.create_in_loc(location4, 'deer') + for a in ['beagle', 'bird', 'bison', 'bat']: + self.create_in_loc(location1, a) + for a in ['bird', 'bison', 'bat', 'badger']: + self.create_in_loc(location2, a) + for a in ['bison', 'bat', 'badger', 'baboon']: + self.create_in_loc(location3, a) + for a in ['bat', 'badger', 'baboon', 'beagle']: + self.create_in_loc(location4, a) + + # Make sure we see the expected file names + self.check(location1, '', 0, ['alpaca', 'beagle', 'bird', 'bison', 'bat']) + self.check(location1, 'a', 0, ['alpaca']) + self.check(location1, 'b', 0, ['beagle', 'bird', 'bison', 'bat']) + self.check(location1, 'c', 0, []) + self.check(location1, 'd', 0, []) + + self.check(location2, '', 0, ['bear', 'bird', 'bison', 'bat', 'badger']) + self.check(location2, 'a', 0, []) + self.check(location2, 'b', 0, ['bear', 'bird', 'bison', 'bat', 'badger']) + self.check(location2, 'c', 0, []) + self.check(location2, 'd', 0, []) + + self.check(location3, '', 0, ['crab', 'bison', 'bat', 'badger', 'baboon']) + self.check(location3, 'a', 0, []) + self.check(location3, 'b', 0, ['bison', 'bat', 'badger', 'baboon']) + self.check(location3, 'c', 0, ['crab']) + self.check(location3, 'd', 0, []) + + self.check(location4, '', 0, ['deer', 'bat', 'badger', 'baboon', 'beagle']) + self.check(location4, 'a', 0, []) + self.check(location4, 'b', 0, ['bat', 'badger', 'baboon', 'beagle']) + 
self.check(location4, 'c', 0, []) + self.check(location4, 'd', 0, ['deer']) + + # Flushing doesn't do anything that's visible, but calling it still exercises code paths. + # At some point, we'll have statistics we can check. + # + # For now, we can turn on the verbose config option for the local_store extension to verify. + local.ss_flush(session, location4, None, '') + local.ss_flush(session, location3, 'badger', '') + local.ss_flush(session, location3, 'c', '') # make sure we don't flush prefixes + local.ss_flush(session, location3, 'b', '') # or suffixes + local.ss_flush(session, location3, 'crab', '') + local.ss_flush(session, location3, 'crab', '') # should do nothing + local.ss_flush(session, None, None, '') # flush everything else + local.ss_flush(session, None, None, '') # should do nothing + +if __name__ == '__main__': + wttest.run() |