diff options
Diffstat (limited to 'storage/tokudb/ft-index/src/ydb.cc')
-rw-r--r-- | storage/tokudb/ft-index/src/ydb.cc | 300 |
1 files changed, 264 insertions, 36 deletions
diff --git a/storage/tokudb/ft-index/src/ydb.cc b/storage/tokudb/ft-index/src/ydb.cc index 7617f21d6fc..6a08b89c81a 100644 --- a/storage/tokudb/ft-index/src/ydb.cc +++ b/storage/tokudb/ft-index/src/ydb.cc @@ -49,6 +49,7 @@ UNIVERSITY PATENT NOTICE: PATENT MARKING NOTICE: This software is covered by US Patent No. 8,185,551. + This software is covered by US Patent No. 8,489,638. PATENT RIGHTS GRANT: @@ -1114,8 +1115,8 @@ env_close(DB_ENV * env, uint32_t flags) { r = toku_ydb_do_error(env, EINVAL, "%s", err_msg); goto panic_and_quit_early; } - if (env->i->open_dbs) { //Verify that there are no open dbs. - if (toku_omt_size(env->i->open_dbs) > 0) { + if (env->i->open_dbs_by_dname) { //Verify that there are no open dbs. + if (toku_omt_size(env->i->open_dbs_by_dname) > 0) { err_msg = "Cannot close environment due to open DBs\n"; r = toku_ydb_do_error(env, EINVAL, "%s", err_msg); goto panic_and_quit_early; @@ -1191,11 +1192,13 @@ env_close(DB_ENV * env, uint32_t flags) { toku_free(env->i->real_log_dir); if (env->i->real_tmp_dir) toku_free(env->i->real_tmp_dir); - if (env->i->open_dbs) - toku_omt_destroy(&env->i->open_dbs); + if (env->i->open_dbs_by_dname) + toku_omt_destroy(&env->i->open_dbs_by_dname); + if (env->i->open_dbs_by_dict_id) + toku_omt_destroy(&env->i->open_dbs_by_dict_id); if (env->i->dir) toku_free(env->i->dir); - toku_mutex_destroy(&env->i->open_dbs_lock); + toku_pthread_rwlock_destroy(&env->i->open_dbs_rwlock); // Immediately before freeing internal environment unlock the directories unlock_single_process(env); @@ -1717,6 +1720,12 @@ env_set_lock_timeout(DB_ENV *env, uint64_t lock_timeout_msec) { return 0; } +static int +env_set_lock_timeout_callback(DB_ENV *env, lock_timeout_callback callback) { + env->i->lock_wait_timeout_callback = callback; + return 0; +} + static void format_time(const time_t *timer, char *buf) { ctime_r(timer, buf); @@ -1745,9 +1754,11 @@ typedef enum { FS_ENOSPC_REDZONE_CTR, // number of operations rejected by enospc prevention (red zone) FS_ENOSPC_MOST_RECENT, // most recent time that file system was completely full FS_ENOSPC_COUNT, // total number of times ENOSPC was returned from an attempt to write - FS_FSYNC_TIME , + FS_FSYNC_TIME, FS_FSYNC_COUNT, - FS_STATUS_NUM_ROWS + FS_LONG_FSYNC_TIME, + FS_LONG_FSYNC_COUNT, + FS_STATUS_NUM_ROWS, // must be last } fs_status_entry; typedef struct { @@ -1766,8 +1777,10 @@ fs_status_init(void) { FS_STATUS_INIT(FS_ENOSPC_REDZONE_CTR, nullptr, UINT64, "number of operations rejected by enospc prevention (red zone)", TOKU_ENGINE_STATUS); FS_STATUS_INIT(FS_ENOSPC_MOST_RECENT, nullptr, UNIXTIME, "most recent disk full", TOKU_ENGINE_STATUS); FS_STATUS_INIT(FS_ENOSPC_COUNT, nullptr, UINT64, "number of write operations that returned ENOSPC", TOKU_ENGINE_STATUS); - FS_STATUS_INIT(FS_FSYNC_TIME, FILESYSTEM_FSYNC_SECONDS, UINT64, "fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS); + FS_STATUS_INIT(FS_FSYNC_TIME, FILESYSTEM_FSYNC_TIME, UINT64, "fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS); FS_STATUS_INIT(FS_FSYNC_COUNT, FILESYSTEM_FSYNC_NUM, UINT64, "fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS); + FS_STATUS_INIT(FS_LONG_FSYNC_TIME, FILESYSTEM_LONG_FSYNC_TIME, UINT64, "long fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS); + FS_STATUS_INIT(FS_LONG_FSYNC_COUNT, FILESYSTEM_LONG_FSYNC_NUM, UINT64, "long fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS); fsstat.initialized = true; } #undef FS_STATUS_INIT @@ -1792,10 +1805,12 @@ fs_get_status(DB_ENV * env, fs_redzone_state * redzone_state) { FS_STATUS_VALUE(FS_ENOSPC_MOST_RECENT) = enospc_most_recent_timestamp; FS_STATUS_VALUE(FS_ENOSPC_COUNT) = enospc_total; - uint64_t fsync_count, fsync_time; - toku_get_fsync_times(&fsync_count, &fsync_time); + uint64_t fsync_count, fsync_time, long_fsync_threshold, long_fsync_count, long_fsync_time; + toku_get_fsync_times(&fsync_count, &fsync_time, &long_fsync_threshold, &long_fsync_count, &long_fsync_time); FS_STATUS_VALUE(FS_FSYNC_COUNT) = fsync_count; FS_STATUS_VALUE(FS_FSYNC_TIME) = fsync_time; + FS_STATUS_VALUE(FS_LONG_FSYNC_COUNT) = long_fsync_count; + FS_STATUS_VALUE(FS_LONG_FSYNC_TIME) = long_fsync_time; } #undef FS_STATUS_VALUE @@ -2238,6 +2253,177 @@ env_get_cursor_for_directory(DB_ENV* env, DB_TXN* txn, DBC** c) { return toku_db_cursor(env->i->directory, txn, c, 0); } +struct ltm_iterate_requests_callback_extra { + ltm_iterate_requests_callback_extra(DB_ENV *e, + iterate_requests_callback cb, + void *ex) : + env(e), callback(cb), extra(ex) { + } + DB_ENV *env; + iterate_requests_callback callback; + void *extra; +}; + +static int +find_db_by_dict_id(OMTVALUE v, void *dict_id_v) { + DB *db = (DB *) v; + DICTIONARY_ID dict_id = db->i->dict_id; + DICTIONARY_ID dict_id_find = *(DICTIONARY_ID *) dict_id_v; + if (dict_id.dictid < dict_id_find.dictid) { + return -1; + } else if (dict_id.dictid > dict_id_find.dictid) { + return 1; + } else { + return 0; + } +} + +static DB * +locked_get_db_by_dict_id(DB_ENV *env, DICTIONARY_ID dict_id) { + OMTVALUE dbv; + int r = toku_omt_find_zero(env->i->open_dbs_by_dict_id, find_db_by_dict_id, + (void *) &dict_id, &dbv, nullptr); + return r == 0 ? (DB *) dbv : nullptr; +} + +static int ltm_iterate_requests_callback(DICTIONARY_ID dict_id, TXNID txnid, + const DBT *left_key, + const DBT *right_key, + TXNID blocking_txnid, + uint64_t start_time, + void *extra) { + ltm_iterate_requests_callback_extra *info = + reinterpret_cast<ltm_iterate_requests_callback_extra *>(extra); + + toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock); + int r = 0; + DB *db = locked_get_db_by_dict_id(info->env, dict_id); + if (db != nullptr) { + r = info->callback(db, txnid, left_key, right_key, + blocking_txnid, start_time, info->extra); + } + toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock); + return r; +} + +static int +env_iterate_pending_lock_requests(DB_ENV *env, + iterate_requests_callback callback, + void *extra) { + if (!env_opened(env)) { + return EINVAL; + } + + toku::locktree::manager *mgr = &env->i->ltm; + ltm_iterate_requests_callback_extra e(env, callback, extra); + return mgr->iterate_pending_lock_requests(ltm_iterate_requests_callback, &e); +} + +// for the lifetime of this object: +// - open_dbs_rwlock must be read locked (or better) +// - txn_mutex must be held +struct iter_txn_row_locks_callback_extra { + iter_txn_row_locks_callback_extra(DB_ENV *e, toku::omt<txn_lt_key_ranges> *m) : + env(e), current_db(nullptr), which_lt(0), lt_map(m) { + if (lt_map->size() > 0) { + set_iterator_and_current_db(); + } + } + + void set_iterator_and_current_db() { + txn_lt_key_ranges ranges; + const int r = lt_map->fetch(which_lt, &ranges); + invariant_zero(r); + current_db = locked_get_db_by_dict_id(env, ranges.lt->get_dict_id()); + iter.create(ranges.buffer); + } + + DB_ENV *env; + DB *current_db; + size_t which_lt; + toku::omt<txn_lt_key_ranges> *lt_map; + toku::range_buffer::iterator iter; + toku::range_buffer::iterator::record rec; +}; + +static int iter_txn_row_locks_callback(DB **db, DBT *left_key, DBT *right_key, void *extra) { + iter_txn_row_locks_callback_extra *info = + reinterpret_cast<iter_txn_row_locks_callback_extra *>(extra); + + while (info->which_lt < info->lt_map->size()) { + const bool more = info->iter.current(&info->rec); + if (more) { + *db = info->current_db; + // The caller should interpret data/size == 0 to mean infinity. + // Therefore, when we copyref pos/neg infinity into left/right_key, + // the caller knows what we're talking about. + toku_copyref_dbt(left_key, *info->rec.get_left_key()); + toku_copyref_dbt(right_key, *info->rec.get_right_key()); + info->iter.next(); + return 0; + } else { + info->which_lt++; + if (info->which_lt < info->lt_map->size()) { + info->set_iterator_and_current_db(); + } + } + } + return DB_NOTFOUND; +} + +struct iter_txns_callback_extra { + iter_txns_callback_extra(DB_ENV *e, iterate_transactions_callback cb, void *ex) : + env(e), callback(cb), extra(ex) { + } + DB_ENV *env; + iterate_transactions_callback callback; + void *extra; +}; + +static int iter_txns_callback(TOKUTXN txn, void *extra) { + iter_txns_callback_extra *info = + reinterpret_cast<iter_txns_callback_extra *>(extra); + + DB_TXN *dbtxn = toku_txn_get_container_db_txn(txn); + invariant_notnull(dbtxn); + + toku_mutex_lock(&db_txn_struct_i(dbtxn)->txn_mutex); + toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock); + + iter_txn_row_locks_callback_extra e(info->env, &db_txn_struct_i(dbtxn)->lt_map); + const int r = info->callback(toku_txn_get_txnid(txn).parent_id64, + toku_txn_get_client_id(txn), + iter_txn_row_locks_callback, + &e, + info->extra); + + toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock); + toku_mutex_unlock(&db_txn_struct_i(dbtxn)->txn_mutex); + + return r; +} + +static int +env_iterate_live_transactions(DB_ENV *env, + iterate_transactions_callback callback, + void *extra) { + if (!env_opened(env)) { + return EINVAL; + } + + TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger); + iter_txns_callback_extra e(env, callback, extra); + return toku_txn_manager_iter_over_live_root_txns(txn_manager, iter_txns_callback, &e); +} + +static void env_set_loader_memory_size(DB_ENV *env, uint64_t loader_memory_size) { + env->i->loader_memory_size = loader_memory_size; +} + +static uint64_t env_get_loader_memory_size(DB_ENV *env) { + return env->i->loader_memory_size; +} + static int toku_env_create(DB_ENV ** envp, uint32_t flags) { int r = ENOSYS; @@ -2301,13 +2487,18 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) { USENV(txn_stat); USENV(get_lock_timeout); USENV(set_lock_timeout); + USENV(set_lock_timeout_callback); USENV(set_redzone); USENV(log_flush); USENV(log_archive); USENV(create_loader); USENV(get_cursor_for_persistent_environment); USENV(get_cursor_for_directory); + USENV(iterate_pending_lock_requests); + USENV(iterate_live_transactions); USENV(change_fsync_log_period); + USENV(set_loader_memory_size); + USENV(get_loader_memory_size); #undef USENV // unlocked methods @@ -2344,10 +2535,11 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) { // The escalate callback will need it to translate txnids to DB_TXNs result->i->ltm.create(toku_db_lt_on_create_callback, toku_db_lt_on_destroy_callback, toku_db_txn_escalate_callback, result); - r = toku_omt_create(&result->i->open_dbs); - toku_mutex_init(&result->i->open_dbs_lock, NULL); + r = toku_omt_create(&result->i->open_dbs_by_dname); assert_zero(r); - assert(result->i->open_dbs); + r = toku_omt_create(&result->i->open_dbs_by_dict_id); + assert_zero(r); + toku_pthread_rwlock_init(&result->i->open_dbs_rwlock, NULL); *envp = result; r = 0; @@ -2372,7 +2564,7 @@ DB_ENV_CREATE_FUN (DB_ENV ** envp, uint32_t flags) { // return <0 if v is earlier in omt than dbv // return >0 if v is later in omt than dbv static int -find_db_by_db (OMTVALUE v, void *dbv) { +find_db_by_db_dname(OMTVALUE v, void *dbv) { DB *db = (DB *) v; // DB* that is stored in the omt DB *dbfind = (DB *) dbv; // extra, to be compared to v int cmp; @@ -2385,42 +2577,78 @@ find_db_by_db (OMTVALUE v, void *dbv) { return 0; } +static int +find_db_by_db_dict_id(OMTVALUE v, void *dbv) { + DB *db = (DB *) v; + DB *dbfind = (DB *) dbv; + DICTIONARY_ID dict_id = db->i->dict_id; + DICTIONARY_ID dict_id_find = dbfind->i->dict_id; + if (dict_id.dictid < dict_id_find.dictid) { + return -1; + } else if (dict_id.dictid > dict_id_find.dictid) { + return 1; + } else if (db < dbfind) { + return -1; + } else if (db > dbfind) { + return 1; + } else { + return 0; + } +} + // Tell env that there is a new db handle (with non-unique dname in db->i-dname) void env_note_db_opened(DB_ENV *env, DB *db) { - toku_mutex_lock(&env->i->open_dbs_lock); + toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock); assert(db->i->dname); // internal (non-user) dictionary has no dname + int r; - OMTVALUE dbv; + OMTVALUE v; uint32_t idx; - STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs); + r = toku_omt_find_zero(env->i->open_dbs_by_dname, find_db_by_db_dname, + db, &v, &idx); + assert(r == DB_NOTFOUND); + r = toku_omt_insert_at(env->i->open_dbs_by_dname, db, idx); + assert_zero(r); + r = toku_omt_find_zero(env->i->open_dbs_by_dict_id, find_db_by_db_dict_id, + db, &v, &idx); + assert(r == DB_NOTFOUND); + r = toku_omt_insert_at(env->i->open_dbs_by_dict_id, db, idx); + assert_zero(r); + + STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs_by_dname); STATUS_VALUE(YDB_LAYER_NUM_DB_OPEN)++; - if (STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) > STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS)) + if (STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) > STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS)) { STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS) = STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS); - r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx); - assert(r==DB_NOTFOUND); //Must not already be there. - r = toku_omt_insert_at(env->i->open_dbs, db, idx); - assert_zero(r); - toku_mutex_unlock(&env->i->open_dbs_lock); + } + toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock); } // Effect: Tell the DB_ENV that the DB is no longer in use by the user of the API. The DB may still be in use by the fractal tree internals. void env_note_db_closed(DB_ENV *env, DB *db) { - toku_mutex_lock(&env->i->open_dbs_lock); + toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock); assert(db->i->dname); // internal (non-user) dictionary has no dname - assert(toku_omt_size(env->i->open_dbs) > 0); + assert(toku_omt_size(env->i->open_dbs_by_dname) > 0); + assert(toku_omt_size(env->i->open_dbs_by_dict_id) > 0); + int r; - OMTVALUE dbv; + OMTVALUE v; uint32_t idx; - STATUS_VALUE(YDB_LAYER_NUM_DB_CLOSE)++; - r = toku_omt_find_zero(env->i->open_dbs, find_db_by_db, db, &dbv, &idx); - assert_zero(r); //Must already be there. - assert((DB*)dbv == db); - r = toku_omt_delete_at(env->i->open_dbs, idx); - STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs); + r = toku_omt_find_zero(env->i->open_dbs_by_dname, find_db_by_db_dname, + db, &v, &idx); + assert_zero(r); + r = toku_omt_delete_at(env->i->open_dbs_by_dname, idx); assert_zero(r); - toku_mutex_unlock(&env->i->open_dbs_lock); + r = toku_omt_find_zero(env->i->open_dbs_by_dict_id, find_db_by_db_dict_id, + db, &v, &idx); + assert_zero(r); + r = toku_omt_delete_at(env->i->open_dbs_by_dict_id, idx); + assert_zero(r); + + STATUS_VALUE(YDB_LAYER_NUM_DB_CLOSE)++; + STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = toku_omt_size(env->i->open_dbs_by_dname); + toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock); } static int @@ -2440,8 +2668,8 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) { bool rval; OMTVALUE dbv; uint32_t idx; - toku_mutex_lock(&env->i->open_dbs_lock); - r = toku_omt_find_zero(env->i->open_dbs, find_open_db_by_dname, (void*)dname, &dbv, &idx); + toku_pthread_rwlock_rdlock(&env->i->open_dbs_rwlock); + r = toku_omt_find_zero(env->i->open_dbs_by_dname, find_open_db_by_dname, (void*)dname, &dbv, &idx); if (r==0) { DB *db = (DB *) dbv; assert(strcmp(dname, db->i->dname) == 0); @@ -2451,7 +2679,7 @@ env_is_db_with_dname_open(DB_ENV *env, const char *dname) { assert(r==DB_NOTFOUND); rval = false; } - toku_mutex_unlock(&env->i->open_dbs_lock); + toku_pthread_rwlock_rdunlock(&env->i->open_dbs_rwlock); return rval; } |